Commit 9f41780d authored by Jeremy Mikola's avatar Jeremy Mikola

PHPLIB-323: Support typeMap option for change streams

parent 488010b3
......@@ -70,4 +70,8 @@ optional: true
source:
file: apiargs-common-option.yaml
ref: session
---
source:
file: apiargs-MongoDBCollection-common-option.yaml
ref: typeMap
...
......@@ -21,6 +21,7 @@ use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\ConnectionTimeoutException;
use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use IteratorIterator;
use Iterator;
......@@ -149,25 +150,33 @@ class ChangeStream implements Iterator
}
/**
* Extracts the resumeToken (_id) of the input document.
* Extracts the resume token (i.e. "_id" field) from the change document.
*
* @return void
* @throws ResumeTokenException if the document has no _id.
* @throws InvalidArgumentException
* @throws ResumeTokenException if the resume token cannot be found (i.e. no _id field)
*/
private function extractResumeToken($document)
{
if ($document === null) {
throw new ResumeTokenException("Cannot extract a resumeToken from an empty document");
if ( ! is_array($document) && ! is_object($document)) {
throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
}
if (is_array($document) && isset($document['_id'])) {
$this->resumeToken = $document['_id'];
return;
}
if ($document instanceof Serializable) {
$this->extractResumeToken($document->bsonSerialize());
return;
}
if (isset($document->_id)) {
$this->resumeToken = is_array($document) ? $document['_id'] : $document->_id;
} else {
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing");
$this->resumeToken = $document->_id;
return;
}
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing");
}
/**
......
......@@ -83,6 +83,9 @@ class Watch implements Executable
*
* Sessions are not supported for server versions < 3.6.
*
* * typeMap (array): Type map for BSON deserialization. This will be
* applied to the returned Cursor (it is not sent to the server).
*
* @param string $databaseName Database name
* @param string $collectionName Collection name
* @param array $pipeline List of pipeline operations
......@@ -148,7 +151,7 @@ class Watch implements Executable
$pipeline = $this->pipeline;
array_unshift($pipeline, $changeStream);
$aggregateOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1]);
$aggregateOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $aggregateOptions);
}
......
......@@ -427,6 +427,67 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSameDocument($expectedResult, $changeStream->current());
}
/**
* @dataProvider provideTypeMapOptionsAndExpectedChangeDocument
*/
public function testTypeMapOption(array $typeMap, $expectedChangeDocument)
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100, 'typeMap' => $typeMap]);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertNull($changeStream->current());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$changeDocument = $changeStream->current();
// Unset the resume token and namespace, which are intentionally omitted
if (is_array($changeDocument)) {
unset($changeDocument['_id'], $changeDocument['ns']);
} else {
unset($changeDocument->_id, $changeDocument->ns);
}
$this->assertEquals($expectedChangeDocument, $changeDocument);
}
public function provideTypeMapOptionsAndExpectedChangeDocument()
{
/* Note: the "_id" and "ns" fields are purposefully omitted because the
* resume token's value cannot be anticipated and the collection name,
* which is generated from the test name, is not available in the data
* provider, respectively. */
return [
[
['root' => 'array', 'document' => 'array'],
[
'operationType' => 'insert',
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
'documentKey' => ['_id' => 1],
],
],
[
['root' => 'object', 'document' => 'array'],
(object) [
'operationType' => 'insert',
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
'documentKey' => ['_id' => 1],
],
],
[
['root' => 'array', 'document' => 'stdClass'],
[
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => 1, 'x' => 'foo'],
'documentKey' => (object) ['_id' => 1],
],
],
];
}
private function insertDocument($document)
{
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);
......
......@@ -67,6 +67,10 @@ class WatchTest extends FunctionalTestCase
$options[][] = ['session' => $value];
}
foreach ($this->getInvalidArrayValues() as $value) {
$options[][] = ['typeMap' => $value];
}
return $options;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment