Commit 0b1e0f3b authored by Jeremy Mikola's avatar Jeremy Mikola

Merge pull request #323

parents dccf20aa b3a8b00d
......@@ -22,7 +22,6 @@ use MongoDB\UpdateResult;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference;
use IteratorIterator;
use stdClass;
/**
......@@ -87,6 +86,27 @@ class CollectionWrapper
$this->chunksCollection->drop(['typeMap' => []]);
}
/**
* Finds GridFS chunk documents for a given file ID and optional offset.
*
* @param mixed $id File ID
* @param integer $fromChunk Starting chunk (inclusive)
* @return Cursor
*/
public function findChunksByFileId($id, $fromChunk = 0)
{
return $this->chunksCollection->find(
[
'files_id' => $id,
'n' => ['$gte' => $fromChunk],
],
[
'sort' => ['n' => 1],
'typeMap' => ['root' => 'stdClass'],
]
);
}
/**
* Finds a GridFS file document for a given filename and revision.
*
......@@ -177,25 +197,6 @@ class CollectionWrapper
return $this->bucketName;
}
/**
* Returns a chunks iterator for a given file ID.
*
* @param mixed $id
* @return IteratorIterator
*/
public function getChunksIteratorByFilesId($id)
{
$cursor = $this->chunksCollection->find(
['files_id' => $id],
[
'sort' => ['n' => 1],
'typeMap' => ['root' => 'stdClass'],
]
);
return new IteratorIterator($cursor);
}
/**
* Return the database name.
*
......
......@@ -19,6 +19,7 @@ namespace MongoDB\GridFS;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\GridFS\Exception\CorruptFileException;
use IteratorIterator;
use stdClass;
/**
......@@ -29,18 +30,15 @@ use stdClass;
class ReadableStream
{
private $buffer;
private $bufferEmpty;
private $bufferFresh;
private $bytesSeen = 0;
private $bufferOffset = 0;
private $chunkSize;
private $chunkOffset = 0;
private $chunksIterator;
private $collectionWrapper;
private $expectedLastChunkSize = 0;
private $file;
private $firstCheck = true;
private $iteratorEmpty = false;
private $length;
private $numChunks;
private $numChunks = 0;
/**
* Constructs a readable GridFS stream.
......@@ -64,13 +62,15 @@ class ReadableStream
}
$this->file = $file;
$this->chunkSize = $file->chunkSize;
$this->length = $file->length;
$this->chunkSize = (integer) $file->chunkSize;
$this->length = (integer) $file->length;
$this->chunksIterator = $collectionWrapper->getChunksIteratorByFilesId($file->_id);
$this->collectionWrapper = $collectionWrapper;
$this->numChunks = ceil($this->length / $this->chunkSize);
$this->initEmptyBuffer();
if ($this->length > 0) {
$this->numChunks = (integer) ceil($this->length / $this->chunkSize);
$this->expectedLastChunkSize = ($this->length - (($this->numChunks - 1) * $this->chunkSize));
}
}
/**
......@@ -90,56 +90,7 @@ class ReadableStream
public function close()
{
fclose($this->buffer);
}
/**
* Read bytes from the stream.
*
* Note: this method may return a string smaller than the requested length
* if data is not available to be read.
*
* @param integer $numBytes Number of bytes to read
* @return string
* @throws InvalidArgumentException if $numBytes is negative
*/
public function downloadNumBytes($numBytes)
{
if ($numBytes < 0) {
throw new InvalidArgumentException(sprintf('$numBytes must be >= zero; given: %d', $numBytes));
}
if ($numBytes == 0) {
return '';
}
if ($this->bufferFresh) {
rewind($this->buffer);
$this->bufferFresh = false;
}
// TODO: Should we be checking for fread errors here?
$output = fread($this->buffer, $numBytes);
if (strlen($output) == $numBytes) {
return $output;
}
$this->initEmptyBuffer();
$bytesLeft = $numBytes - strlen($output);
while (strlen($output) < $numBytes && $this->advanceChunks()) {
$bytesLeft = $numBytes - strlen($output);
$output .= substr($this->chunksIterator->current()->data->getData(), 0, $bytesLeft);
}
if ( ! $this->iteratorEmpty && $this->length > 0 && $bytesLeft < strlen($this->chunksIterator->current()->data->getData())) {
fwrite($this->buffer, substr($this->chunksIterator->current()->data->getData(), $bytesLeft));
$this->bufferEmpty = false;
}
return $output;
// Nothing to do
}
/**
......@@ -162,58 +113,123 @@ class ReadableStream
return $this->length;
}
/**
* Return whether the current read position is at the end of the stream.
*
* @return boolean
*/
public function isEOF()
{
return ($this->iteratorEmpty && $this->bufferEmpty);
if ($this->chunkOffset === $this->numChunks - 1) {
return $this->bufferOffset >= $this->expectedLastChunkSize;
}
return $this->chunkOffset >= $this->numChunks;
}
private function advanceChunks()
/**
* Read bytes from the stream.
*
* Note: this method may return a string smaller than the requested length
* if data is not available to be read.
*
* @param integer $length Number of bytes to read
* @return string
* @throws InvalidArgumentException if $length is negative
*/
public function readBytes($length)
{
if ($this->chunkOffset >= $this->numChunks) {
$this->iteratorEmpty = true;
if ($length < 0) {
throw new InvalidArgumentException(sprintf('$length must be >= 0; given: %d', $length));
}
return false;
if ($this->chunksIterator === null) {
$this->initChunksIterator();
}
if ($this->buffer === null && ! $this->initBufferFromCurrentChunk()) {
return '';
}
if ($this->firstCheck) {
$this->chunksIterator->rewind();
$this->firstCheck = false;
} else {
$this->chunksIterator->next();
$data = '';
while (strlen($data) < $length) {
if ($this->bufferOffset >= strlen($this->buffer) && ! $this->initBufferFromNextChunk()) {
break;
}
$initialDataLength = strlen($data);
$data .= substr($this->buffer, $this->bufferOffset, $length - $initialDataLength);
$this->bufferOffset += strlen($data) - $initialDataLength;
}
return $data;
}
/**
* Initialize the buffer to the current chunk's data.
*
* @return boolean Whether there was a current chunk to read
* @throws CorruptFileException if an expected chunk could not be read successfully
*/
private function initBufferFromCurrentChunk()
{
if ($this->chunkOffset === 0 && $this->numChunks === 0) {
return false;
}
if ( ! $this->chunksIterator->valid()) {
throw CorruptFileException::missingChunk($this->chunkOffset);
}
if ($this->chunksIterator->current()->n != $this->chunkOffset) {
throw CorruptFileException::unexpectedIndex($this->chunksIterator->current()->n, $this->chunkOffset);
$currentChunk = $this->chunksIterator->current();
if ($currentChunk->n !== $this->chunkOffset) {
throw CorruptFileException::unexpectedIndex($currentChunk->n, $this->chunkOffset);
}
$actualChunkSize = strlen($this->chunksIterator->current()->data->getData());
$this->buffer = $currentChunk->data->getData();
$expectedChunkSize = ($this->chunkOffset == $this->numChunks - 1)
? ($this->length - $this->bytesSeen)
$actualChunkSize = strlen($this->buffer);
$expectedChunkSize = ($this->chunkOffset === $this->numChunks - 1)
? $this->expectedLastChunkSize
: $this->chunkSize;
if ($actualChunkSize != $expectedChunkSize) {
if ($actualChunkSize !== $expectedChunkSize) {
throw CorruptFileException::unexpectedSize($actualChunkSize, $expectedChunkSize);
}
$this->bytesSeen += $actualChunkSize;
$this->chunkOffset++;
return true;
}
private function initEmptyBuffer()
/**
* Advance to the next chunk and initialize the buffer to its data.
*
* @return boolean Whether there was a next chunk to read
* @throws CorruptFileException if an expected chunk could not be read successfully
*/
private function initBufferFromNextChunk()
{
if (isset($this->buffer)) {
fclose($this->buffer);
if ($this->chunkOffset === $this->numChunks - 1) {
return false;
}
$this->buffer = fopen("php://memory", "w+b");
$this->bufferEmpty = true;
$this->bufferFresh = true;
$this->bufferOffset = 0;
$this->chunkOffset++;
$this->chunksIterator->next();
return $this->initBufferFromCurrentChunk();
}
/**
* Initializes the chunk iterator starting from the current offset.
*/
private function initChunksIterator()
{
$cursor = $this->collectionWrapper->findChunksByFileId($this->file->_id, $this->chunkOffset);
$this->chunksIterator = new IteratorIterator($cursor);
$this->chunksIterator->rewind();
}
}
......@@ -119,17 +119,17 @@ class StreamWrapper
* if data is not available to be read.
*
* @see http://php.net/manual/en/streamwrapper.stream-read.php
* @param integer $count Number of bytes to read
* @param integer $length Number of bytes to read
* @return string
*/
public function stream_read($count)
public function stream_read($length)
{
if ( ! $this->stream instanceof ReadableStream) {
return '';
}
try {
return $this->stream->downloadNumBytes($count);
return $this->stream->readBytes($length);
} catch (Exception $e) {
trigger_error(sprintf('%s: %s', get_class($e), $e->getMessage()), \E_USER_WARNING);
return false;
......
......@@ -74,12 +74,12 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase
/**
* @dataProvider provideFileIdAndExpectedBytes
*/
public function testDownloadNumBytes($fileId, $numBytes, $expectedBytes)
public function testReadBytes($fileId, $length, $expectedBytes)
{
$fileDocument = $this->collectionWrapper->findFileById($fileId);
$stream = new ReadableStream($this->collectionWrapper, $fileDocument);
$this->assertSame($expectedBytes, $stream->downloadNumBytes($numBytes));
$this->assertSame($expectedBytes, $stream->readBytes($length));
}
public function provideFileIdAndExpectedBytes()
......@@ -111,14 +111,14 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase
/**
* @dataProvider provideFileIdAndExpectedBytes
*/
public function testDownloadNumBytesCalledMultipleTimes($fileId, $numBytes, $expectedBytes)
public function testReadBytesCalledMultipleTimes($fileId, $length, $expectedBytes)
{
$fileDocument = $this->collectionWrapper->findFileById($fileId);
$stream = new ReadableStream($this->collectionWrapper, $fileDocument);
for ($i = 0; $i < $numBytes; $i++) {
for ($i = 0; $i < $length; $i++) {
$expectedByte = isset($expectedBytes[$i]) ? $expectedBytes[$i] : '';
$this->assertSame($expectedByte, $stream->downloadNumBytes(1));
$this->assertSame($expectedByte, $stream->readBytes(1));
}
}
......@@ -126,35 +126,35 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase
* @expectedException MongoDB\GridFS\Exception\CorruptFileException
* @expectedExceptionMessage Chunk not found for index "2"
*/
public function testDownloadNumBytesWithMissingChunk()
public function testReadBytesWithMissingChunk()
{
$this->chunksCollection->deleteOne(['files_id' => 'length-10', 'n' => 2]);
$fileDocument = $this->collectionWrapper->findFileById('length-10');
$stream = new ReadableStream($this->collectionWrapper, $fileDocument);
$stream->downloadNumBytes(10);
$stream->readBytes(10);
}
/**
* @expectedException MongoDB\GridFS\Exception\CorruptFileException
* @expectedExceptionMessage Expected chunk to have index "1" but found "2"
*/
public function testDownloadNumBytesWithUnexpectedChunkIndex()
public function testReadBytesWithUnexpectedChunkIndex()
{
$this->chunksCollection->deleteOne(['files_id' => 'length-10', 'n' => 1]);
$fileDocument = $this->collectionWrapper->findFileById('length-10');
$stream = new ReadableStream($this->collectionWrapper, $fileDocument);
$stream->downloadNumBytes(10);
$stream->readBytes(10);
}
/**
* @expectedException MongoDB\GridFS\Exception\CorruptFileException
* @expectedExceptionMessage Expected chunk to have size "2" but found "1"
*/
public function testDownloadNumBytesWithUnexpectedChunkSize()
public function testReadBytesWithUnexpectedChunkSize()
{
$this->chunksCollection->updateOne(
['files_id' => 'length-10', 'n' => 2],
......@@ -164,17 +164,17 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase
$fileDocument = $this->collectionWrapper->findFileById('length-10');
$stream = new ReadableStream($this->collectionWrapper, $fileDocument);
$stream->downloadNumBytes(10);
$stream->readBytes(10);
}
/**
* @expectedException MongoDB\Exception\InvalidArgumentException
*/
public function testDownloadNumBytesWithNegativeReadSize()
public function testReadBytesWithNegativeLength()
{
$fileDocument = $this->collectionWrapper->findFileById('length-0');
$stream = new ReadableStream($this->collectionWrapper, $fileDocument);
$stream->downloadNumBytes(-1);
$stream->readBytes(-1);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment