Commit 041c0c4f authored by Jeremy Mikola's avatar Jeremy Mikola

Merge pull request #480

parents 488010b3 99f54dd5
...@@ -70,4 +70,8 @@ optional: true ...@@ -70,4 +70,8 @@ optional: true
source: source:
file: apiargs-common-option.yaml file: apiargs-common-option.yaml
ref: session ref: session
---
source:
file: apiargs-MongoDBCollection-common-option.yaml
ref: typeMap
... ...
...@@ -21,6 +21,7 @@ use MongoDB\BSON\Serializable; ...@@ -21,6 +21,7 @@ use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor; use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\ConnectionTimeoutException; use MongoDB\Driver\Exception\ConnectionTimeoutException;
use MongoDB\Driver\Exception\RuntimeException; use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException; use MongoDB\Exception\ResumeTokenException;
use IteratorIterator; use IteratorIterator;
use Iterator; use Iterator;
...@@ -92,7 +93,7 @@ class ChangeStream implements Iterator ...@@ -92,7 +93,7 @@ class ChangeStream implements Iterator
try { try {
$this->csIt->next(); $this->csIt->next();
if ($this->valid()) { if ($this->valid()) {
$this->extractResumeToken($this->csIt->current()); $this->resumeToken = $this->extractResumeToken($this->csIt->current());
$this->key++; $this->key++;
} }
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
...@@ -121,7 +122,7 @@ class ChangeStream implements Iterator ...@@ -121,7 +122,7 @@ class ChangeStream implements Iterator
try { try {
$this->csIt->rewind(); $this->csIt->rewind();
if ($this->valid()) { if ($this->valid()) {
$this->extractResumeToken($this->csIt->current()); $this->resumeToken = $this->extractResumeToken($this->csIt->current());
} }
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) { if (strpos($e->getMessage(), "not master") !== false) {
...@@ -149,25 +150,36 @@ class ChangeStream implements Iterator ...@@ -149,25 +150,36 @@ class ChangeStream implements Iterator
} }
/** /**
* Extracts the resumeToken (_id) of the input document. * Extracts the resume token (i.e. "_id" field) from the change document.
* *
* @return void * @param array|document $document Change document
* @throws ResumeTokenException if the document has no _id. * @return mixed
* @throws InvalidArgumentException
* @throws ResumeTokenException if the resume token is not found or invalid
*/ */
private function extractResumeToken($document) private function extractResumeToken($document)
{ {
if ($document === null) { if ( ! is_array($document) && ! is_object($document)) {
throw new ResumeTokenException("Cannot extract a resumeToken from an empty document"); throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
} }
if ($document instanceof Serializable) { if ($document instanceof Serializable) {
$this->extractResumeToken($document->bsonSerialize()); return $this->extractResumeToken($document->bsonSerialize());
return; }
$resumeToken = is_array($document)
? (isset($document['_id']) ? $document['_id'] : null)
: (isset($document->_id) ? $document->_id : null);
if ( ! isset($resumeToken)) {
throw ResumeTokenException::notFound();
} }
if (isset($document->_id)) {
$this->resumeToken = is_array($document) ? $document['_id'] : $document->_id; if ( ! is_array($resumeToken) && ! is_object($resumeToken)) {
} else { throw ResumeTokenException::invalidType($resumeToken);
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing");
} }
return $resumeToken;
} }
/** /**
......
...@@ -19,4 +19,24 @@ namespace MongoDB\Exception; ...@@ -19,4 +19,24 @@ namespace MongoDB\Exception;
class ResumeTokenException extends \Exception class ResumeTokenException extends \Exception
{ {
/**
* Thrown when a resume token is not found in a change document.
*
* @return self
*/
public static function notFound()
{
return new static('Resume token not found in change document');
}
/**
* Thrown when a resume token has an invalid type.
*
* @param mixed $value Actual value (used to derive the type)
* @return self
*/
public static function invalidType($value)
{
return new static(sprintf('Expected resume token to have type "array or object" but found "%s"', gettype($value)));
}
} }
...@@ -83,6 +83,9 @@ class Watch implements Executable ...@@ -83,6 +83,9 @@ class Watch implements Executable
* *
* Sessions are not supported for server versions < 3.6. * Sessions are not supported for server versions < 3.6.
* *
* * typeMap (array): Type map for BSON deserialization. This will be
* applied to the returned Cursor (it is not sent to the server).
*
* @param string $databaseName Database name * @param string $databaseName Database name
* @param string $collectionName Collection name * @param string $collectionName Collection name
* @param array $pipeline List of pipeline operations * @param array $pipeline List of pipeline operations
...@@ -148,7 +151,7 @@ class Watch implements Executable ...@@ -148,7 +151,7 @@ class Watch implements Executable
$pipeline = $this->pipeline; $pipeline = $this->pipeline;
array_unshift($pipeline, $changeStream); array_unshift($pipeline, $changeStream);
$aggregateOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1]); $aggregateOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $aggregateOptions); return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $aggregateOptions);
} }
......
...@@ -307,8 +307,9 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -307,8 +307,9 @@ class WatchFunctionalTest extends FunctionalTestCase
/** /**
* @expectedException MongoDB\Exception\ResumeTokenException * @expectedException MongoDB\Exception\ResumeTokenException
* @expectedExceptionMessage Resume token not found in change document
*/ */
public function testNextCannotExtractResumeToken() public function testNextResumeTokenNotFound()
{ {
$pipeline = [['$project' => ['_id' => 0 ]]]; $pipeline = [['$project' => ['_id' => 0 ]]];
...@@ -324,8 +325,9 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -324,8 +325,9 @@ class WatchFunctionalTest extends FunctionalTestCase
/** /**
* @expectedException MongoDB\Exception\ResumeTokenException * @expectedException MongoDB\Exception\ResumeTokenException
* @expectedExceptionMessage Resume token not found in change document
*/ */
public function testRewindCannotExtractResumeToken() public function testRewindResumeTokenNotFound()
{ {
$pipeline = [['$project' => ['_id' => 0 ]]]; $pipeline = [['$project' => ['_id' => 0 ]]];
...@@ -337,6 +339,40 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -337,6 +339,40 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->rewind(); $changeStream->rewind();
} }
/**
* @expectedException MongoDB\Exception\ResumeTokenException
* @expectedExceptionMessage Expected resume token to have type "array or object" but found "string"
*/
public function testNextResumeTokenInvalidType()
{
$pipeline = [['$project' => ['_id' => ['$literal' => 'foo']]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
/* Note: we intentionally do not start iteration with rewind() to ensure
* that we test extraction functionality within next(). */
$this->insertDocument(['x' => 1]);
$changeStream->next();
}
/**
* @expectedException MongoDB\Exception\ResumeTokenException
* @expectedExceptionMessage Expected resume token to have type "array or object" but found "string"
*/
public function testRewindResumeTokenInvalidType()
{
$pipeline = [['$project' => ['_id' => ['$literal' => 'foo']]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['x' => 1]);
$changeStream->rewind();
}
public function testMaxAwaitTimeMS() public function testMaxAwaitTimeMS()
{ {
/* On average, an acknowledged write takes about 20 ms to appear in a /* On average, an acknowledged write takes about 20 ms to appear in a
...@@ -427,6 +463,67 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -427,6 +463,67 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSameDocument($expectedResult, $changeStream->current()); $this->assertSameDocument($expectedResult, $changeStream->current());
} }
/**
* @dataProvider provideTypeMapOptionsAndExpectedChangeDocument
*/
public function testTypeMapOption(array $typeMap, $expectedChangeDocument)
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100, 'typeMap' => $typeMap]);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertNull($changeStream->current());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$changeDocument = $changeStream->current();
// Unset the resume token and namespace, which are intentionally omitted
if (is_array($changeDocument)) {
unset($changeDocument['_id'], $changeDocument['ns']);
} else {
unset($changeDocument->_id, $changeDocument->ns);
}
$this->assertEquals($expectedChangeDocument, $changeDocument);
}
public function provideTypeMapOptionsAndExpectedChangeDocument()
{
/* Note: the "_id" and "ns" fields are purposefully omitted because the
* resume token's value cannot be anticipated and the collection name,
* which is generated from the test name, is not available in the data
* provider, respectively. */
return [
[
['root' => 'array', 'document' => 'array'],
[
'operationType' => 'insert',
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
'documentKey' => ['_id' => 1],
],
],
[
['root' => 'object', 'document' => 'array'],
(object) [
'operationType' => 'insert',
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
'documentKey' => ['_id' => 1],
],
],
[
['root' => 'array', 'document' => 'stdClass'],
[
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => 1, 'x' => 'foo'],
'documentKey' => (object) ['_id' => 1],
],
],
];
}
private function insertDocument($document) private function insertDocument($document)
{ {
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document); $insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);
......
...@@ -67,6 +67,10 @@ class WatchTest extends FunctionalTestCase ...@@ -67,6 +67,10 @@ class WatchTest extends FunctionalTestCase
$options[][] = ['session' => $value]; $options[][] = ['session' => $value];
} }
foreach ($this->getInvalidArrayValues() as $value) {
$options[][] = ['typeMap' => $value];
}
return $options; return $options;
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment