Commit b3a8b00d authored by Jeremy Mikola

PHPLIB-247: Use strings instead of memory stream for GridFS download buffering

parent dccf20aa
...@@ -22,7 +22,6 @@ use MongoDB\UpdateResult; ...@@ -22,7 +22,6 @@ use MongoDB\UpdateResult;
use MongoDB\Driver\Cursor; use MongoDB\Driver\Cursor;
use MongoDB\Driver\Manager; use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference; use MongoDB\Driver\ReadPreference;
use IteratorIterator;
use stdClass; use stdClass;
/** /**
...@@ -87,6 +86,27 @@ class CollectionWrapper ...@@ -87,6 +86,27 @@ class CollectionWrapper
$this->chunksCollection->drop(['typeMap' => []]); $this->chunksCollection->drop(['typeMap' => []]);
} }
/**
 * Queries the chunks collection for the chunk documents of one file,
 * beginning at a given chunk index.
 *
 * Results are sorted by ascending chunk index ("n") and each document is
 * returned as a stdClass.
 *
 * @param mixed   $id        File ID
 * @param integer $fromChunk Starting chunk (inclusive)
 * @return Cursor
 */
public function findChunksByFileId($id, $fromChunk = 0)
{
    // Match only this file's chunks at or after the requested index.
    $filter = [
        'files_id' => $id,
        'n' => ['$gte' => $fromChunk],
    ];

    // Ascending "n" so that callers receive chunks in file order.
    $options = [
        'sort' => ['n' => 1],
        'typeMap' => ['root' => 'stdClass'],
    ];

    return $this->chunksCollection->find($filter, $options);
}
/** /**
* Finds a GridFS file document for a given filename and revision. * Finds a GridFS file document for a given filename and revision.
* *
...@@ -177,25 +197,6 @@ class CollectionWrapper ...@@ -177,25 +197,6 @@ class CollectionWrapper
return $this->bucketName; return $this->bucketName;
} }
/**
 * Builds an iterator over the chunk documents belonging to a file.
 *
 * Chunks are fetched sorted by ascending index ("n") and each document is
 * returned as a stdClass.
 *
 * @param mixed $id
 * @return IteratorIterator
 */
public function getChunksIteratorByFilesId($id)
{
    $options = [
        'sort' => ['n' => 1],
        'typeMap' => ['root' => 'stdClass'],
    ];

    // Wrap the find() result so callers can drive it through the Iterator API.
    return new IteratorIterator(
        $this->chunksCollection->find(['files_id' => $id], $options)
    );
}
/** /**
* Return the database name. * Return the database name.
* *
......
...@@ -19,6 +19,7 @@ namespace MongoDB\GridFS; ...@@ -19,6 +19,7 @@ namespace MongoDB\GridFS;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
use MongoDB\GridFS\Exception\CorruptFileException; use MongoDB\GridFS\Exception\CorruptFileException;
use IteratorIterator;
use stdClass; use stdClass;
/** /**
...@@ -29,18 +30,15 @@ use stdClass; ...@@ -29,18 +30,15 @@ use stdClass;
class ReadableStream class ReadableStream
{ {
private $buffer; private $buffer;
private $bufferEmpty; private $bufferOffset = 0;
private $bufferFresh;
private $bytesSeen = 0;
private $chunkSize; private $chunkSize;
private $chunkOffset = 0; private $chunkOffset = 0;
private $chunksIterator; private $chunksIterator;
private $collectionWrapper; private $collectionWrapper;
private $expectedLastChunkSize = 0;
private $file; private $file;
private $firstCheck = true;
private $iteratorEmpty = false;
private $length; private $length;
private $numChunks; private $numChunks = 0;
/** /**
* Constructs a readable GridFS stream. * Constructs a readable GridFS stream.
...@@ -64,13 +62,15 @@ class ReadableStream ...@@ -64,13 +62,15 @@ class ReadableStream
} }
$this->file = $file; $this->file = $file;
$this->chunkSize = $file->chunkSize; $this->chunkSize = (integer) $file->chunkSize;
$this->length = $file->length; $this->length = (integer) $file->length;
$this->chunksIterator = $collectionWrapper->getChunksIteratorByFilesId($file->_id);
$this->collectionWrapper = $collectionWrapper; $this->collectionWrapper = $collectionWrapper;
$this->numChunks = ceil($this->length / $this->chunkSize);
$this->initEmptyBuffer(); if ($this->length > 0) {
$this->numChunks = (integer) ceil($this->length / $this->chunkSize);
$this->expectedLastChunkSize = ($this->length - (($this->numChunks - 1) * $this->chunkSize));
}
} }
/** /**
...@@ -90,56 +90,7 @@ class ReadableStream ...@@ -90,56 +90,7 @@ class ReadableStream
public function close() public function close()
{ {
fclose($this->buffer); // Nothing to do
}
/**
* Read bytes from the stream.
*
* Note: this method may return a string smaller than the requested length
* if data is not available to be read.
*
* @param integer $numBytes Number of bytes to read
* @return string
* @throws InvalidArgumentException if $numBytes is negative
*/
public function downloadNumBytes($numBytes)
{
if ($numBytes < 0) {
throw new InvalidArgumentException(sprintf('$numBytes must be >= zero; given: %d', $numBytes));
}
if ($numBytes == 0) {
return '';
}
if ($this->bufferFresh) {
rewind($this->buffer);
$this->bufferFresh = false;
}
// TODO: Should we be checking for fread errors here?
$output = fread($this->buffer, $numBytes);
if (strlen($output) == $numBytes) {
return $output;
}
$this->initEmptyBuffer();
$bytesLeft = $numBytes - strlen($output);
while (strlen($output) < $numBytes && $this->advanceChunks()) {
$bytesLeft = $numBytes - strlen($output);
$output .= substr($this->chunksIterator->current()->data->getData(), 0, $bytesLeft);
}
if ( ! $this->iteratorEmpty && $this->length > 0 && $bytesLeft < strlen($this->chunksIterator->current()->data->getData())) {
fwrite($this->buffer, substr($this->chunksIterator->current()->data->getData(), $bytesLeft));
$this->bufferEmpty = false;
}
return $output;
} }
/** /**
...@@ -162,58 +113,123 @@ class ReadableStream ...@@ -162,58 +113,123 @@ class ReadableStream
return $this->length; return $this->length;
} }
/**
* Return whether the current read position is at the end of the stream.
*
* @return boolean
*/
public function isEOF() public function isEOF()
{ {
return ($this->iteratorEmpty && $this->bufferEmpty); if ($this->chunkOffset === $this->numChunks - 1) {
return $this->bufferOffset >= $this->expectedLastChunkSize;
}
return $this->chunkOffset >= $this->numChunks;
} }
private function advanceChunks() /**
* Read bytes from the stream.
*
* Note: this method may return a string smaller than the requested length
* if data is not available to be read.
*
* @param integer $length Number of bytes to read
* @return string
* @throws InvalidArgumentException if $length is negative
*/
public function readBytes($length)
{ {
if ($this->chunkOffset >= $this->numChunks) { if ($length < 0) {
$this->iteratorEmpty = true; throw new InvalidArgumentException(sprintf('$length must be >= 0; given: %d', $length));
}
return false; if ($this->chunksIterator === null) {
$this->initChunksIterator();
}
if ($this->buffer === null && ! $this->initBufferFromCurrentChunk()) {
return '';
} }
if ($this->firstCheck) { $data = '';
$this->chunksIterator->rewind();
$this->firstCheck = false; while (strlen($data) < $length) {
} else { if ($this->bufferOffset >= strlen($this->buffer) && ! $this->initBufferFromNextChunk()) {
$this->chunksIterator->next(); break;
}
$initialDataLength = strlen($data);
$data .= substr($this->buffer, $this->bufferOffset, $length - $initialDataLength);
$this->bufferOffset += strlen($data) - $initialDataLength;
}
return $data;
}
/**
* Initialize the buffer to the current chunk's data.
*
* @return boolean Whether there was a current chunk to read
* @throws CorruptFileException if an expected chunk could not be read successfully
*/
private function initBufferFromCurrentChunk()
{
if ($this->chunkOffset === 0 && $this->numChunks === 0) {
return false;
} }
if ( ! $this->chunksIterator->valid()) { if ( ! $this->chunksIterator->valid()) {
throw CorruptFileException::missingChunk($this->chunkOffset); throw CorruptFileException::missingChunk($this->chunkOffset);
} }
if ($this->chunksIterator->current()->n != $this->chunkOffset) { $currentChunk = $this->chunksIterator->current();
throw CorruptFileException::unexpectedIndex($this->chunksIterator->current()->n, $this->chunkOffset);
if ($currentChunk->n !== $this->chunkOffset) {
throw CorruptFileException::unexpectedIndex($currentChunk->n, $this->chunkOffset);
} }
$actualChunkSize = strlen($this->chunksIterator->current()->data->getData()); $this->buffer = $currentChunk->data->getData();
$expectedChunkSize = ($this->chunkOffset == $this->numChunks - 1) $actualChunkSize = strlen($this->buffer);
? ($this->length - $this->bytesSeen)
$expectedChunkSize = ($this->chunkOffset === $this->numChunks - 1)
? $this->expectedLastChunkSize
: $this->chunkSize; : $this->chunkSize;
if ($actualChunkSize != $expectedChunkSize) { if ($actualChunkSize !== $expectedChunkSize) {
throw CorruptFileException::unexpectedSize($actualChunkSize, $expectedChunkSize); throw CorruptFileException::unexpectedSize($actualChunkSize, $expectedChunkSize);
} }
$this->bytesSeen += $actualChunkSize;
$this->chunkOffset++;
return true; return true;
} }
private function initEmptyBuffer() /**
* Advance to the next chunk and initialize the buffer to its data.
*
* @return boolean Whether there was a next chunk to read
* @throws CorruptFileException if an expected chunk could not be read successfully
*/
private function initBufferFromNextChunk()
{ {
if (isset($this->buffer)) { if ($this->chunkOffset === $this->numChunks - 1) {
fclose($this->buffer); return false;
} }
$this->buffer = fopen("php://memory", "w+b"); $this->bufferOffset = 0;
$this->bufferEmpty = true; $this->chunkOffset++;
$this->bufferFresh = true; $this->chunksIterator->next();
return $this->initBufferFromCurrentChunk();
}
/**
* Initializes the chunk iterator starting from the current offset.
*/
private function initChunksIterator()
{
$cursor = $this->collectionWrapper->findChunksByFileId($this->file->_id, $this->chunkOffset);
$this->chunksIterator = new IteratorIterator($cursor);
$this->chunksIterator->rewind();
} }
} }
...@@ -119,17 +119,17 @@ class StreamWrapper ...@@ -119,17 +119,17 @@ class StreamWrapper
* if data is not available to be read. * if data is not available to be read.
* *
* @see http://php.net/manual/en/streamwrapper.stream-read.php * @see http://php.net/manual/en/streamwrapper.stream-read.php
* @param integer $count Number of bytes to read * @param integer $length Number of bytes to read
* @return string * @return string
*/ */
public function stream_read($count) public function stream_read($length)
{ {
if ( ! $this->stream instanceof ReadableStream) { if ( ! $this->stream instanceof ReadableStream) {
return ''; return '';
} }
try { try {
return $this->stream->downloadNumBytes($count); return $this->stream->readBytes($length);
} catch (Exception $e) { } catch (Exception $e) {
trigger_error(sprintf('%s: %s', get_class($e), $e->getMessage()), \E_USER_WARNING); trigger_error(sprintf('%s: %s', get_class($e), $e->getMessage()), \E_USER_WARNING);
return false; return false;
......
...@@ -74,12 +74,12 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase ...@@ -74,12 +74,12 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase
/** /**
* @dataProvider provideFileIdAndExpectedBytes * @dataProvider provideFileIdAndExpectedBytes
*/ */
public function testDownloadNumBytes($fileId, $numBytes, $expectedBytes) public function testReadBytes($fileId, $length, $expectedBytes)
{ {
$fileDocument = $this->collectionWrapper->findFileById($fileId); $fileDocument = $this->collectionWrapper->findFileById($fileId);
$stream = new ReadableStream($this->collectionWrapper, $fileDocument); $stream = new ReadableStream($this->collectionWrapper, $fileDocument);
$this->assertSame($expectedBytes, $stream->downloadNumBytes($numBytes)); $this->assertSame($expectedBytes, $stream->readBytes($length));
} }
public function provideFileIdAndExpectedBytes() public function provideFileIdAndExpectedBytes()
...@@ -111,14 +111,14 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase ...@@ -111,14 +111,14 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase
/** /**
* @dataProvider provideFileIdAndExpectedBytes * @dataProvider provideFileIdAndExpectedBytes
*/ */
public function testDownloadNumBytesCalledMultipleTimes($fileId, $numBytes, $expectedBytes) public function testReadBytesCalledMultipleTimes($fileId, $length, $expectedBytes)
{ {
$fileDocument = $this->collectionWrapper->findFileById($fileId); $fileDocument = $this->collectionWrapper->findFileById($fileId);
$stream = new ReadableStream($this->collectionWrapper, $fileDocument); $stream = new ReadableStream($this->collectionWrapper, $fileDocument);
for ($i = 0; $i < $numBytes; $i++) { for ($i = 0; $i < $length; $i++) {
$expectedByte = isset($expectedBytes[$i]) ? $expectedBytes[$i] : ''; $expectedByte = isset($expectedBytes[$i]) ? $expectedBytes[$i] : '';
$this->assertSame($expectedByte, $stream->downloadNumBytes(1)); $this->assertSame($expectedByte, $stream->readBytes(1));
} }
} }
...@@ -126,35 +126,35 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase ...@@ -126,35 +126,35 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase
* @expectedException MongoDB\GridFS\Exception\CorruptFileException * @expectedException MongoDB\GridFS\Exception\CorruptFileException
* @expectedExceptionMessage Chunk not found for index "2" * @expectedExceptionMessage Chunk not found for index "2"
*/ */
public function testDownloadNumBytesWithMissingChunk() public function testReadBytesWithMissingChunk()
{ {
$this->chunksCollection->deleteOne(['files_id' => 'length-10', 'n' => 2]); $this->chunksCollection->deleteOne(['files_id' => 'length-10', 'n' => 2]);
$fileDocument = $this->collectionWrapper->findFileById('length-10'); $fileDocument = $this->collectionWrapper->findFileById('length-10');
$stream = new ReadableStream($this->collectionWrapper, $fileDocument); $stream = new ReadableStream($this->collectionWrapper, $fileDocument);
$stream->downloadNumBytes(10); $stream->readBytes(10);
} }
/** /**
* @expectedException MongoDB\GridFS\Exception\CorruptFileException * @expectedException MongoDB\GridFS\Exception\CorruptFileException
* @expectedExceptionMessage Expected chunk to have index "1" but found "2" * @expectedExceptionMessage Expected chunk to have index "1" but found "2"
*/ */
public function testDownloadNumBytesWithUnexpectedChunkIndex() public function testReadBytesWithUnexpectedChunkIndex()
{ {
$this->chunksCollection->deleteOne(['files_id' => 'length-10', 'n' => 1]); $this->chunksCollection->deleteOne(['files_id' => 'length-10', 'n' => 1]);
$fileDocument = $this->collectionWrapper->findFileById('length-10'); $fileDocument = $this->collectionWrapper->findFileById('length-10');
$stream = new ReadableStream($this->collectionWrapper, $fileDocument); $stream = new ReadableStream($this->collectionWrapper, $fileDocument);
$stream->downloadNumBytes(10); $stream->readBytes(10);
} }
/** /**
* @expectedException MongoDB\GridFS\Exception\CorruptFileException * @expectedException MongoDB\GridFS\Exception\CorruptFileException
* @expectedExceptionMessage Expected chunk to have size "2" but found "1" * @expectedExceptionMessage Expected chunk to have size "2" but found "1"
*/ */
public function testDownloadNumBytesWithUnexpectedChunkSize() public function testReadBytesWithUnexpectedChunkSize()
{ {
$this->chunksCollection->updateOne( $this->chunksCollection->updateOne(
['files_id' => 'length-10', 'n' => 2], ['files_id' => 'length-10', 'n' => 2],
...@@ -164,17 +164,17 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase ...@@ -164,17 +164,17 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase
$fileDocument = $this->collectionWrapper->findFileById('length-10'); $fileDocument = $this->collectionWrapper->findFileById('length-10');
$stream = new ReadableStream($this->collectionWrapper, $fileDocument); $stream = new ReadableStream($this->collectionWrapper, $fileDocument);
$stream->downloadNumBytes(10); $stream->readBytes(10);
} }
/** /**
* @expectedException MongoDB\Exception\InvalidArgumentException * @expectedException MongoDB\Exception\InvalidArgumentException
*/ */
public function testDownloadNumBytesWithNegativeReadSize() public function testReadBytesWithNegativeLength()
{ {
$fileDocument = $this->collectionWrapper->findFileById('length-0'); $fileDocument = $this->collectionWrapper->findFileById('length-0');
$stream = new ReadableStream($this->collectionWrapper, $fileDocument); $stream = new ReadableStream($this->collectionWrapper, $fileDocument);
$stream->downloadNumBytes(-1); $stream->readBytes(-1);
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment