Commit a09973f1 authored by Jeremy Mikola's avatar Jeremy Mikola

Merge pull request #476

parents 0e1516b0 956f1e15
source:
file: apiargs-MongoDBCollection-method-aggregate-option.yaml
ref: batchSize
---
arg_name: option
name: batchSize
type: integer
description: |
Specifies the maximum number of change events to return in each batch of the
response from the MongoDB cluster.
interface: phpmethod
operation: ~
optional: true
---
source:
file: apiargs-MongoDBCollection-common-option.yaml
......@@ -18,6 +25,10 @@ description: |
- ``MongoDB\Operation\ChangeStream::FULL_DOCUMENT_DEFAULT`` (*default*)
- ``MongoDB\Operation\ChangeStream::FULL_DOCUMENT_UPDATE_LOOKUP``
.. note::
This is an option of the `$changeStream` pipeline stage.
interface: phpmethod
operation: ~
optional: true
......@@ -45,7 +56,9 @@ type: array|object
description: |
Specifies the logical starting point for the new change stream.
Note this is an option of the '$changeStream' pipeline stage.
.. note::
This is an option of the `$changeStream` pipeline stage.
interface: phpmethod
operation: ~
optional: true
......
......@@ -100,7 +100,7 @@ class ChangeStream implements Iterator
$resumable = true;
}
if ($e->getCode() === self::CURSOR_NOT_FOUND) {
$resumable = true;
$resumable = true;
}
if ($e instanceof ConnectionTimeoutException) {
$resumable = true;
......@@ -147,7 +147,7 @@ class ChangeStream implements Iterator
if (isset($document->_id)) {
$this->resumeToken = is_array($document) ? $document['_id'] : $document->_id;
} else {
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing");
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing");
}
}
......
......@@ -19,7 +19,7 @@ namespace MongoDB;
use MongoDB\BSON\JavascriptInterface;
use MongoDB\BSON\Serializable;
use MongoDB\ChangeStream as ChangeStreamResult;
use MongoDB\ChangeStream;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadConcern;
......@@ -32,7 +32,6 @@ use MongoDB\Exception\UnsupportedException;
use MongoDB\Model\IndexInfoIterator;
use MongoDB\Operation\Aggregate;
use MongoDB\Operation\BulkWrite;
use MongoDB\Operation\ChangeStream;
use MongoDB\Operation\CreateIndexes;
use MongoDB\Operation\Count;
use MongoDB\Operation\DeleteMany;
......@@ -52,6 +51,7 @@ use MongoDB\Operation\MapReduce;
use MongoDB\Operation\ReplaceOne;
use MongoDB\Operation\UpdateMany;
use MongoDB\Operation\UpdateOne;
use MongoDB\Operation\Watch;
use Traversable;
class Collection
......@@ -939,14 +939,14 @@ class Collection
return $operation->execute($server);
}
/*
* ChangeStream outline
/**
* Create a change stream for watching changes to the collection.
*
* @see ChangeStream::__construct() for supported options
* @param array $pipeline List of pipeline operations
* @param array $options Command options
* @see Watch::__construct() for supported options
* @param array $pipeline List of pipeline operations
* @param array $options Command options
* @return ChangeStream
* @throws InvalidArgumentException for parameter/option parsing errors
* @return ChangeStreamResult
*/
public function watch(array $pipeline = [], array $options = [])
{
......@@ -956,11 +956,18 @@ class Collection
$server = $this->manager->selectServer($options['readPreference']);
if ( ! isset($options['readConcern'])) {
/* Although change streams require a newer version of the server than
* read concerns, perform the usual wire version check before inheriting
* the collection's read concern. In the event that the server is too
* old, this makes it more likely that users will encounter an error
* related to change streams being unsupported instead of an
* UnsupportedException regarding use of the "readConcern" option from
* the Aggregate operation class. */
if ( ! isset($options['readConcern']) && \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern)) {
$options['readConcern'] = $this->readConcern;
}
$operation = new ChangeStream($this->databaseName, $this->collectionName, $pipeline, $options, $this->manager);
$operation = new Watch($this->manager, $this->databaseName, $this->collectionName, $pipeline, $options);
return $operation->execute($server);
}
......
......@@ -17,7 +17,7 @@
namespace MongoDB\Operation;
use MongoDB\ChangeStream as ChangeStreamResult;
use MongoDB\ChangeStream;
use MongoDB\Driver\Command;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadConcern;
......@@ -32,10 +32,10 @@ use MongoDB\Exception\UnsupportedException;
* Operation for creating a change stream with the aggregate command.
*
* @api
* @see \MongoDB\Collection::changeStream()
* @see http://docs.mongodb.org/manual/reference/command/changeStream/
* @see \MongoDB\Collection::watch()
* @see https://docs.mongodb.com/manual/changeStreams/
*/
class ChangeStream implements Executable
class Watch implements Executable
{
const FULL_DOCUMENT_DEFAULT = 'default';
const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
......@@ -47,18 +47,19 @@ class ChangeStream implements Executable
private $manager;
/**
* Constructs a changeStream command.
* Constructs an aggregate command for creating a change stream.
*
* Supported options:
*
* * fullDocument (string): Allowed values: ‘default’, ‘updateLookup’.
* Defaults to ‘default’. When set to ‘updateLookup’, the change
* notification for partial updates will include both a delta describing
* the changes to the document, as well as a copy of the entire document
* that was changed from some time after the change occurred. For forward
* compatibility, a driver MUST NOT raise an error when a user provides
* an unknown value. The driver relies on the server to validate this
* option.
* * fullDocument (string): Determines whether the "fullDocument" field
* will be populated for update operations. By default, change streams
* only return the delta of fields during the update operation (via the
* "updateDescription" field). To additionally return the most current
* majority-committed version of the updated document, specify
* "updateLookup" for this option. Defaults to "default".
*
* Insert and replace operations always include the "fullDocument" field
* and delete operations omit the field as the document no longer exists.
*
* * resumeAfter (document): Specifies the logical starting point for the
* new change stream.
......@@ -69,7 +70,9 @@ class ChangeStream implements Executable
* This is not supported for server versions < 3.2 and will result in an
* exception at execution time if used.
*
* * readPreference (MongoDB\Driver\ReadPreference): Read preference.
* * readPreference (MongoDB\Driver\ReadPreference): Read preference. This
* will be used to select a new server when resuming. Defaults to a
* "primary" read preference.
*
* * maxAwaitTimeMS (integer): The maximum amount of time for the server to
* wait on new documents to satisfy a change stream query.
......@@ -91,8 +94,13 @@ class ChangeStream implements Executable
* @param Manager $manager Manager instance from the driver
* @throws InvalidArgumentException for parameter/option parsing errors
*/
public function __construct($databaseName, $collectionName, array $pipeline, array $options = [], Manager $manager)
public function __construct(Manager $manager, $databaseName, $collectionName, array $pipeline, array $options = [])
{
$options += [
'fullDocument' => self::FULL_DOCUMENT_DEFAULT,
'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY),
];
if (isset($options['batchSize']) && ! is_integer($options['batchSize'])) {
throw InvalidArgumentException::invalidType('"batchSize" option', $options['batchSize'], 'integer');
}
......@@ -119,11 +127,11 @@ class ChangeStream implements Executable
}
}
$this->manager = $manager;
$this->databaseName = (string) $databaseName;
$this->collectionName = (string) $collectionName;
$this->pipeline = $pipeline;
$this->options = $options;
$this->manager = $manager;
}
/**
......@@ -131,7 +139,7 @@ class ChangeStream implements Executable
*
* @see Executable::execute()
* @param Server $server
* @return ChangeStreamResult
* @return ChangeStream
* @throws UnexpectedValueException if the command response was malformed
* @throws UnsupportedException if collation, read concern, or write concern is used and unsupported
* @throws DriverRuntimeException for other driver errors (e.g. connection errors)
......@@ -142,7 +150,7 @@ class ChangeStream implements Executable
$cursor = $command->execute($server);
return new ChangeStreamResult($cursor, $this->createResumeCallable());
return new ChangeStream($cursor, $this->createResumeCallable());
}
private function createAggregateOptions()
......
......@@ -6,6 +6,7 @@ use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use Exception;
/**
* Observes command documents using the driver's monitoring API.
......@@ -20,13 +21,19 @@ class CommandObserver implements CommandSubscriber
\MongoDB\Driver\Monitoring\addSubscriber($this);
call_user_func($execution);
try {
call_user_func($execution);
} catch (Exception $executionException) {}
\MongoDB\Driver\Monitoring\removeSubscriber($this);
foreach ($this->commands as $command) {
call_user_func($commandCallback, $command);
}
if (isset($executionException)) {
throw $executionException;
}
}
public function commandStarted(CommandStartedEvent $event)
......
......@@ -4,6 +4,7 @@ namespace MongoDB\Tests;
use MongoDB\Database;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Server;
use MongoDB\Operation\DropCollection;
use MongoDB\Operation\DropDatabase;
......@@ -923,66 +924,102 @@ class DocumentationExamplesTest extends FunctionalTestCase
public function testChangeStreamExample_1_4()
{
if ($this->getPrimaryServer()->getType() === Server::TYPE_STANDALONE) {
$this->markTestSkipped('$changeStream is not supported on standalone servers');
}
if (version_compare($this->getFeatureCompatibilityVersion(), '3.6', '<')) {
$this->markTestSkipped('$changeStream is only supported on FCV 3.6 or higher');
}
$db = new Database($this->manager, $this->getDatabaseName());
$db->dropCollection('inventory');
// Start Changestream Example 1
$cursor = $db->inventory->watch();
$cursor->next();
$current = $cursor->current();
$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();
// End Changestream Example 1
$this->assertNull($current);
$this->assertNull($firstChange);
$this->assertNull($secondChange);
// Start Changestream Example 2
$cursor = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\ChangeStream::FULL_DOCUMENT_UPDATE_LOOKUP]);
$cursor->next();
$current = $cursor->current();
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$nextChange = $changeStream->current();
// End Changestream Example 2
$this->assertNull($current);
$this->assertNull($firstChange);
$this->assertNull($nextChange);
$insertManyResult = $db->inventory->insertMany([
['_id' => 1, 'x' => 'foo'],
['_id' => 2, 'x' => 'bar'],
]);
$this->assertEquals(2, $insertManyResult->getInsertedCount());
$insertedResult = $db->inventory->insertOne(['x' => 1]);
$insertedId = $insertedResult->getInsertedId();
$cursor->next();
$current = $cursor->current();
$expectedChange = (object) [
'_id' => $current->_id,
$changeStream->next();
$this->assertTrue($changeStream->valid());
$lastChange = $changeStream->current();
$expectedChange = [
'_id' => $lastChange->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $insertedId, 'x' => 1],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'inventory'],
'documentKey' => (object) ['_id' => $insertedId]
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => 'inventory'],
'documentKey' => ['_id' => 1],
];
$this->assertEquals($current, $expectedChange);
$this->assertSameDocument($expectedChange, $lastChange);
// Start Changestream Example 3
$resumeToken = ($current !== null) ? $current->_id : null;
if ($resumeToken !== null) {
$cursor = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$cursor->next();
$resumeToken = ($lastChange !== null) ? $lastChange->_id : null;
if ($resumeToken === null) {
throw new \Exception('resumeToken was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$nextChange = $changeStream->current();
// End Changestream Example 3
$insertedResult = $db->inventory->insertOne(['x' => 2]);
$insertedId = $insertedResult->getInsertedId();
$cursor->next();
$expectedChange = (object) [
'_id' => $cursor->current()->_id,
$expectedChange = [
'_id' => $nextChange->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $insertedId, 'x' => 2],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'inventory'],
'documentKey' => (object) ['_id' => $insertedId]
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => 'inventory'],
'documentKey' => ['_id' => 2],
];
$this->assertEquals($cursor->current(), $expectedChange);
$this->assertSameDocument($expectedChange, $nextChange);
// Start Changestream Example 4
$pipeline = [['$match' => ['$or' => [['fullDocument.username' => 'alice'], ['operationType' => 'delete']]]]];
$cursor = $db->inventory->watch($pipeline, []);
$cursor->next();
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$nextChange = $changeStream->current();
// End Changestream Example 4
$this->assertNull($firstChange);
$this->assertNull($nextChange);
}
/**
......
<?php
namespace MongoDB\Tests\Operation;
use MongoDB\Client;
use MongoDB\Collection;
use MongoDB\Operation\DatabaseCommand;
class ChangeStreamFunctionalTest extends FunctionalTestCase
{
public function setUp()
{
parent::setUp();
if (version_compare($this->getFeatureCompatibilityVersion(), '3.6', '<')) {
$this->markTestSkipped('$changeStream is only supported on FCV 3.6 or higher');
}
}
public function testResume()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$result = $this->collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult = $this->collection->watch();
$changeStreamResult->rewind();
$this->assertNull($changeStreamResult->current());
$result = $this->collection->insertOne(['x' => 2]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult->next();
$expectedResult = (object) ([
'_id' => $changeStreamResult->current()->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $result->getInsertedId(), 'x' => 2],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'ChangeStreamFunctionalTest.e68b9f01'],
'documentKey' => (object) ['_id' => $result->getInsertedId()]
]);
$this->assertEquals($changeStreamResult->current(), $expectedResult);
$operation = new DatabaseCommand($this->getDatabaseName(), ["killCursors" => $this->getCollectionName(), "cursors" => [$changeStreamResult->getCursorId()]]);
$operation->execute($this->getPrimaryServer());
$result = $this->collection->insertOne(['x' => 3]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult->next();
$expectedResult = (object) ([
'_id' => $changeStreamResult->current()->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $result->getInsertedId(), 'x' => 3],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'ChangeStreamFunctionalTest.e68b9f01'],
'documentKey' => (object) ['_id' => $result->getInsertedId()]
]);
$this->assertEquals($changeStreamResult->current(), $expectedResult);
}
public function testNoChangeAfterResumeBeforeInsert()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$result = $this->collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult = $this->collection->watch();
$changeStreamResult->rewind();
$this->assertNull($changeStreamResult->current());
$result = $this->collection->insertOne(['x' => 2]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult->next();
$expectedResult = (object) ([
'_id' => $changeStreamResult->current()->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $result->getInsertedId(), 'x' => 2],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'ChangeStreamFunctionalTest.4a554985'],
'documentKey' => (object) ['_id' => $result->getInsertedId()]
]);
$this->assertEquals($changeStreamResult->current(), $expectedResult);
$operation = new DatabaseCommand($this->getDatabaseName(), ["killCursors" => $this->getCollectionName(), "cursors" => [$changeStreamResult->getCursorId()]]);
$operation->execute($this->getPrimaryServer());
$changeStreamResult->next();
$this->assertNull($changeStreamResult->current());
$result = $this->collection->insertOne(['x' => 3]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult->next();
$expectedResult = (object) ([
'_id' => $changeStreamResult->current()->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $result->getInsertedId(), 'x' => 3],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'ChangeStreamFunctionalTest.4a554985'],
'documentKey' => (object) ['_id' => $result->getInsertedId()]
]);
$this->assertEquals($changeStreamResult->current(), $expectedResult);
}
public function testResumeAfterKillThenNoOperations()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$changeStreamResult = $this->collection->watch();
$operation = new DatabaseCommand($this->getDatabaseName(), ["killCursors" => $this->getCollectionName(), "cursors" => [$changeStreamResult->getCursorId()]]);
$operation->execute($this->getPrimaryServer());
$changeStreamResult->next();
$this->assertNull($changeStreamResult->current());
}
public function testResumeAfterKillThenOperation()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$changeStreamResult = $this->collection->watch();
$operation = new DatabaseCommand($this->getDatabaseName(), ["killCursors" => $this->getCollectionName(), "cursors" => [$changeStreamResult->getCursorId()]]);
$operation->execute($this->getPrimaryServer());
$result = $this->collection->insertOne(['x' => 3]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult->next();
$this->assertNull($changeStreamResult->current());
}
public function testKey()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$changeStreamResult = $this->collection->watch();
$this->assertNull($changeStreamResult->key());
$result = $this->collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult->next();
$this->assertSame(1, $changeStreamResult->key());
$changeStreamResult->next();
$this->assertNull($changeStreamResult->key());
$changeStreamResult->next();
$this->assertNull($changeStreamResult->key());
$operation = new DatabaseCommand($this->getDatabaseName(), ["killCursors" => $this->getCollectionName(), "cursors" => [$changeStreamResult->getCursorId()]]);
$operation->execute($this->getPrimaryServer());
$changeStreamResult->next();
$this->assertNull($changeStreamResult->key());
$result = $this->collection->insertOne(['x' => 2]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult->next();
$this->assertSame(2, $changeStreamResult->key());
}
public function testNonEmptyPipeline()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$pipeline = [['$project' => ['foo' => [0]]]];
$changeStreamResult = $this->collection->watch($pipeline, []);
$result = $this->collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult->next();
$expectedResult = (object) ([
'_id' => $changeStreamResult->current()->_id,
'foo' => [0]
]);
$this->assertEquals($changeStreamResult->current(), $expectedResult);
}
public function testCursorWithEmptyBatchNotClosed()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$changeStreamResult = $this->collection->watch();
$this->assertNotNull($changeStreamResult);
}
/**
* @expectedException MongoDB\Exception\ResumeTokenException
*/
public function testFailureAfterResumeTokenRemoved()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$pipeline = [['$project' => ['_id' => 0 ]]];
$changeStreamResult = $this->collection->watch($pipeline, []);
$result = $this->collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult->next();
}
public function testConnectionException()
{
$client = new Client($this->getUri(), ['socketTimeoutMS' => 1005], []);
$collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName());
$changeStreamResult = $collection->watch();
$changeStreamResult->next();
$result = $collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$changeStreamResult->next();
$expectedResult = (object) ([
'_id' => $changeStreamResult->current()->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $result->getInsertedId(), 'x' => 1],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'ChangeStreamFunctionalTest.226d95f1'],
'documentKey' => (object) ['_id' => $result->getInsertedId()]
]);
$this->assertEquals($changeStreamResult->current(), $expectedResult);
}
public function testMaxAwaitTimeMS()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
/* On average, an acknowledged write takes about 20 ms to appear in a
* change stream on the server so we'll use a higher maxAwaitTimeMS to
* ensure we see the write. */
$maxAwaitTimeMS = 100;
$changeStreamResult = $this->collection->watch([], ['maxAwaitTimeMS' => $maxAwaitTimeMS]);
/* The initial change stream is empty so we should expect a delay when
* we call rewind, since it issues a getMore. Expect to wait at least
* maxAwaitTimeMS, since no new documents should be inserted to wake up
* the server's query thread. Also ensure we don't wait too long (server
* default is one second). */
$startTime = microtime(true);
$changeStreamResult->rewind();
$duration = microtime(true) - $startTime;
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, $duration);
$this->assertLessThan(0.5, $duration);
$this->assertFalse($changeStreamResult->valid());
/* Advancing again on a change stream will issue a getMore, so we should
* expect a delay again. */
$startTime = microtime(true);
$changeStreamResult->next();
$duration = microtime(true) - $startTime;
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, $duration);
$this->assertLessThan(0.5, $duration);
$this->assertFalse($changeStreamResult->valid());
/* After inserting a document, the change stream will not issue a
* getMore so we should not expect a delay. */
$result = $this->collection->insertOne(['_id' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$startTime = microtime(true);
$changeStreamResult->next();
$duration = microtime(true) - $startTime;
$this->assertLessThan($maxAwaitTimeMS * 0.001, $duration);
$this->assertTrue($changeStreamResult->valid());
}
}
<?php
namespace MongoDB\Tests\Operation;
use MongoDB\ChangeStream;
use MongoDB\Client;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Driver\Exception\ConnectionTimeoutException;
use MongoDB\Operation\DatabaseCommand;
use MongoDB\Operation\InsertOne;
use MongoDB\Operation\Watch;
use MongoDB\Tests\CommandObserver;
use stdClass;
use ReflectionClass;
class WatchFunctionalTest extends FunctionalTestCase
{
public function setUp()
{
parent::setUp();
if ($this->getPrimaryServer()->getType() === Server::TYPE_STANDALONE) {
$this->markTestSkipped('$changeStream is not supported on standalone servers');
}
if (version_compare($this->getFeatureCompatibilityVersion(), '3.6', '<')) {
$this->markTestSkipped('$changeStream is only supported on FCV 3.6 or higher');
}
}
public function testNextResumesAfterCursorNotFound()
{
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertNull($changeStream->current());
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];
$this->assertSameDocument($expectedResult, $changeStream->current());
$this->killChangeStreamCursor($changeStream);
$this->insertDocument(['_id' => 3, 'x' => 'baz']);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 3, 'x' => 'baz'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 3]
];
$this->assertSameDocument($expectedResult, $changeStream->current());
}
/**
* @todo test that rewind() also resumes once PHPLIB-322 is implemented
*/
public function testNextResumesAfterConnectionException()
{
/* In order to trigger a dropped connection, we'll use a new client with
* a socket timeout that is less than the change stream's maxAwaitTimeMS
* option. */
$manager = new Manager($this->getUri(), ['socketTimeoutMS' => 50]);
$primaryServer = $manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$operation = new Watch($manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($primaryServer);
/* Note: we intentionally do not start iteration with rewind() to ensure
* that we test resume functionality within next(). */
$commands = [];
try {
(new CommandObserver)->observe(
function() use ($changeStream) {
$changeStream->next();
},
function(stdClass $command) use (&$commands) {
$commands[] = key((array) $command);
}
);
$this->fail('ConnectionTimeoutException was not thrown');
} catch (ConnectionTimeoutException $e) {}
$expectedCommands = [
/* The initial aggregate command for change streams returns a cursor
* envelope with an empty initial batch, since there are no changes
* to report at the moment the change stream is created. Therefore,
* we expect a getMore to be issued when we first advance the change
* stream (with either rewind() or next()). */
'getMore',
/* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous
* getMore command encounters a client socket timeout and leaves the
* cursor open on the server. ChangeStream should catch this error
* and resume by issuing a new aggregate command. */
'aggregate',
/* When ChangeStream resumes, it overwrites its original cursor with
* the new cursor resulting from the last aggregate command. This
* removes the last reference to the old cursor, which causes the
* driver to kill it (via mongoc_cursor_destroy()). */
'killCursors',
/* Finally, ChangeStream will rewind the new cursor as the last step
* of the resume process. This results in one last getMore. */
'getMore',
];
$this->assertSame($expectedCommands, $commands);
}
public function testNoChangeAfterResumeBeforeInsert()
{
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertNull($changeStream->current());
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];
$this->assertSameDocument($expectedResult, $changeStream->current());
$this->killChangeStreamCursor($changeStream);
$changeStream->next();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->current());
$this->insertDocument(['_id' => 3, 'x' => 'baz']);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 3, 'x' => 'baz'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 3],
];
$this->assertSameDocument($expectedResult, $changeStream->current());
}
public function testKey()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key());
$changeStream->next();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$changeStream->next();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$this->killChangeStreamCursor($changeStream);
$changeStream->next();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$this->assertSame(1, $changeStream->key());
}
public function testNonEmptyPipeline()
{
$pipeline = [['$project' => ['foo' => [0]]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['_id' => 1]);
$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'foo' => [0],
];
$this->assertSameDocument($expectedResult, $changeStream->current());
}
public function testInitialCursorIsNotClosed()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
$changeStream = $operation->execute($this->getPrimaryServer());
/* The spec requests that we assert that the cursor returned from the
* aggregate command is not closed on the driver side. We will verify
* this by checking that the cursor ID is non-zero and that libmongoc
* reports the cursor as alive. While the cursor ID is easily accessed
* through ChangeStream, we'll need to use reflection to access the
* internal Cursor and call isDead(). */
$this->assertNotEquals('0', (string) $changeStream->getCursorId());
$rc = new ReflectionClass('MongoDB\ChangeStream');
$rp = $rc->getProperty('csIt');
$rp->setAccessible(true);
$iterator = $rp->getValue($changeStream);
$this->assertInstanceOf('IteratorIterator', $iterator);
$cursor = $iterator->getInnerIterator();
$this->assertInstanceOf('MongoDB\Driver\Cursor', $cursor);
$this->assertFalse($cursor->isDead());
}
/**
* @expectedException MongoDB\Exception\ResumeTokenException
* @todo test that rewind() also attempts to extract the resume token once PHPLIB-322 is implemented
*/
public function testNextCannotExtractResumeToken()
{
$pipeline = [['$project' => ['_id' => 0 ]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->insertDocument(['x' => 1]);
$changeStream->next();
}
public function testMaxAwaitTimeMS()
{
/* On average, an acknowledged write takes about 20 ms to appear in a
* change stream on the server so we'll use a higher maxAwaitTimeMS to
* ensure we see the write. */
$maxAwaitTimeMS = 100;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => $maxAwaitTimeMS]);
$changeStream = $operation->execute($this->getPrimaryServer());
/* The initial change stream is empty so we should expect a delay when
* we call rewind, since it issues a getMore. Expect to wait at least
* maxAwaitTimeMS, since no new documents should be inserted to wake up
* the server's query thread. Also ensure we don't wait too long (server
* default is one second). */
$startTime = microtime(true);
$changeStream->rewind();
$duration = microtime(true) - $startTime;
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, $duration);
$this->assertLessThan(0.5, $duration);
$this->assertFalse($changeStream->valid());
/* Advancing again on a change stream will issue a getMore, so we should
* expect a delay again. */
$startTime = microtime(true);
$changeStream->next();
$duration = microtime(true) - $startTime;
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, $duration);
$this->assertLessThan(0.5, $duration);
$this->assertFalse($changeStream->valid());
/* After inserting a document, the change stream will not issue a
* getMore so we should not expect a delay. */
$this->insertDocument(['_id' => 1]);
$startTime = microtime(true);
$changeStream->next();
$duration = microtime(true) - $startTime;
$this->assertLessThan($maxAwaitTimeMS * 0.001, $duration);
$this->assertTrue($changeStream->valid());
}
private function insertDocument($document)
{
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);
$writeResult = $insertOne->execute($this->getPrimaryServer());
$this->assertEquals(1, $writeResult->getInsertedCount());
}
private function killChangeStreamCursor(ChangeStream $changeStream)
{
$command = [
'killCursors' => $this->getCollectionName(),
'cursors' => [ $changeStream->getCursorId() ],
];
$operation = new DatabaseCommand($this->getDatabaseName(), $command);
$operation->execute($this->getPrimaryServer());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment