Commit 6e636cb8 authored by Jeremy Mikola's avatar Jeremy Mikola

Merge pull request #304

parents 9dfb2c5f 049a77c4
...@@ -80,6 +80,10 @@ class Bucket ...@@ -80,6 +80,10 @@ class Bucket
throw InvalidArgumentException::invalidType('"chunkSizeBytes" option', $options['chunkSizeBytes'], 'integer'); throw InvalidArgumentException::invalidType('"chunkSizeBytes" option', $options['chunkSizeBytes'], 'integer');
} }
if (isset($options['chunkSizeBytes']) && $options['chunkSizeBytes'] < 1) {
throw new InvalidArgumentException(sprintf('Expected "chunkSizeBytes" option to be >= 1, %d given', $options['chunkSizeBytes']));
}
if (isset($options['readConcern']) && ! $options['readConcern'] instanceof ReadConcern) { if (isset($options['readConcern']) && ! $options['readConcern'] instanceof ReadConcern) {
throw InvalidArgumentException::invalidType('"readConcern" option', $options['readConcern'], 'MongoDB\Driver\ReadConcern'); throw InvalidArgumentException::invalidType('"readConcern" option', $options['readConcern'], 'MongoDB\Driver\ReadConcern');
} }
......
...@@ -150,7 +150,7 @@ class StreamWrapper ...@@ -150,7 +150,7 @@ class StreamWrapper
} }
try { try {
return $this->stream->insertChunks($data); return $this->stream->writeBytes($data);
} catch (Exception $e) { } catch (Exception $e) {
trigger_error(sprintf('%s: %s', get_class($e), $e->getMessage()), \E_USER_WARNING); trigger_error(sprintf('%s: %s', get_class($e), $e->getMessage()), \E_USER_WARNING);
return false; return false;
......
...@@ -5,7 +5,9 @@ namespace MongoDB\GridFS; ...@@ -5,7 +5,9 @@ namespace MongoDB\GridFS;
use MongoDB\BSON\Binary; use MongoDB\BSON\Binary;
use MongoDB\BSON\ObjectId; use MongoDB\BSON\ObjectId;
use MongoDB\BSON\UTCDateTime; use MongoDB\BSON\UTCDateTime;
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\RuntimeException;
/** /**
* WritableStream abstracts the process of writing a GridFS file. * WritableStream abstracts the process of writing a GridFS file.
...@@ -16,8 +18,7 @@ class WritableStream ...@@ -16,8 +18,7 @@ class WritableStream
{ {
private static $defaultChunkSizeBytes = 261120; private static $defaultChunkSizeBytes = 261120;
private $buffer; private $buffer = '';
private $bufferLength = 0;
private $chunkOffset = 0; private $chunkOffset = 0;
private $chunkSize; private $chunkSize;
private $collectionWrapper; private $collectionWrapper;
...@@ -66,6 +67,10 @@ class WritableStream ...@@ -66,6 +67,10 @@ class WritableStream
throw InvalidArgumentException::invalidType('"chunkSizeBytes" option', $options['chunkSizeBytes'], 'integer'); throw InvalidArgumentException::invalidType('"chunkSizeBytes" option', $options['chunkSizeBytes'], 'integer');
} }
if (isset($options['chunkSizeBytes']) && $options['chunkSizeBytes'] < 1) {
throw new InvalidArgumentException(sprintf('Expected "chunkSizeBytes" option to be >= 1, %d given', $options['chunkSizeBytes']));
}
if (isset($options['contentType']) && ! is_string($options['contentType'])) { if (isset($options['contentType']) && ! is_string($options['contentType'])) {
throw InvalidArgumentException::invalidType('"contentType" option', $options['contentType'], 'string'); throw InvalidArgumentException::invalidType('"contentType" option', $options['contentType'], 'string');
} }
...@@ -76,15 +81,13 @@ class WritableStream ...@@ -76,15 +81,13 @@ class WritableStream
$this->chunkSize = $options['chunkSizeBytes']; $this->chunkSize = $options['chunkSizeBytes'];
$this->collectionWrapper = $collectionWrapper; $this->collectionWrapper = $collectionWrapper;
$this->buffer = fopen('php://memory', 'w+b');
$this->ctx = hash_init('md5'); $this->ctx = hash_init('md5');
$this->file = [ $this->file = [
'_id' => $options['_id'], '_id' => $options['_id'],
'chunkSize' => $this->chunkSize, 'chunkSize' => $this->chunkSize,
'filename' => (string) $filename, 'filename' => (string) $filename,
// TODO: This is necessary until PHPC-536 is implemented 'uploadDate' => new UTCDateTime,
'uploadDate' => new UTCDateTime((int) floor(microtime(true) * 1000)),
] + array_intersect_key($options, ['aliases' => 1, 'contentType' => 1, 'metadata' => 1]); ] + array_intersect_key($options, ['aliases' => 1, 'contentType' => 1, 'metadata' => 1]);
} }
...@@ -113,14 +116,10 @@ class WritableStream ...@@ -113,14 +116,10 @@ class WritableStream
return; return;
} }
rewind($this->buffer); if (strlen($this->buffer) > 0) {
$cached = stream_get_contents($this->buffer); $this->insertChunkFromBuffer();
if (strlen($cached) > 0) {
$this->insertChunk($cached);
} }
fclose($this->buffer);
$this->fileCollectionInsert(); $this->fileCollectionInsert();
$this->isClosed = true; $this->isClosed = true;
} }
...@@ -151,69 +150,72 @@ class WritableStream ...@@ -151,69 +150,72 @@ class WritableStream
* Inserts binary data into GridFS via chunks. * Inserts binary data into GridFS via chunks.
* *
* Data will be buffered internally until chunkSizeBytes are accumulated, at * Data will be buffered internally until chunkSizeBytes are accumulated, at
* which point a chunk's worth of data will be inserted and the buffer * which point a chunk document will be inserted and the buffer reset.
* reset.
* *
* @param string $toWrite Binary data to write * @param string $data Binary data to write
* @return integer * @return integer
*/ */
public function insertChunks($toWrite) public function writeBytes($data)
{ {
if ($this->isClosed) { if ($this->isClosed) {
// TODO: Should this be an error condition? e.g. BadMethodCallException // TODO: Should this be an error condition? e.g. BadMethodCallException
return; return;
} }
$readBytes = 0; $bytesRead = 0;
while ($readBytes != strlen($toWrite)) { while ($bytesRead != strlen($data)) {
$addToBuffer = substr($toWrite, $readBytes, $this->chunkSize - $this->bufferLength); $initialBufferLength = strlen($this->buffer);
fwrite($this->buffer, $addToBuffer); $this->buffer .= substr($data, $bytesRead, $this->chunkSize - $initialBufferLength);
$readBytes += strlen($addToBuffer); $bytesRead += strlen($this->buffer) - $initialBufferLength;
$this->bufferLength += strlen($addToBuffer);
if ($this->bufferLength == $this->chunkSize) { if (strlen($this->buffer) == $this->chunkSize) {
rewind($this->buffer); $this->insertChunkFromBuffer();
$this->insertChunk(stream_get_contents($this->buffer));
ftruncate($this->buffer, 0);
$this->bufferLength = 0;
} }
} }
return $readBytes; return $bytesRead;
} }
private function abort() private function abort()
{ {
$this->collectionWrapper->deleteChunksByFilesId($this->file['_id']); try {
$this->collectionWrapper->deleteChunksByFilesId($this->file['_id']);
} catch (DriverRuntimeException $e) {
// We are already handling an error if abort() is called, so suppress this
}
$this->isClosed = true; $this->isClosed = true;
} }
private function fileCollectionInsert() private function fileCollectionInsert()
{ {
if ($this->isClosed) {
// TODO: Should this be an error condition? e.g. BadMethodCallException
return;
}
$md5 = hash_final($this->ctx); $md5 = hash_final($this->ctx);
$this->file['length'] = $this->length; $this->file['length'] = $this->length;
$this->file['md5'] = $md5; $this->file['md5'] = $md5;
$this->collectionWrapper->insertFile($this->file); try {
$this->collectionWrapper->insertFile($this->file);
} catch (DriverRuntimeException $e) {
$this->abort();
throw $e;
}
return $this->file['_id']; return $this->file['_id'];
} }
private function insertChunk($data) private function insertChunkFromBuffer()
{ {
if ($this->isClosed) { if (strlen($this->buffer) == 0) {
// TODO: Should this be an error condition? e.g. BadMethodCallException
return; return;
} }
$toUpload = [ $data = $this->buffer;
$this->buffer = '';
$chunk = [
'files_id' => $this->file['_id'], 'files_id' => $this->file['_id'],
'n' => $this->chunkOffset, 'n' => $this->chunkOffset,
'data' => new Binary($data, Binary::TYPE_GENERIC), 'data' => new Binary($data, Binary::TYPE_GENERIC),
...@@ -221,7 +223,14 @@ class WritableStream ...@@ -221,7 +223,14 @@ class WritableStream
hash_update($this->ctx, $data); hash_update($this->ctx, $data);
$this->collectionWrapper->insertChunk($toUpload); try {
$this->collectionWrapper->insertChunk($chunk);
} catch (DriverRuntimeException $e) {
$this->abort();
throw $e;
}
$this->length += strlen($data); $this->length += strlen($data);
$this->chunkOffset++; $this->chunkOffset++;
} }
......
...@@ -68,6 +68,15 @@ class BucketFunctionalTest extends FunctionalTestCase ...@@ -68,6 +68,15 @@ class BucketFunctionalTest extends FunctionalTestCase
return $options; return $options;
} }
/**
* @expectedException MongoDB\Exception\InvalidArgumentException
* @expectedExceptionMessage Expected "chunkSizeBytes" option to be >= 1, 0 given
*/
public function testConstructorShouldRequireChunkSizeBytesOptionToBePositive()
{
new Bucket($this->manager, $this->getDatabaseName(), ['chunkSizeBytes' => 0]);
}
/** /**
* @dataProvider provideInputDataAndExpectedChunks * @dataProvider provideInputDataAndExpectedChunks
*/ */
......
...@@ -150,9 +150,7 @@ class SpecFunctionalTest extends FunctionalTestCase ...@@ -150,9 +150,7 @@ class SpecFunctionalTest extends FunctionalTestCase
} }
if (isset($value['$date'])) { if (isset($value['$date'])) {
// TODO: This is necessary until PHPC-536 is implemented $value = new UTCDateTime(new DateTime($value['$date']));
$milliseconds = floor((new DateTime($value['$date']))->format('U.u') * 1000);
$value = new UTCDateTime((int) $milliseconds);
return; return;
} }
......
...@@ -52,13 +52,22 @@ class WritableStreamFunctionalTest extends FunctionalTestCase ...@@ -52,13 +52,22 @@ class WritableStreamFunctionalTest extends FunctionalTestCase
return $options; return $options;
} }
/**
* @expectedException MongoDB\Exception\InvalidArgumentException
* @expectedExceptionMessage Expected "chunkSizeBytes" option to be >= 1, 0 given
*/
public function testConstructorShouldRequireChunkSizeBytesOptionToBePositive()
{
new WritableStream($this->collectionWrapper, 'filename', ['chunkSizeBytes' => 0]);
}
/** /**
* @dataProvider provideInputDataAndExpectedMD5 * @dataProvider provideInputDataAndExpectedMD5
*/ */
public function testInsertChunksCalculatesMD5($input, $expectedMD5) public function testWriteBytesCalculatesMD5($input, $expectedMD5)
{ {
$stream = new WritableStream($this->collectionWrapper, 'filename'); $stream = new WritableStream($this->collectionWrapper, 'filename');
$stream->insertChunks($input); $stream->writeBytes($input);
$stream->close(); $stream->close();
$fileDocument = $this->filesCollection->findOne( $fileDocument = $this->filesCollection->findOne(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment