Commit 46864db5 authored by Jeremy Mikola

Refactor GridFS internals

The Bucket's public API remains unchanged. More logic has been moved into the CollectionWrapper and the stream classes have also been renamed.

The StreamWrapper's registration has been changed to allow a configurable protocol (Bucket defaults to "gridfs"). While file paths are generated for readable and writable streams (now in a more robust manner), necessary arguments are passed through context options. We may decide to simplify the path for readable streams down the line, unless the current path is beneficial for debugging.

Closes #167
parent 2b93dab1
......@@ -10,6 +10,7 @@ use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\GridFS\Exception\FileNotFoundException;
use MongoDB\Operation\Find;
use stdClass;
/**
* Bucket provides a public API for interacting with the GridFS files and chunks
......@@ -19,8 +20,8 @@ use MongoDB\Operation\Find;
*/
class Bucket
{
private static $streamWrapper;
private static $defaultChunkSizeBytes = 261120;
private static $streamWrapperProtocol = 'gridfs';
private $collectionWrapper;
private $databaseName;
......@@ -75,7 +76,7 @@ class Bucket
$collectionOptions = array_intersect_key($options, ['readPreference' => 1, 'writeConcern' => 1]);
$this->collectionWrapper = new CollectionWrapper($manager, $databaseName, $options['bucketName'], $collectionOptions);
$this->registerStreamWrapper($manager);
$this->registerStreamWrapper();
}
/**
......@@ -89,14 +90,12 @@ class Bucket
*/
public function delete(ObjectId $id)
{
$file = $this->collectionWrapper->getFilesCollection()->findOne(['_id' => $id]);
$this->collectionWrapper->getFilesCollection()->deleteOne(['_id' => $id]);
$this->collectionWrapper->getChunksCollection()->deleteMany(['files_id' => $id]);
$file = $this->collectionWrapper->findFileById($id);
$this->collectionWrapper->deleteFileAndChunksById($id);
if ($file === null) {
throw FileNotFoundException::byId($id, $this->collectionWrapper->getFilesCollection()->getNameSpace());
throw FileNotFoundException::byId($id, $this->getFilesNamespace());
}
}
/**
......@@ -108,17 +107,14 @@ class Bucket
*/
public function downloadToStream(ObjectId $id, $destination)
{
$file = $this->collectionWrapper->getFilesCollection()->findOne(
['_id' => $id],
['typeMap' => ['root' => 'stdClass']]
);
$file = $this->collectionWrapper->findFileById($id);
if ($file === null) {
throw FileNotFoundException::byId($id, $this->collectionWrapper->getFilesCollection()->getNameSpace());
throw FileNotFoundException::byId($id, $this->getFilesNamespace());
}
$gridFsStream = new GridFSDownload($this->collectionWrapper, $file);
$gridFsStream->downloadToStream($destination);
$stream = new ReadableStream($this->collectionWrapper, $file);
$stream ->downloadToStream($destination);
}
/**
......@@ -148,23 +144,29 @@ class Bucket
public function downloadToStreamByName($filename, $destination, array $options = [])
{
$options += ['revision' => -1];
$file = $this->findFileRevision($filename, $options['revision']);
$gridFsStream = new GridFSDownload($this->collectionWrapper, $file);
$gridFsStream->downloadToStream($destination);
$file = $this->collectionWrapper->findFileByFilenameAndRevision($filename, $options['revision']);
if ($file === null) {
throw FileNotFoundException::byFilenameAndRevision($filename, $options['revision'], $this->getFilesNamespace());
}
$stream = new ReadableStream($this->collectionWrapper, $file);
$stream->downloadToStream($destination);
}
/**
* Drops the files and chunks collection associated with GridFS this bucket
*
*/
* Drops the files and chunks collections associated with this GridFS
* bucket.
*/
public function drop()
{
    // Dropping the collections is delegated entirely to the wrapper.
    $wrapper = $this->collectionWrapper;
    $wrapper->dropCollections();
}
/**
* Find files from the GridFS bucket's files collection.
* Finds documents from the GridFS bucket's files collection matching the
* query.
*
* @see Find::__construct() for supported options
* @param array|object $filter Query by which to filter documents
......@@ -173,10 +175,10 @@ class Bucket
*/
public function find($filter, array $options = [])
{
return $this->collectionWrapper->getFilesCollection()->find($filter, $options);
return $this->collectionWrapper->findFiles($filter, $options);
}
public function getCollectionsWrapper()
public function getCollectionWrapper()
{
return $this->collectionWrapper;
}
......@@ -200,7 +202,7 @@ class Bucket
return $metadata['wrapper_data']->getId();
}
return;
// TODO: Throw if we cannot access the ID
}
/**
......@@ -212,13 +214,10 @@ class Bucket
*/
public function openDownloadStream(ObjectId $id)
{
$file = $this->collectionWrapper->getFilesCollection()->findOne(
['_id' => $id],
['typeMap' => ['root' => 'stdClass']]
);
$file = $this->collectionWrapper->findFileById($id);
if ($file === null) {
throw FileNotFoundException::byId($id, $this->collectionWrapper->getFilesCollection()->getNameSpace());
throw FileNotFoundException::byId($id, $this->getFilesNamespace());
}
return $this->openDownloadStreamByFile($file);
......@@ -251,7 +250,12 @@ class Bucket
public function openDownloadStreamByName($filename, array $options = [])
{
$options += ['revision' => -1];
$file = $this->findFileRevision($filename, $options['revision']);
$file = $this->collectionWrapper->findFileByFilenameAndRevision($filename, $options['revision']);
if ($file === null) {
throw FileNotFoundException::byFilenameAndRevision($filename, $options['revision'], $this->getFilesNamespace());
}
return $this->openDownloadStreamByFile($file);
}
......@@ -265,21 +269,23 @@ class Bucket
* bucket's chunk size.
*
* @param string $filename File name
* @param array $options Stream options
* @param array $options Upload options
* @return resource
*/
public function openUploadStream($filename, array $options = [])
{
$options += ['chunkSizeBytes' => $this->options['chunkSizeBytes']];
$streamOptions = [
'collectionWrapper' => $this->collectionWrapper,
'uploadOptions' => $options,
];
$context = stream_context_create(['gridfs' => $streamOptions]);
$path = $this->createPathForUpload();
$context = stream_context_create([
self::$streamWrapperProtocol => [
'collectionWrapper' => $this->collectionWrapper,
'filename' => $filename,
'options' => $options,
],
]);
return fopen(sprintf('gridfs://%s/%s', $this->databaseName, $filename), 'w', false, $context);
return fopen($path, 'w', false, $context);
}
/**
......@@ -287,14 +293,27 @@ class Bucket
*
* @param ObjectId $id ID of the file to rename
* @param string $newFilename New file name
* @throws GridFSFileNotFoundException
* @throws FileNotFoundException
*/
public function rename(ObjectId $id, $newFilename)
{
$filesCollection = $this->collectionWrapper->getFilesCollection();
$result = $filesCollection->updateOne(['_id' => $id], ['$set' => ['filename' => $newFilename]]);
if($result->getModifiedCount() == 0) {
throw FileNotFoundException::byId($id, $this->collectionWrapper->getFilesCollection()->getNameSpace());
$updateResult = $this->collectionWrapper->updateFilenameForId($id, $newFilename);
if ($updateResult->getModifiedCount() === 1) {
return;
}
/* If the update resulted in no modification, it's possible that the
* file did not exist, in which case we must raise an error. Checking
* the write result's matched count will be most efficient, but fall
* back to a findOne operation if necessary (i.e. legacy writes).
*/
$found = $updateResult->getMatchedCount() !== null
? $updateResult->getMatchedCount() === 1
: $this->collectionWrapper->findFileById($id) !== null;
if ( ! $found) {
throw FileNotFoundException::byId($id, $this->getFilesNamespace());
}
}
......@@ -310,61 +329,93 @@ class Bucket
* @param resource $source Readable stream
* @param array $options Stream options
* @return ObjectId
* @throws InvalidArgumentException
*/
public function uploadFromStream($filename, $source, array $options = [])
{
$options += ['chunkSizeBytes' => $this->options['chunkSizeBytes']];
$gridFsStream = new GridFSUpload($this->collectionWrapper, $filename, $options);
return $gridFsStream->uploadFromStream($source);
$stream = new WritableStream($this->collectionWrapper, $filename, $options);
return $stream->uploadFromStream($source);
}
private function findFileRevision($filename, $revision)
/**
* Creates a path for an existing GridFS file.
*
* @param stdClass $file GridFS file document
* @return string
*/
private function createPathForFile(stdClass $file)
{
if ($revision < 0) {
$skip = abs($revision) - 1;
$sortOrder = -1;
if ( ! is_object($file->_id) || method_exists($file->_id, '__toString')) {
$id = (string) $file->_id;
} else {
$skip = $revision;
$sortOrder = 1;
$id = \MongoDB\BSON\toJSON(\MongoDB\BSON\fromPHP(['_id' => $file->_id]));
}
$filesCollection = $this->collectionWrapper->getFilesCollection();
$file = $filesCollection->findOne(
['filename' => $filename],
[
'skip' => $skip,
'sort' => ['uploadDate' => $sortOrder],
'typeMap' => ['root' => 'stdClass'],
]
return sprintf(
'%s://%s/%s.files/%s',
self::$streamWrapperProtocol,
urlencode($this->databaseName),
urlencode($this->options['bucketName']),
urlencode($id)
);
if ($file === null) {
throw FileNotFoundException::byFilenameAndRevision($filename, $revision, $filesCollection->getNameSpace());
}
return $file;
}
private function openDownloadStreamByFile($file)
/**
* Creates a path for a new GridFS file, which does not yet have an ID.
*
* @return string
*/
private function createPathForUpload()
{
$options = [
'collectionWrapper' => $this->collectionWrapper,
'file' => $file,
];
return sprintf(
'%s://%s/%s.files',
self::$streamWrapperProtocol,
urlencode($this->databaseName),
urlencode($this->options['bucketName'])
);
}
$context = stream_context_create(['gridfs' => $options]);
/**
* Returns the namespace of the files collection.
*
* @return string
*/
private function getFilesNamespace()
{
    // Resulting format: "<database>.<bucket>.files".
    $database = $this->databaseName;
    $bucket = $this->options['bucketName'];

    return sprintf('%s.%s.files', $database, $bucket);
}
return fopen(sprintf('gridfs://%s/%s', $this->databaseName, $file->filename), 'r', false, $context);
/**
* Opens a readable stream for the GridFS file.
*
* @param stdClass $file GridFS file document
* @return resource
*/
private function openDownloadStreamByFile(stdClass $file)
{
    $path = $this->createPathForFile($file);

    // The collection wrapper and file document travel in the stream context,
    // stored under the wrapper's protocol name.
    $wrapperOptions = [
        'collectionWrapper' => $this->collectionWrapper,
        'file' => $file,
    ];
    $context = stream_context_create([self::$streamWrapperProtocol => $wrapperOptions]);

    return fopen($path, 'r', false, $context);
}
private function registerStreamWrapper(Manager $manager)
/**
* Registers the GridFS stream wrapper if it is not already registered.
*/
private function registerStreamWrapper()
{
if (isset(self::$streamWrapper)) {
if (in_array(self::$streamWrapperProtocol, stream_get_wrappers())) {
return;
}
self::$streamWrapper = new StreamWrapper();
self::$streamWrapper->register($manager);
StreamWrapper::register(self::$streamWrapperProtocol);
}
}
......@@ -3,8 +3,12 @@
namespace MongoDB\GridFS;
use MongoDB\Collection;
use MongoDB\UpdateResult;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference;
use IteratorIterator;
use stdClass;
/**
* CollectionWrapper abstracts the GridFS files and chunks collections.
......@@ -14,7 +18,7 @@ use MongoDB\Driver\ReadPreference;
class CollectionWrapper
{
private $chunksCollection;
private $ensuredIndexes = false;
private $checkedIndexes = false;
private $filesCollection;
/**
......@@ -33,34 +37,183 @@ class CollectionWrapper
$this->chunksCollection = new Collection($manager, $databaseName, sprintf('%s.chunks', $bucketName), $collectionOptions);
}
/**
* Deletes all GridFS chunks for a given file ID.
*
* @param mixed $id
*/
public function deleteChunksByFilesId($id)
{
    // Remove every chunk whose "files_id" field equals the given ID.
    $filter = ['files_id' => $id];
    $this->chunksCollection->deleteMany($filter);
}
/**
* Deletes a GridFS file and related chunks by ID.
*
* @param mixed $id
*/
public function deleteFileAndChunksById($id)
{
    $fileFilter = ['_id' => $id];
    $chunksFilter = ['files_id' => $id];

    // The files document is removed first, followed by its chunks.
    $this->filesCollection->deleteOne($fileFilter);
    $this->chunksCollection->deleteMany($chunksFilter);
}
/**
* Drops the GridFS files and chunks collections.
*/
public function dropCollections()
{
    // Order is preserved: the files collection first, then chunks.
    foreach ([$this->filesCollection, $this->chunksCollection] as $collection) {
        $collection->drop();
    }
}
/**
* Finds a GridFS file document for a given filename and revision.
*
* Revision numbers are defined as follows:
*
* * 0 = the original stored file
* * 1 = the first revision
* * 2 = the second revision
* * etc…
* * -2 = the second most recent revision
* * -1 = the most recent revision
*
* @see Bucket::downloadToStreamByName()
* @see Bucket::openDownloadStreamByName()
* @param string $filename
* @param integer $revision
* @return stdClass|null
*/
public function findFileByFilenameAndRevision($filename, $revision)
{
    $filename = (string) $filename;
    $revision = (int) $revision;

    // Negative revisions count back from the newest upload (-1 skips none),
    // while zero and positive revisions count forward from the oldest.
    $newestFirst = $revision < 0;
    $skip = $newestFirst ? abs($revision) - 1 : $revision;
    $sortOrder = $newestFirst ? -1 : 1;

    $findOptions = [
        'skip' => $skip,
        'sort' => ['uploadDate' => $sortOrder],
        'typeMap' => ['root' => 'stdClass'],
    ];

    return $this->filesCollection->findOne(['filename' => $filename], $findOptions);
}
/**
* Finds a GridFS file document for a given ID.
*
* @param mixed $id
* @return stdClass|null
*/
public function findFileById($id)
{
    $filter = ['_id' => $id];

    // Request stdClass as the root type of the returned document.
    $findOptions = ['typeMap' => ['root' => 'stdClass']];

    return $this->filesCollection->findOne($filter, $findOptions);
}
/**
* Finds documents from the GridFS bucket's files collection.
*
* @see Find::__construct() for supported options
* @param array|object $filter Query by which to filter documents
* @param array $options Additional options
* @return Cursor
*/
public function findFiles($filter, array $options = [])
{
    // Pass-through to the files collection; the filter and options are
    // forwarded unchanged.
    $cursor = $this->filesCollection->find($filter, $options);

    return $cursor;
}
// TODO: Remove this
public function getChunksCollection()
{
    // Hands out the underlying chunks collection object itself.
    $chunks = $this->chunksCollection;

    return $chunks;
}
/**
* Returns a chunks iterator for a given file ID.
*
* @param mixed $id
* @return IteratorIterator
*/
public function getChunksIteratorByFilesId($id)
{
    // Ascending sort on "n" yields the chunks in sequence order.
    $findOptions = [
        'sort' => ['n' => 1],
        'typeMap' => ['root' => 'stdClass'],
    ];
    $chunks = $this->chunksCollection->find(['files_id' => $id], $findOptions);

    return new IteratorIterator($chunks);
}
// TODO: Remove this
public function getFilesCollection()
{
    // Hands out the underlying files collection object itself.
    $files = $this->filesCollection;

    return $files;
}
/**
* Inserts a document into the chunks collection.
*
* @param array|object $chunk Chunk document
*/
public function insertChunk($chunk)
{
$this->ensureIndexes();
if ( ! $this->checkedIndexes) {
$this->ensureIndexes();
}
$this->chunksCollection->insertOne($chunk);
}
/**
* Inserts a document into the files collection.
*
* The file document should be inserted after all chunks have been inserted.
*
* @param array|object $file File document
*/
public function insertFile($file)
{
$this->ensureIndexes();
if ( ! $this->checkedIndexes) {
$this->ensureIndexes();
}
$this->filesCollection->insertOne($file);
}
/**
* Updates the filename field in the file document for a given ID.
*
* @param mixed $id
* @param string $filename
* @return UpdateResult
*/
public function updateFilenameForId($id, $filename)
{
    $filter = ['_id' => $id];

    // Only the "filename" field is set; the new name is cast to a string.
    $update = ['$set' => ['filename' => (string) $filename]];

    return $this->filesCollection->updateOne($filter, $update);
}
/**
* Create an index on the chunks collection if it does not already exist.
*/
private function ensureChunksIndex()
{
foreach ($this->chunksCollection->listIndexes() as $index) {
......@@ -72,6 +225,9 @@ class CollectionWrapper
$this->chunksCollection->createIndex(['files_id' => 1, 'n' => 1], ['unique' => true]);
}
/**
* Create an index on the files collection if it does not already exist.
*/
private function ensureFilesIndex()
{
foreach ($this->filesCollection->listIndexes() as $index) {
......@@ -83,21 +239,33 @@ class CollectionWrapper
$this->filesCollection->createIndex(['filename' => 1, 'uploadDate' => 1]);
}
/**
* Ensure indexes on the files and chunks collections exist.
*
* This method is called once before the first write operation on a GridFS
* bucket. Indexes are only created if the files collection is empty.
*/
private function ensureIndexes()
{
if ($this->ensuredIndexes) {
if ($this->checkedIndexes) {
return;
}
$this->checkedIndexes = true;
if ( ! $this->isFilesCollectionEmpty()) {
return;
}
$this->ensureFilesIndex();
$this->ensureChunksIndex();
$this->ensuredIndexes = true;
}
/**
* Returns whether the files collection is empty.
*
* @return boolean
*/
private function isFilesCollectionEmpty()
{
return null === $this->filesCollection->findOne([], [
......
......@@ -7,49 +7,37 @@ use MongoDB\GridFS\Exception\CorruptFileException;
use stdClass;
/**
* GridFSDownload abstracts the process of reading a GridFS file.
* ReadableStream abstracts the process of reading a GridFS file.
*
* @internal
*/
class GridFSDownload
class ReadableStream
{
private $buffer;
private $bufferEmpty = true;
private $bufferFresh = true;
private $bufferEmpty;
private $bufferFresh;
private $bytesSeen = 0;
private $chunkOffset = 0;
private $chunksIterator;
private $collectionWrapper;
private $file;
private $firstCheck = true;
private $iteratorEmpty = false;
private $numChunks;
/**
* Constructs a GridFS download stream.
* Constructs a readable GridFS stream.
*
* @param CollectionWrapper $collectionWrapper GridFS collection wrapper
* @param stdClass $file GridFS file document
* @param stdClass $file GridFS file document
* @throws CorruptFileException
*/
public function __construct(CollectionWrapper $collectionWrapper, stdClass $file)
{
$this->collectionWrapper = $collectionWrapper;
$this->file = $file;
try {
$cursor = $this->collectionWrapper->getChunksCollection()->find(
['files_id' => $this->file->_id],
['sort' => ['n' => 1]]
);
} catch (Exception $e) {
// TODO: Why do we replace a driver exception with CorruptFileException here?
throw new CorruptFileException();
}
$this->chunksIterator = new \IteratorIterator($cursor);
$this->chunksIterator = $collectionWrapper->getChunksIteratorByFilesId($this->file->_id);
$this->numChunks = ($file->length >= 0) ? ceil($file->length / $file->chunkSize) : 0;
$this->buffer = fopen('php://temp', 'w+');
$this->initEmptyBuffer();
}
public function close()
......@@ -57,32 +45,35 @@ class GridFSDownload
fclose($this->buffer);
}
public function downloadNumBytes($numToRead)
/**
* Read bytes from the stream.
*
* Note: this method may return a string smaller than the requested length
* if data is not available to be read.
*
* @param integer $numBytes Number of bytes to read
* @return string
*/
public function downloadNumBytes($numBytes)
{
$output = "";
if ($this->bufferFresh) {
rewind($this->buffer);
$this->bufferFresh = false;
}
// TODO: Should we be checking for fread errors here?
$output = fread($this->buffer, $numToRead);
$output = fread($this->buffer, $numBytes);
if (strlen($output) == $numToRead) {
if (strlen($output) == $numBytes) {
return $output;
}
fclose($this->buffer);
$this->buffer = fopen("php://temp", "w+");
$this->bufferFresh = true;
$this->bufferEmpty = true;
$this->initEmptyBuffer();
$bytesLeft = $numToRead - strlen($output);
$bytesLeft = $numBytes - strlen($output);
while (strlen($output) < $numToRead && $this->advanceChunks()) {
$bytesLeft = $numToRead - strlen($output);
while (strlen($output) < $numBytes && $this->advanceChunks()) {
$bytesLeft = $numBytes - strlen($output);
$output .= substr($this->chunksIterator->current()->data->getData(), 0, $bytesLeft);
}
......@@ -94,8 +85,18 @@ class GridFSDownload
return $output;
}
/**
* Writes the contents of this GridFS file to a writable stream.
*
* @param resource $destination Writable stream
* @throws InvalidArgumentException
*/
public function downloadToStream($destination)
{
if ( ! is_resource($destination) || get_resource_type($destination) != "stream") {
throw InvalidArgumentException::invalidType('$destination', $destination, 'resource');
}
while ($this->advanceChunks()) {
// TODO: Should we be checking for fwrite errors here?
fwrite($destination, $this->chunksIterator->current()->data->getData());
......@@ -160,4 +161,15 @@ class GridFSDownload
return true;
}
/**
 * Replaces the internal buffer with a new, empty php://temp stream.
 *
 * A buffer that is still open is closed first. The bufferEmpty and
 * bufferFresh flags are both reset to true.
 */
private function initEmptyBuffer()
{
    // isset() stays true for a handle that was already closed, whereas
    // is_resource() is false for it, so a closed buffer is never passed to
    // fclose() a second time.
    if (is_resource($this->buffer)) {
        fclose($this->buffer);
    }

    $this->buffer = fopen("php://temp", "w+");
    $this->bufferEmpty = true;
    $this->bufferFresh = true;
}
}
......@@ -11,96 +11,130 @@ namespace MongoDB\GridFS;
*/
class StreamWrapper
{
/**
* @var resource|null Stream context (set by PHP)
*/
public $context;
private $collectionsWrapper;
private $gridFSStream;
private $id;
private $mode;
private $protocol;
private $stream;
public function getId()
{
return $this->id;
}
public function openReadStream()
{
$context = stream_context_get_options($this->context);
$this->gridFSStream = new GridFSDownload($this->collectionWrapper, $context['gridfs']['file']);
$this->id = $this->gridFSStream->getId();
return true;
}
public function openWriteStream()
{
$context = stream_context_get_options($this->context);
$options = $context['gridfs']['uploadOptions'];
$this->gridFSStream = new GridFSUpload($this->collectionWrapper, $this->identifier, $options);
$this->id = $this->gridFSStream->getId();
return true;
return $this->stream->getId();
}
/**
* Register the GridFS stream wrapper.
*
* @param string $protocol Protocol to use for stream_wrapper_register()
*/
public static function register()
public static function register($protocol = 'gridfs')
{
if (in_array('gridfs', stream_get_wrappers())) {
stream_wrapper_unregister('gridfs');
if (in_array($protocol, stream_get_wrappers())) {
stream_wrapper_unregister($protocol);
}
stream_wrapper_register('gridfs', get_called_class(), \STREAM_IS_URL);
stream_wrapper_register($protocol, get_called_class(), \STREAM_IS_URL);
}
/**
* Closes the stream.
*
* @see http://php.net/manual/en/streamwrapper.stream-close.php
*/
public function stream_close()
{
$this->gridFSStream->close();
$this->stream->close();
}
/**
* Returns whether the file pointer is at the end of the stream.
*
* @see http://php.net/manual/en/streamwrapper.stream-eof.php
* @return boolean
*/
public function stream_eof()
{
return $this->gridFSStream->isEOF();
return $this->stream->isEOF();
}
/**
* Opens the stream.
*
* @see http://php.net/manual/en/streamwrapper.stream-open.php
* @param string $path Path to the file resource
* @param string $mode Mode used to open the file (only "r" and "w" are supported)
* @param integer $options Additional flags set by the streams API
* @param string $openedPath Not used
*/
public function stream_open($path, $mode, $options, &$openedPath)
{
$this->initProtocol($path);
$context = stream_context_get_options($this->context);
$this->collectionWrapper = $context['gridfs']['collectionWrapper'];
$this->mode = $mode;
switch ($this->mode) {
case 'r': return $this->openReadStream();
case 'w': return $this->openWriteStream();
default: return false;
if ($mode === 'r') {
return $this->initReadableStream();
}
if ($mode === 'w') {
return $this->initWritableStream();
}
return false;
}
/**
* Read bytes from the stream.
*
* Note: this method may return a string smaller than the requested length
* if data is not available to be read.
*
* @see http://php.net/manual/en/streamwrapper.stream-read.php
* @param integer $count Number of bytes to read
* @return string
*/
public function stream_read($count)
{
return $this->gridFSStream->downloadNumBytes($count);
// TODO: Ensure that $this->stream is a ReadableStream
return $this->stream->downloadNumBytes($count);
}
/**
* Return information about the stream.
*
* @see http://php.net/manual/en/streamwrapper.stream-stat.php
* @return array
*/
public function stream_stat()
{
$stat = $this->getStatTemplate();
$stat[7] = $stat['size'] = $this->gridFSStream->getSize();
$stat[2] = $stat['mode'] = $this->mode;
$stat[7] = $stat['size'] = $this->stream->getSize();
return $stat;
}
/**
* Write bytes to the stream.
*
* @see http://php.net/manual/en/streamwrapper.stream-write.php
* @param string $data Data to write
* @return integer The number of bytes successfully stored
*/
public function stream_write($data)
{
$this->gridFSStream->insertChunks($data);
// TODO: Ensure that $this->stream is a WritableStream
$this->stream->insertChunks($data);
return strlen($data);
}
/**
* Gets a URL stat template with default values
* from https://github.com/aws/aws-sdk-php/blob/master/src/S3/StreamWrapper.php
* Returns a stat template with default values.
*
* @return array
*/
private function getStatTemplate()
......@@ -122,9 +156,52 @@ class StreamWrapper
];
}
/**
* Initialize the protocol from the given path.
*
* @see StreamWrapper::stream_open()
* @param string $path
*/
private function initProtocol($path)
{
$parsed_path = parse_url($path);
$this->identifier = substr($parsed_path['path'], 1);
$parts = explode('://', $path, 2);
$this->protocol = $parts[0] ?: 'gridfs';
}
/**
* Initialize the internal stream for reading.
*
* @see StreamWrapper::stream_open()
* @return boolean
*/
private function initReadableStream()
{
    // Constructor arguments are read from the context options stored under
    // this wrapper's protocol name.
    $contextOptions = stream_context_get_options($this->context);
    $protocol = $this->protocol;

    $collectionWrapper = $contextOptions[$protocol]['collectionWrapper'];
    $file = $contextOptions[$protocol]['file'];

    $this->stream = new ReadableStream($collectionWrapper, $file);

    return true;
}
/**
* Initialize the internal stream for writing.
*
* @see StreamWrapper::stream_open()
* @return boolean
*/
private function initWritableStream()
{
    // Constructor arguments are read from the context options stored under
    // this wrapper's protocol name.
    $contextOptions = stream_context_get_options($this->context);
    $protocol = $this->protocol;

    $collectionWrapper = $contextOptions[$protocol]['collectionWrapper'];
    $filename = $contextOptions[$protocol]['filename'];
    $uploadOptions = $contextOptions[$protocol]['options'];

    $this->stream = new WritableStream($collectionWrapper, $filename, $uploadOptions);

    return true;
}
}
......@@ -5,16 +5,18 @@ namespace MongoDB\GridFS;
use MongoDB\BSON\Binary;
use MongoDB\BSON\ObjectId;
use MongoDB\BSON\UTCDateTime;
use MongoDB\Driver\Exception\Exception;
use MongoDB\Driver\Exception\Exception as DriverException;
use MongoDB\Exception\InvalidArgumentException;
/**
* GridFSUpload abstracts the process of writing a GridFS file.
* WritableStream abstracts the process of writing a GridFS file.
*
* @internal
*/
class GridFSUpload
class WritableStream
{
private static $defaultChunkSizeBytes = 261120;
private $buffer;
private $bufferLength = 0;
private $chunkOffset = 0;
......@@ -22,15 +24,16 @@ class GridFSUpload
private $collectionWrapper;
private $ctx;
private $file;
private $indexChecker;
private $isClosed = false;
private $length = 0;
/**
* Constructs a GridFS upload stream.
* Constructs a writable GridFS stream.
*
* Supported options:
*
* * _id (mixed): File document identifier. Defaults to a new ObjectId.
*
* * aliases (array of strings): DEPRECATED An array of aliases.
* Applications wishing to store aliases should add an aliases field to
* the metadata document instead.
......@@ -51,12 +54,19 @@ class GridFSUpload
*/
public function __construct(CollectionWrapper $collectionWrapper, $filename, array $options = [])
{
$options += ['chunkSizeBytes' => 261120];
$options += [
'_id' => new ObjectId,
'chunkSizeBytes' => self::$defaultChunkSizeBytes,
];
if (isset($options['aliases']) && ! \MongoDB\is_string_array($options['aliases'])) {
throw InvalidArgumentException::invalidType('"aliases" option', $options['aliases'], 'array of strings');
}
if (isset($options['chunkSizeBytes']) && ! is_integer($options['chunkSizeBytes'])) {
throw InvalidArgumentException::invalidType('"chunkSizeBytes" option', $options['chunkSizeBytes'], 'integer');
}
if (isset($options['contentType']) && ! is_string($options['contentType'])) {
throw InvalidArgumentException::invalidType('"contentType" option', $options['contentType'], 'string');
}
......@@ -71,10 +81,11 @@ class GridFSUpload
$this->ctx = hash_init('md5');
$this->file = [
'_id' => new ObjectId(),
'_id' => $options['_id'],
'chunkSize' => $this->chunkSize,
'filename' => (string) $filename,
'uploadDate' => $this->createUploadDate(),
// TODO: This is necessary until PHPC-536 is implemented
'uploadDate' => new UTCDateTime(floor(microtime(true) * 1000)),
] + array_intersect_key($options, ['aliases' => 1, 'contentType' => 1, 'metadata' => 1]);
}
......@@ -133,7 +144,7 @@ class GridFSUpload
* reset.
*
* @param string $toWrite Binary data to write
* @return int
* @return integer
*/
public function insertChunks($toWrite)
{
......@@ -171,6 +182,7 @@ class GridFSUpload
*
* @param resource $source Readable stream
* @return ObjectId
* @throws InvalidArgumentException
*/
public function uploadFromStream($source)
{
......@@ -178,8 +190,6 @@ class GridFSUpload
throw InvalidArgumentException::invalidType('$source', $source, 'resource');
}
$streamMetadata = stream_get_meta_data($source);
while ($data = $this->readChunk($source)) {
$this->insertChunk($data);
}
......@@ -189,20 +199,10 @@ class GridFSUpload
private function abort()
{
$this->collectionWrapper->getChunksCollection()->deleteMany(['files_id' => $this->file['_id']]);
$this->collectionWrapper->getFilesCollection()->deleteOne(['_id' => $this->file['_id']]);
$this->collectionWrapper->deleteChunksByFilesId($this->file['_id']);
$this->isClosed = true;
}
// From: http://stackoverflow.com/questions/3656713/how-to-get-current-time-in-milliseconds-in-php
private function createUploadDate()
{
$parts = explode(' ', microtime());
$milliseconds = sprintf('%d%03d', $parts[1], $parts[0] * 1000);
return new UTCDateTime($milliseconds);
}
private function fileCollectionInsert()
{
if ($this->isClosed) {
......@@ -244,7 +244,7 @@ class GridFSUpload
{
try {
$data = fread($source, $this->chunkSize);
} catch (Exception $e) {
} catch (DriverException $e) {
$this->abort();
throw $e;
}
......
......@@ -63,8 +63,8 @@ class BucketFunctionalTest extends FunctionalTestCase
$id = $this->bucket->uploadFromStream("test_filename", $this->generateStream("hello world"));
$contents = stream_get_contents($this->bucket->openDownloadStream($id));
$this->assertEquals("hello world", $contents);
$this->assertEquals(1, $this->bucket->getCollectionsWrapper()->getFilesCollection()->count());
$this->assertEquals(1, $this->bucket->getCollectionsWrapper()->getChunksCollection()->count());
$this->assertEquals(1, $this->bucket->getCollectionWrapper()->getFilesCollection()->count());
$this->assertEquals(1, $this->bucket->getCollectionWrapper()->getChunksCollection()->count());
$this->bucket->delete($id);
$error=null;
......@@ -75,17 +75,17 @@ class BucketFunctionalTest extends FunctionalTestCase
}
$fileNotFound = '\MongoDB\GridFS\Exception\FileNotFoundException';
$this->assertTrue($error instanceof $fileNotFound);
$this->assertEquals(0, $this->bucket->getCollectionsWrapper()->getFilesCollection()->count());
$this->assertEquals(0, $this->bucket->getCollectionsWrapper()->getChunksCollection()->count());
$this->assertEquals(0, $this->bucket->getCollectionWrapper()->getFilesCollection()->count());
$this->assertEquals(0, $this->bucket->getCollectionWrapper()->getChunksCollection()->count());
}
public function testMultiChunkDelete()
{
$id = $this->bucket->uploadFromStream("test_filename", $this->generateStream("hello"), ['chunkSizeBytes'=>1]);
$this->assertEquals(1, $this->bucket->getCollectionsWrapper()->getFilesCollection()->count());
$this->assertEquals(5, $this->bucket->getCollectionsWrapper()->getChunksCollection()->count());
$this->assertEquals(1, $this->bucket->getCollectionWrapper()->getFilesCollection()->count());
$this->assertEquals(5, $this->bucket->getCollectionWrapper()->getChunksCollection()->count());
$this->bucket->delete($id);
$this->assertEquals(0, $this->bucket->getCollectionsWrapper()->getFilesCollection()->count());
$this->assertEquals(0, $this->bucket->getCollectionsWrapper()->getChunksCollection()->count());
$this->assertEquals(0, $this->bucket->getCollectionWrapper()->getFilesCollection()->count());
$this->assertEquals(0, $this->bucket->getCollectionWrapper()->getChunksCollection()->count());
}
public function testEmptyFile()
......@@ -93,10 +93,10 @@ class BucketFunctionalTest extends FunctionalTestCase
$id = $this->bucket->uploadFromStream("test_filename",$this->generateStream(""));
$contents = stream_get_contents($this->bucket->openDownloadStream($id));
$this->assertEquals("", $contents);
$this->assertEquals(1, $this->bucket->getCollectionsWrapper()->getFilesCollection()->count());
$this->assertEquals(0, $this->bucket->getCollectionsWrapper()->getChunksCollection()->count());
$this->assertEquals(1, $this->bucket->getCollectionWrapper()->getFilesCollection()->count());
$this->assertEquals(0, $this->bucket->getCollectionWrapper()->getChunksCollection()->count());
$raw = $this->bucket->getCollectionsWrapper()->getFilesCollection()->findOne();
$raw = $this->bucket->getCollectionWrapper()->getFilesCollection()->findOne();
$this->assertEquals(0, $raw->length);
$this->assertEquals($id, $raw->_id);
$this->assertTrue($raw->uploadDate instanceof \MongoDB\BSON\UTCDateTime);
......@@ -107,7 +107,7 @@ class BucketFunctionalTest extends FunctionalTestCase
{
$id = $this->bucket->uploadFromStream("test_filename", $this->generateStream("foobar"));
$this->collectionsWrapper->getChunksCollection()->updateOne(['files_id' => $id],
$this->collectionWrapper->getChunksCollection()->updateOne(['files_id' => $id],
['$set' => ['data' => new \MongoDB\BSON\Binary('foo', \MongoDB\BSON\Binary::TYPE_GENERIC)]]);
$error = null;
try{
......@@ -123,7 +123,7 @@ class BucketFunctionalTest extends FunctionalTestCase
{
$id = $this->bucket->uploadFromStream("test_filename", $this->generateStream("hello world,abcdefghijklmnopqrstuv123456789"), ["chunkSizeBytes" => 1]);
$this->collectionsWrapper->getChunksCollection()->deleteOne(['files_id' => $id, 'n' => 7]);
$this->collectionWrapper->getChunksCollection()->deleteOne(['files_id' => $id, 'n' => 7]);
$error = null;
try{
$download = $this->bucket->openDownloadStream($id);
......@@ -136,8 +136,8 @@ class BucketFunctionalTest extends FunctionalTestCase
}
public function testUploadEnsureIndexes()
{
$chunks = $this->bucket->getCollectionsWrapper()->getChunksCollection();
$files = $this->bucket->getCollectionsWrapper()->getFilesCollection();
$chunks = $this->bucket->getCollectionWrapper()->getChunksCollection();
$files = $this->bucket->getCollectionWrapper()->getFilesCollection();
$this->bucket->uploadFromStream("filename", $this->generateStream("junk"));
$chunksIndexed = false;
......@@ -238,7 +238,7 @@ class BucketFunctionalTest extends FunctionalTestCase
public function testGridInNonIntChunksize()
{
// Uploads "data" under the filename "f", overwrites chunkSize on the matching
// files document with the float 100.00, and asserts that the download stream
// still yields "data".
//
// NOTE(review): the updateOne() call below is opened twice (once through
// getCollectionsWrapper(), once through getCollectionWrapper()) but only one
// argument list is closed, so the statement does not parse as written.
// Presumably the first opener is the pre-rename line of a diff -- confirm
// and keep only one of the two.
$id = $this->bucket->uploadFromStream("f",$this->generateStream("data"));
$this->bucket->getCollectionsWrapper()->getFilesCollection()->updateOne(["filename"=>"f"],
$this->bucket->getCollectionWrapper()->getFilesCollection()->updateOne(["filename"=>"f"],
['$set'=> ['chunkSize' => 100.00]]);
$this->assertEquals("data", stream_get_contents($this->bucket->openDownloadStream($id)));
}
......@@ -288,7 +288,7 @@ class BucketFunctionalTest extends FunctionalTestCase
$id = $this->bucket->uploadFromStream("test_filename", $this->generateStream("hello world"));
$this->bucket->drop();
$id = $this->bucket->uploadFromStream("test_filename", $this->generateStream("hello world"));
$this->assertEquals(1, $this->collectionsWrapper->getFilesCollection()->count());
$this->assertEquals(1, $this->collectionWrapper->getFilesCollection()->count());
}
/**
*@dataProvider provideInsertChunks
......
......@@ -12,7 +12,7 @@ use MongoDB\Tests\FunctionalTestCase as BaseFunctionalTestCase;
abstract class FunctionalTestCase extends BaseFunctionalTestCase
{
protected $bucket;
protected $collectionsWrapper;
protected $collectionWrapper;
public function setUp()
{
......@@ -22,7 +22,7 @@ abstract class FunctionalTestCase extends BaseFunctionalTestCase
$col->drop();
}
$this->bucket = new \MongoDB\GridFS\Bucket($this->manager, $this->getDatabaseName());
$this->collectionsWrapper = $this->bucket->getCollectionsWrapper();
$this->collectionWrapper = $this->bucket->getCollectionWrapper();
}
public function tearDown()
......
......@@ -12,17 +12,17 @@ class GridFSStreamTest extends FunctionalTestCase
public function testBasic()
{
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test");
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test");
$upload->insertChunks("hello world");
$id = $upload->getId();
$upload->close();
$this->assertEquals(1, $this->collectionsWrapper->getFilesCollection()->count());
$this->assertEquals(1, $this->collectionsWrapper->getChunksCollection()->count());
$this->assertEquals(1, $this->collectionWrapper->getFilesCollection()->count());
$this->assertEquals(1, $this->collectionWrapper->getChunksCollection()->count());
$file = $this->collectionsWrapper->getFilesCollection()->findOne(["_id"=>$id], ['typeMap' => ['root' => 'stdClass']]);
$file = $this->collectionWrapper->findFileById($id);
$download = new \MongoDB\GridFS\GridFSDownload($this->collectionsWrapper, $file);
$download = new \MongoDB\GridFS\ReadableStream($this->collectionWrapper, $file);
$stream = fopen('php://temp', 'w+');
$download->downloadToStream($stream);
rewind($stream);
......@@ -31,7 +31,7 @@ class GridFSStreamTest extends FunctionalTestCase
fclose($stream);
#make sure it's still there!
$download = new \MongoDB\GridFS\GridFSDownload($this->collectionsWrapper, $file);
$download = new \MongoDB\GridFS\ReadableStream($this->collectionWrapper, $file);
$stream = fopen('php://temp', 'w+');
$download->downloadToStream($stream);
rewind($stream);
......@@ -39,15 +39,15 @@ class GridFSStreamTest extends FunctionalTestCase
$this->assertEquals("hello world", $contents);
fclose($stream);
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test");
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test");
$id = $upload->getId();
$upload->close();
$this->assertEquals(2, $this->collectionsWrapper->getFilesCollection()->count());
$this->assertEquals(1, $this->collectionsWrapper->getChunksCollection()->count());
$this->assertEquals(2, $this->collectionWrapper->getFilesCollection()->count());
$this->assertEquals(1, $this->collectionWrapper->getChunksCollection()->count());
$file = $this->collectionsWrapper->getFilesCollection()->findOne(["_id"=>$id], ['typeMap' => ['root' => 'stdClass']]);
$download = new \MongoDB\GridFS\GridFSDownload($this->collectionsWrapper, $file);
$file = $this->collectionWrapper->findFileById($id);
$download = new \MongoDB\GridFS\ReadableStream($this->collectionWrapper, $file);
$stream = fopen('php://temp', 'w+');
$download->downloadToStream($stream);
rewind($stream);
......@@ -58,17 +58,17 @@ class GridFSStreamTest extends FunctionalTestCase
public function testMd5()
{
// Writes "hello world\n" through insertChunks(), closes the upload, looks the
// files document up by id and compares its md5 property with a fixed hex
// digest (expected to be the MD5 of the uploaded bytes -- TODO confirm).
//
// NOTE(review): $upload and $file are each assigned twice in a row
// (GridFSUpload then WritableStream; findOne() then findFileById()). The
// second assignment overwrites the first, so only the second value is used
// by the lines that follow. The first of each pair looks like the pre-rename
// side of a diff -- confirm and drop it.
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test");
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test");
$upload->insertChunks("hello world\n");
// The id is read before close() and used afterwards to find the files document.
$id = $upload->getId();
$upload->close();
$file = $this->collectionsWrapper->getFilesCollection()->findOne(["_id"=>$id]);
$file = $this->collectionWrapper->findFileById($id);
$this->assertEquals("6f5902ac237024bdd0c176cb93063dc4", $file->md5);
}
public function testUploadDefaultOpts()
{
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test");
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test");
$this->assertTrue($upload->getId() instanceof \MongoDB\BSON\ObjectId);
$this->assertTrue($upload->getFile()["uploadDate"] instanceof \MongoDB\BSON\UTCDateTime);
......@@ -89,7 +89,7 @@ class GridFSStreamTest extends FunctionalTestCase
"aliases" => ["foo", "bar"],
"metadata" => ["foo" => 1, "bar" => 2]
];
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test", $options);
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test", $options);
$this->assertEquals($upload->getChunkSize(), 1);
$this->assertEquals($upload->getFile()["contentType"], "text/html");
$this->assertEquals($upload->getFile()["aliases"], ["foo", "bar"]);
......@@ -97,11 +97,11 @@ class GridFSStreamTest extends FunctionalTestCase
}
public function testDownloadDefaultOpts()
{
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test");
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test");
$upload->close();
$file = $this->collectionsWrapper->getFilesCollection()->findOne(["_id" => $upload->getId()], ['typeMap' => ['root' => 'stdClass']]);
$download = new \MongoDB\GridFS\GridFSDownload($this->collectionsWrapper, $file);
$file = $this->collectionWrapper->findFileById($upload->getId());
$download = new \MongoDB\GridFS\ReadableStream($this->collectionWrapper, $file);
$download->close();
$this->assertEquals($upload->getId(), $download->getId());
......@@ -120,12 +120,12 @@ class GridFSStreamTest extends FunctionalTestCase
"aliases" => ["foo", "bar"],
"metadata" => ["foo" => 1, "bar" => 2]
];
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test", $options);
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test", $options);
$upload->insertChunks("hello world");
$upload->close();
$file = $this->collectionsWrapper->getFilesCollection()->findOne(["_id" => $upload->getId()], ['typeMap' => ['root' => 'stdClass']]);
$download = new \MongoDB\GridFS\GridFSDownload($this->collectionsWrapper, $file);
$file = $this->collectionWrapper->findFileById($upload->getId());
$download = new \MongoDB\GridFS\ReadableStream($this->collectionWrapper, $file);
$this->assertEquals("test", $download->getFile()->filename);
$this->assertEquals($upload->getId(), $download->getId());
......@@ -141,7 +141,7 @@ class GridFSStreamTest extends FunctionalTestCase
*/
public function testInsertChunks($data)
{
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test");
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test");
$upload->insertChunks($data);
$upload->close();
$stream = $this->bucket->openDownloadStream($upload->getId());
......@@ -154,12 +154,12 @@ class GridFSStreamTest extends FunctionalTestCase
for($i=0; $i<255*1024+1000; $i++){
$toUpload .= "a";
}
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test");
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test");
$upload->insertChunks($toUpload);
$upload->close();
$this->assertEquals(1, $this->collectionsWrapper->getFilesCollection()->count());
$this->assertEquals(2, $this->collectionsWrapper->getChunksCollection()->count());
$this->assertEquals(1, $this->collectionWrapper->getFilesCollection()->count());
$this->assertEquals(2, $this->collectionWrapper->getChunksCollection()->count());
$download = $this->bucket->openDownloadStream($upload->getId());
$this->assertEquals($toUpload, stream_get_contents($download));
......@@ -170,23 +170,23 @@ class GridFSStreamTest extends FunctionalTestCase
public function testSmallChunks($data)
{
// Uploads $data with chunkSizeBytes set to 1, then asserts that the chunks
// collection holds strlen($data) documents, that the files collection holds
// exactly one, and that downloading by id returns $data unchanged.
//
// $data is passed to strlen(), so it is expected to be a string; presumably
// it comes from a data provider declared above this view -- verify.
//
// NOTE(review): the upload object is constructed twice (GridFSUpload, then
// WritableStream) and both count assertions appear once per wrapper property
// ($this->collectionsWrapper and $this->collectionWrapper). That looks like
// both sides of a rename diff shown together -- confirm which one is current.
$options = ["chunkSizeBytes"=>1];
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test", $options);
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test", $options);
$upload->insertChunks($data);
$upload->close();
$this->assertEquals(strlen($data), $this->collectionsWrapper->getChunksCollection()->count());
$this->assertEquals(1, $this->collectionsWrapper->getFilesCollection()->count());
$this->assertEquals(strlen($data), $this->collectionWrapper->getChunksCollection()->count());
$this->assertEquals(1, $this->collectionWrapper->getFilesCollection()->count());
// Round-trip check through the public Bucket API rather than the stream class.
$stream = $this->bucket->openDownloadStream($upload->getId());
$this->assertEquals($data, stream_get_contents($stream));
}
public function testMultipleReads()
{
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test", ["chunkSizeBytes"=>3]);
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test", ["chunkSizeBytes"=>3]);
$upload->insertChunks("hello world");
$upload->close();
$file = $this->collectionsWrapper->getFilesCollection()->findOne(["_id"=>$upload->getId()], ['typeMap' => ['root' => 'stdClass']]);
$download = new \MongoDB\GridFS\GridFSDownload($this->collectionsWrapper, $file);
$file = $this->collectionWrapper->findFileById($upload->getId());
$download = new \MongoDB\GridFS\ReadableStream($this->collectionWrapper, $file);
$this->assertEquals("he", $download->downloadNumBytes(2));
$this->assertEquals("ll", $download->downloadNumBytes(2));
$this->assertEquals("o ", $download->downloadNumBytes(2));
......@@ -202,11 +202,11 @@ class GridFSStreamTest extends FunctionalTestCase
*/
public function testProvidedMultipleReads($data)
{
$upload = new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper, "test", ["chunkSizeBytes"=>rand(1, 5)]);
$upload = new \MongoDB\GridFS\WritableStream($this->collectionWrapper, "test", ["chunkSizeBytes"=>rand(1, 5)]);
$upload->insertChunks($data);
$upload->close();
$file = $this->collectionsWrapper->getFilesCollection()->findOne(["_id"=>$upload->getId()], ['typeMap' => ['root' => 'stdClass']]);
$download = new \MongoDB\GridFS\GridFSDownload($this->collectionsWrapper, $file);
$file = $this->collectionWrapper->findFileById($upload->getId());
$download = new \MongoDB\GridFS\ReadableStream($this->collectionWrapper, $file);
$readPos = 0;
while($readPos < strlen($data)){
......@@ -227,7 +227,7 @@ class GridFSStreamTest extends FunctionalTestCase
*/
public function testUploadConstructorOptionTypeChecks(array $options)
{
// Constructs an upload stream with the supplied $options and discards the
// result; the body itself makes no assertion. Presumably the expected
// exception and the data provider are declared in the docblock above this
// view -- TODO confirm.
//
// NOTE(review): two constructions are listed (GridFSUpload with
// $this->collectionsWrapper, WritableStream with $this->collectionWrapper).
// If the first one throws, the second is never reached. This looks like both
// sides of a rename diff shown together -- confirm which one is current.
new \MongoDB\GridFS\GridFSUpload($this->collectionsWrapper,"test", $options);
new \MongoDB\GridFS\WritableStream($this->collectionWrapper,"test", $options);
}
public function provideInvalidUploadConstructorOptions()
......
......@@ -78,9 +78,9 @@ class SpecificationTests extends FunctionalTestCase
$fixedAssertTrue = $this->fixTypes($test['assert'], true);
if (isset($test['assert']['data'])) {
$this->runCommands($fixedAssertTrue['data'], $result);
$this->collectionsEqual($this->collections['expected.files'],$this->bucket->getCollectionsWrapper()->getFilesCollection());
$this->collectionsEqual($this->collections['expected.files'],$this->bucket->getCollectionWrapper()->getFilesCollection());
if(isset($this->collections['expected.chunks'])) {
$this->collectionsEqual($this->collections['expected.chunks'],$this->bucket->getCollectionsWrapper()->getChunksCollection());
$this->collectionsEqual($this->collections['expected.chunks'],$this->bucket->getCollectionWrapper()->getChunksCollection());
}
}
}
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment