Commit 6676e7ef authored by Jeremy Mikola's avatar Jeremy Mikola

PHPLIB-244: Use strings instead of memory stream for GridFS upload buffering

parent 9dfb2c5f
...@@ -150,7 +150,7 @@ class StreamWrapper ...@@ -150,7 +150,7 @@ class StreamWrapper
} }
try { try {
return $this->stream->insertChunks($data); return $this->stream->writeBytes($data);
} catch (Exception $e) { } catch (Exception $e) {
trigger_error(sprintf('%s: %s', get_class($e), $e->getMessage()), \E_USER_WARNING); trigger_error(sprintf('%s: %s', get_class($e), $e->getMessage()), \E_USER_WARNING);
return false; return false;
......
...@@ -6,6 +6,7 @@ use MongoDB\BSON\Binary; ...@@ -6,6 +6,7 @@ use MongoDB\BSON\Binary;
use MongoDB\BSON\ObjectId; use MongoDB\BSON\ObjectId;
use MongoDB\BSON\UTCDateTime; use MongoDB\BSON\UTCDateTime;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\RuntimeException;
/** /**
* WritableStream abstracts the process of writing a GridFS file. * WritableStream abstracts the process of writing a GridFS file.
...@@ -16,8 +17,7 @@ class WritableStream ...@@ -16,8 +17,7 @@ class WritableStream
{ {
private static $defaultChunkSizeBytes = 261120; private static $defaultChunkSizeBytes = 261120;
private $buffer; private $buffer = '';
private $bufferLength = 0;
private $chunkOffset = 0; private $chunkOffset = 0;
private $chunkSize; private $chunkSize;
private $collectionWrapper; private $collectionWrapper;
...@@ -76,7 +76,6 @@ class WritableStream ...@@ -76,7 +76,6 @@ class WritableStream
$this->chunkSize = $options['chunkSizeBytes']; $this->chunkSize = $options['chunkSizeBytes'];
$this->collectionWrapper = $collectionWrapper; $this->collectionWrapper = $collectionWrapper;
$this->buffer = fopen('php://memory', 'w+b');
$this->ctx = hash_init('md5'); $this->ctx = hash_init('md5');
$this->file = [ $this->file = [
...@@ -113,14 +112,10 @@ class WritableStream ...@@ -113,14 +112,10 @@ class WritableStream
return; return;
} }
rewind($this->buffer); if (strlen($this->buffer) > 0) {
$cached = stream_get_contents($this->buffer); $this->insertChunkFromBuffer();
if (strlen($cached) > 0) {
$this->insertChunk($cached);
} }
fclose($this->buffer);
$this->fileCollectionInsert(); $this->fileCollectionInsert();
$this->isClosed = true; $this->isClosed = true;
} }
...@@ -151,36 +146,31 @@ class WritableStream ...@@ -151,36 +146,31 @@ class WritableStream
* Inserts binary data into GridFS via chunks. * Inserts binary data into GridFS via chunks.
* *
* Data will be buffered internally until chunkSizeBytes are accumulated, at * Data will be buffered internally until chunkSizeBytes are accumulated, at
* which point a chunk's worth of data will be inserted and the buffer * which point a chunk document will be inserted and the buffer reset.
* reset.
* *
* @param string $toWrite Binary data to write * @param string $data Binary data to write
* @return integer * @return integer
*/ */
public function insertChunks($toWrite) public function writeBytes($data)
{ {
if ($this->isClosed) { if ($this->isClosed) {
// TODO: Should this be an error condition? e.g. BadMethodCallException // TODO: Should this be an error condition? e.g. BadMethodCallException
return; return;
} }
$readBytes = 0; $bytesRead = 0;
while ($readBytes != strlen($toWrite)) { while ($bytesRead != strlen($data)) {
$addToBuffer = substr($toWrite, $readBytes, $this->chunkSize - $this->bufferLength); $initialBufferLength = strlen($this->buffer);
fwrite($this->buffer, $addToBuffer); $this->buffer .= substr($data, $bytesRead, $this->chunkSize - $initialBufferLength);
$readBytes += strlen($addToBuffer); $bytesRead += strlen($this->buffer) - $initialBufferLength;
$this->bufferLength += strlen($addToBuffer);
if ($this->bufferLength == $this->chunkSize) { if (strlen($this->buffer) == $this->chunkSize) {
rewind($this->buffer); $this->insertChunkFromBuffer();
$this->insertChunk(stream_get_contents($this->buffer));
ftruncate($this->buffer, 0);
$this->bufferLength = 0;
} }
} }
return $readBytes; return $bytesRead;
} }
private function abort() private function abort()
...@@ -206,14 +196,21 @@ class WritableStream ...@@ -206,14 +196,21 @@ class WritableStream
return $this->file['_id']; return $this->file['_id'];
} }
private function insertChunk($data) private function insertChunkFromBuffer()
{ {
if ($this->isClosed) { if ($this->isClosed) {
// TODO: Should this be an error condition? e.g. BadMethodCallException // TODO: Should this be an error condition? e.g. BadMethodCallException
return; return;
} }
$toUpload = [ if (strlen($this->buffer) == 0) {
return;
}
$data = $this->buffer;
$this->buffer = '';
$chunk = [
'files_id' => $this->file['_id'], 'files_id' => $this->file['_id'],
'n' => $this->chunkOffset, 'n' => $this->chunkOffset,
'data' => new Binary($data, Binary::TYPE_GENERIC), 'data' => new Binary($data, Binary::TYPE_GENERIC),
...@@ -221,7 +218,7 @@ class WritableStream ...@@ -221,7 +218,7 @@ class WritableStream
hash_update($this->ctx, $data); hash_update($this->ctx, $data);
$this->collectionWrapper->insertChunk($toUpload); $this->collectionWrapper->insertChunk($chunk);
$this->length += strlen($data); $this->length += strlen($data);
$this->chunkOffset++; $this->chunkOffset++;
} }
......
...@@ -55,10 +55,10 @@ class WritableStreamFunctionalTest extends FunctionalTestCase ...@@ -55,10 +55,10 @@ class WritableStreamFunctionalTest extends FunctionalTestCase
/** /**
* @dataProvider provideInputDataAndExpectedMD5 * @dataProvider provideInputDataAndExpectedMD5
*/ */
public function testInsertChunksCalculatesMD5($input, $expectedMD5) public function testWriteBytesCalculatesMD5($input, $expectedMD5)
{ {
$stream = new WritableStream($this->collectionWrapper, 'filename'); $stream = new WritableStream($this->collectionWrapper, 'filename');
$stream->insertChunks($input); $stream->writeBytes($input);
$stream->close(); $stream->close();
$fileDocument = $this->filesCollection->findOne( $fileDocument = $this->filesCollection->findOne(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment