Commit 76f89eae authored by Will Banfield's avatar Will Banfield Committed by Jeremy Mikola

PHPLIB-147: Implement GridFS download

parent e2bf1d45
<?php
namespace MongoDB\Exception;
class GridFSCorruptFileException extends \MongoDB\Driver\Exception\RuntimeException implements Exception
{
}
<?php
namespace MongoDB\Exception;
class GridFSFileNotFoundException extends \MongoDB\Driver\Exception\RuntimeException implements Exception
{
public function __construct($fname, $bucketName, $databaseName){
parent::__construct(sprintf('Unable to find file by: %s in %s.%s', $fname,$databaseName, $bucketName));
}
}
......@@ -22,7 +22,6 @@ class Bucket
private $options;
private $filesCollection;
private $chunksCollection;
private $indexChecker;
private $ensuredIndexes = false;
/**
* Constructs a GridFS bucket.
......@@ -85,42 +84,6 @@ class Bucket
$collectionOptions
);
}
/**
* Opens a Stream for reading the contents of a file specified by ID.
*
* @param ObjectId $id
* @return Stream
*/
public function openDownloadStream(ObjectId $id)
{
fopen('gridfs://$this->databaseName/$id', 'r');
}
/**
* Downloads the contents of the stored file specified by id and writes
* the contents to the destination Stream.
* @param ObjectId $id GridFS File Id
* @param Stream $destination Destination Stream
*/
public function downloadToStream(ObjectId $id, $destination)
{
$result = $this->filesCollection->findOne(['_id' => $id]);
if ($result == null) {
return;
}
if ($result->length == 0){
return;
}
$n=0;
$results = $this->chunksCollection->find(['files_id' => $result->_id]);
foreach ($results as $chunk) {
if ($chunk->n != $n) {
return;
}
fwrite($destination, $chunk->data);
$n++;
}
}
/**
* Return the chunkSizeBytes option for this Bucket.
*
......@@ -130,6 +93,7 @@ class Bucket
{
return $this->options['chunkSizeBytes'];
}
public function getDatabaseName()
{
return $this->databaseName;
......@@ -138,10 +102,15 @@ class Bucket
{
return $this->filesCollection;
}
public function getChunksCollection()
{
return $this->chunksCollection;
}
public function getBucketName()
{
return $this->options['bucketName'];
}
public function find($filter, array $options =[])
{
//add proper validation for the filter and for the options
......@@ -168,7 +137,6 @@ class Bucket
$this->ensureChunksIndex();
$this->ensuredIndexes = true;
}
private function ensureChunksIndex()
{
foreach ($this->chunksCollection->listIndexes() as $index) {
......@@ -178,7 +146,6 @@ class Bucket
}
$this->chunksCollection->createIndex(['files_id' => 1, 'n' => 1], ['unique' => true]);
}
private function ensureFilesIndex()
{
foreach ($this->filesCollection->listIndexes() as $index) {
......@@ -188,7 +155,6 @@ class Bucket
}
$this->filesCollection->createIndex(['filename' => 1, 'uploadDate' => 1]);
}
private function isFilesCollectionEmpty()
{
return null === $this->filesCollection->findOne([], [
......@@ -196,11 +162,11 @@ class Bucket
'projection' => ['_id' => 1],
]);
}
public function delete(ObjectId $id)
{
$options = ['writeConcern' => $this->writeConcern];
$this->chunksCollection->deleteMany(['file_id' => $id], $options);
$this->filesCollection->deleteOne(['_id' => $id], $options);
}
}
......@@ -38,4 +38,30 @@ class BucketReadWriter
$gridFsStream = new GridFsUpload($this->bucket, $filename, $options);
return $gridFsStream->uploadFromStream($source);
}
/**
* Opens a Stream for reading the contents of a file specified by ID.
*
* @param ObjectId $id
* @return Stream
*/
public function openDownloadStream(\MongoDB\BSON\ObjectId $id)
{
$options = [
'bucket' => $this->bucket
];
$context = stream_context_create(['gridfs' => $options]);
return fopen(sprintf('gridfs://%s/%s', $this->bucket->getDatabaseName(), $id), 'r', false, $context);
}
/**
* Downloads the contents of the stored file specified by id and writes
* the contents to the destination Stream.
* @param ObjectId $id GridFS File Id
* @param Stream $destination Destination Stream
*/
public function downloadToStream(\MongoDB\BSON\ObjectId $id, $destination)
{
$gridFsStream = new GridFsDownload($this->bucket, $id);
$gridFsStream->downloadToStream($destination);
}
}
<?php
namespace MongoDB\GridFS;
use MongoDB\Collection;
use MongoDB\Exception\RuntimeException;
use MongoDB\BSON\ObjectId;
/**
* GridFsupload abstracts the processes of inserting into a GridFSBucket
*
* @api
*/
class GridFsDownload extends GridFsStream
{
private $chunksIterator;
private $bytesSeen=0;
private $numChunks;
private $iteratorEmpty=false;
private $firstCheck=true;
private $bufferFresh=true;
private $bufferEmpty=true;
/**
* Constructs a GridFS upload stream
*
* Supported options:
*
* * contentType (string): DEPRECATED content type to be stored with the file.
* This information should now be added to the metadata
*
* * aliases (array of strings): DEPRECATED An array of aliases.
* Applications wishing to store aliases should add an aliases field to the
* metadata document instead.
*
* * metadata (array or object): User data for the 'metadata' field of the files
* collection document.
*
* * writeConcern (MongoDB\Driver\WriteConcern): Write concern.
*
* @param array $options File options
* @throws FileNotFoundException
*/
public function __construct(
Bucket $bucket,
ObjectId $objectId
)
{
$this->file = $bucket->getFilesCollection()->findOne(['_id' => $objectId]);
if (is_null($this->file)) {
//MUST RAISE AN ERROR ! (WHICH ONE I DON'T)
throw new \MongoDB\Exception\GridFSFileNotFoundException($objectId, $bucket->getBucketName(), $bucket->getDatabaseName());
}
if ($this->file->length > 0) {
$cursor = $bucket->getChunksCollection()->find(['files_id' => $this->file->_id], ['sort' => ['n' => 1]]);
$this->chunksIterator = new \IteratorIterator($cursor);
$this->numChunks = ceil($this->file->length / $this->file->chunkSize);
}
parent::__construct($bucket);
}
/**
* Reads data from a stream into GridFS
*
* @param Stream $source Source Stream
* @return ObjectId
*/
public function downloadToStream($destination)
{
while($this->advanceChunks()) {
fwrite($destination, $this->chunksIterator->current()->data->getData());
}
}
public function downloadNumBytes($numToRead) {
$output = "";
if ($this->bufferFresh) {
rewind($this->buffer);
$this->bufferFresh=false;
}
$output = fread($this->buffer, $numToRead);
if (strlen($output) == $numToRead) {
return $output;
}
fclose($this->buffer);
$this->buffer = fopen("php://temp", "w+");
$this->bufferFresh=true;
$this->bufferEmpty=true;
$bytesLeft = $numToRead - strlen($output);
while(strlen($output) < $numToRead && $this->advanceChunks()) {
$bytesLeft = $numToRead - strlen($output);
$output .= substr($this->chunksIterator->current()->data, 0, $bytesLeft);
}
if ($bytesLeft < strlen($this->chunksIterator->current()->data)) {
fwrite($this->buffer, substr($this->chunksIterator->current()->data, $bytesLeft));
$this->bufferEmpty=false;
}
return $output;
}
private function advanceChunks()
{
if($this->n >= $this->numChunks) {
$this->iteratorEmpty=true;
return false;
}
if($this->firstCheck) {
$this->chunksIterator->rewind();
$this->firstCheck=false;
} else {
$this->chunksIterator->next();
}
if (!$this->chunksIterator->valid()) {
throw new \MongoDB\Exception\GridFSCorruptFileException();
}
if ($this->chunksIterator->current()->n != $this->n) {
throw new \MongoDB\Exception\GridFSCorruptFileException();
}
$chunkSizeIs = strlen($this->chunksIterator->current()->data->getData());
if ($this->n == $this->numChunks - 1) {
$chunkSizeShouldBe = $this->file->length - $this->bytesSeen;
if($chunkSizeShouldBe != $chunkSizeIs) {
throw new \MongoDB\Exception\GridFSCorruptFileException();
}
} else if ($this->n < $this->numChunks - 1) {
if($chunkSizeIs != $this->file->chunkSize) {
throw new \MongoDB\Exception\GridFSCorruptFileException();
}
}
$this->bytesSeen+= $chunkSizeIs;
$this->n++;
return true;
}
public function close()
{
fclose($this->buffer);
}
public function isEOF()
{
$eof = $this->iteratorEmpty && $this->bufferEmpty;
return $eof;
}
}
......@@ -33,60 +33,27 @@ class StreamWrapper
}
stream_wrapper_register('gridfs', get_called_class(), STREAM_IS_URL);
}
private function initProtocol($path)
{
$parsed_path = parse_url($path);
$this->databaseName = $parsed_path["host"];
$this->identifier = substr($parsed_path["path"], 1);
}
public function stream_write($data)
{
$this->gridFsStream->insertChunks($data);
return strlen($data);
}
public function stream_read($count) {
$out ="";
if ($this->dirtyCache) {
$out = fread($this->buffer, $count);
if (strlen($out) == $count) {
return $out;
} else {
fclose($out);
$this->dirtyCache = false;
}
$this->n++;
}
if ($this->file->length <= $this->n) {
return false;
}
while(strlen($out) < $count && $this ->n <$this->file->length) {
$bytes_left = $count - strlen($out);
$next = $this->chunksCollection->findOne(['files_id' => $this->file->_id, "n" => $this->n]);
$out .= substr($next->data, 0, $bytes_left);
$this->n++;
return $this->gridFsStream->downloadNumBytes($count);
}
if ($bytes_left < strlen($next->data)) {
$this->buffer = tmpfile();
fwrite($this->buffer, substr($next->data, $bytes_left));
$this->dirtyCache =true;
}
return $out;
}
public function stream_eof() {
return $this->n >= $this->file->length;
return $this->gridFsStream->isEOF();
}
public function stream_close() {
$this->gridFsStream->close();
}
public function stream_open($path, $mode, $options, &$openedPath)
{
$this->initProtocol($path);
......@@ -99,17 +66,15 @@ class StreamWrapper
default: return false;
}
}
public function openWriteStream() {
$context = stream_context_get_options($this->context);
$options =$context['gridfs']['uploadOptions'];
$this->gridFsStream = new GridFsUpload($this->bucket, $this->identifier, $options);
return true;
}
public function openReadStream() {
$objectId = new \MongoDB\BSON\ObjectId($this->identifier);
$this->file = $this->filesCollection->findOne(['_id' => $objectId]);
$this->gridFsStream = new GridFsDownload($this->bucket, $objectId);
return true;
}
}
......@@ -6,19 +6,29 @@ use \MongoDB\GridFS;
use \MongoDB\Collection;
use \MongoDB\BSON\ObjectId;
use \MongoDB\BSON\Binary;
use \MongoDB\Exception;
class SpecificationTests extends FunctionalTestCase
{
private $commands;
private $collections;
public function setUp()
{
parent::setUp();
$this->commands = array(
'insert' => function($col, $docs) {
$col->insertMany($docs['documents']);}
$col->insertMany($docs['documents']);},
'update' => function($col, $docs) {
foreach($docs['updates'] as $update) {
$col->updateMany($update['q'], $update['u']);
}
},
'delete' => function($col, $docs){
foreach($docs['deletes'] as $delete){
$col->deleteMany($delete['q']);
}
}
);
}
/**
......@@ -27,20 +37,38 @@ class SpecificationTests extends FunctionalTestCase
public function testSpecificationTests($testJson)
{
foreach ($testJson['tests'] as $test) {
$this->bucket = new \MongoDB\GridFS\Bucket($this->manager, $this->getDatabaseName(), $test['act']['arguments']['options']);
$this->initializeDatabases($testJson['data'], $test);
if(isset($test['act']['arguments']['options'])){
$options = $test['act']['arguments']['options'];
} else {
$options =[];
}
$this->bucket = new \MongoDB\GridFS\Bucket($this->manager, $this->getDatabaseName(), $this->fixTypes($options,false));
$this->bucketReadWriter = new \MongoDB\GridFS\BucketReadWriter($this->bucket);
$func = $test['act']['operation'] . "Command";
$error = null;
try {
$result = $this->$func($test['act']['arguments']);
} catch(Exception $e) {
} catch(\MongoDB\Exception\Exception $e) {
$error = $e;
}
$errors = ['FileNotFound' => '\MongoDB\Exception\GridFSFileNotFoundException',
'ChunkIsMissing' => '\MongoDB\Exception\GridFSCorruptFileException',
'ExtraChunk' => '\MongoDB\Exception\GridFSCorruptFileException',
'ChunkIsWrongSize' => '\MongoDB\Exception\GridFSCorruptFileException',
'RevisionNotFound' => '\MongoDB\Exception\GridFSFileNotFoundException'
];
if (!isset($test['assert']['error'])) {
//check that test didn't throw error
$this->assertNull($error);
} else {
//check that the error matches what we got
$shouldError = $test['assert']['error'];
$this->assertTrue($error instanceof $errors[$shouldError]);
}
$fixedAssert = $this->fixTypes($test['assert'], false);
if (isset($test['assert']['result'])) {
$testResult = $test['assert']['result'];
if ($testResult == "&result") {
......@@ -49,10 +77,10 @@ class SpecificationTests extends FunctionalTestCase
if ($testResult == "void") {
$test['assert']['result'] = null;
}
$this->assertEquals($result, $test['assert']['result']);
$this->assertEquals($result, $fixedAssert['result']);
}
if (isset($test['assert']['data'])) {
$this->runCommands($test['assert']['data'], $result);
$this->runCommands($fixedAssert, $result);
$this->collectionsEqual($this->collections['expected.files'],$this->bucket->getFilesCollection());
if(isset($this->collections['expected.chunks'])) {
$this->collectionsEqual($this->collections['expected.chunks'],$this->bucket->getChunksCollection());
......@@ -63,7 +91,7 @@ class SpecificationTests extends FunctionalTestCase
public function provideSpecificationTests()
{
$testPath=getcwd().'/tests/GridFS/Specification/tests/upload.json';
$testPath=getcwd().'/tests/GridFS/Specification/tests/download.json';
$testArgs = [];
foreach(glob($testPath) as $filename) {
......@@ -84,7 +112,7 @@ class SpecificationTests extends FunctionalTestCase
$result[$key] = new \MongoDB\BSON\Binary($result[$key], \MongoDB\BSON\Binary::TYPE_GENERIC);
}
} else if (is_array($value) && isset($value['$oid'])) {
$result[$key] = new ObjectId("".$value['$oid']);
$result[$key] = new \MongoDB\BSON\ObjectId("".$value['$oid']);
} else if (is_array($value)) {
$result[$key] = $this->fixTypes($result[$key], $makeBinary);
} else if(is_string($value) && $value == '*actual') {
......@@ -145,17 +173,60 @@ class SpecificationTests extends FunctionalTestCase
}
}
public function initializeDatabases($data, $test)
{
$collectionsToDrop = ['fs.files','fs.chunks','expected.files','expected.chunks'];
$data = $this->fixTypes($data, true);
foreach ($collectionsToDrop as $collectionName) {
$collection = new Collection($this->manager, sprintf("%s.%s", $this->getDatabaseName(), $collectionName));
$collection->drop();
}
if (isset($data['files']) && count($data['files']) > 0) {
$filesCollection = new Collection($this->manager, sprintf("%s.%s", $this->getDatabaseName(), "fs.files"));
$filesCollection->insertMany($data['files']);
$expectedFilesCollection = new Collection($this->manager, sprintf("%s.%s", $this->getDatabaseName(), "expected.files"));
$expectedFilesCollection->insertMany($data['files']);
}
if (isset($data['chunks']) && count($data['chunks']) > 0) {
$chunksCollection = new Collection($this->manager, sprintf("%s.%s", $this->getDatabaseName(), "fs.chunks"));
$chunksCollection->insertMany($data['chunks']);
$expectedChunksCollection = new Collection($this->manager, sprintf("%s.%s", $this->getDatabaseName(), "expected.chunks"));
$expectedChunksCollection->insertMany($data['chunks']);
}
if(isset($test['arrange'])) {
foreach($test['arrange']['data'] as $cmd) {
foreach($cmd as $key => $value) {
if(isset($this->commands[$key])) {
$collection = new Collection($this->manager, sprintf("%s.%s", $this->getDatabaseName(), $cmd[$key]));
$this->commands[$key]($collection,$this->fixTypes($cmd, true));
}
}
}
}
}
public function uploadCommand($args)
{
$args = $this->fixTypes($args, false);
$stream = fopen('php://temp', 'w+');
fwrite($stream, $args['source']);
rewind($stream);
return $this->bucketReadWriter->uploadFromStream($args['filename'], $stream, $args['options']);
$result = $this->bucketReadWriter->uploadFromStream($args['filename'], $stream, $args['options']);
fclose($stream);
return $result;
}
function downloadCommand($args)
{
$args = $this->fixTypes($args, false);
$streamWrapper = new \MongoDB\GridFS\StreamWrapper();
$streamWrapper->register($this->manager);
$stream = fopen('php://temp', 'w+');
$this->bucketReadWriter->downloadToStream($args['id'], $stream);
rewind($stream);
$result = stream_get_contents($stream);
fclose($stream);
return $result;
}
function deleteCommand($args)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment