Commit 234d565c authored by Jeremy Mikola's avatar Jeremy Mikola

Merge pull request #632

parents b42924e6 8f1f3a3a
...@@ -47,7 +47,7 @@ use MongoDB\Exception\UnsupportedException; ...@@ -47,7 +47,7 @@ use MongoDB\Exception\UnsupportedException;
*/ */
class Watch implements Executable, /* @internal */ CommandSubscriber class Watch implements Executable, /* @internal */ CommandSubscriber
{ {
private static $wireVersionForOperationTime = 7; private static $wireVersionForStartAtOperationTime = 7;
const FULL_DOCUMENT_DEFAULT = 'default'; const FULL_DOCUMENT_DEFAULT = 'default';
const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup'; const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
...@@ -267,9 +267,9 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -267,9 +267,9 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
private function executeAggregate(Server $server) private function executeAggregate(Server $server)
{ {
/* If we've already captured an operation time or the server does not /* If we've already captured an operation time or the server does not
* support returning an operation time (e.g. MongoDB 3.6), execute the * support resuming from an operation time (e.g. MongoDB 3.6), execute
* aggregation directly and return its cursor. */ * the aggregation directly and return its cursor. */
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForOperationTime)) { if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
return $this->aggregate->execute($server); return $this->aggregate->execute($server);
} }
......
...@@ -21,6 +21,8 @@ use ReflectionClass; ...@@ -21,6 +21,8 @@ use ReflectionClass;
class WatchFunctionalTest extends FunctionalTestCase class WatchFunctionalTest extends FunctionalTestCase
{ {
private static $wireVersionForStartAtOperationTime = 7;
private $defaultOptions = ['maxAwaitTimeMS' => 500]; private $defaultOptions = ['maxAwaitTimeMS' => 500];
public function setUp() public function setUp()
...@@ -134,6 +136,8 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -134,6 +136,8 @@ class WatchFunctionalTest extends FunctionalTestCase
public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime() public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime()
{ {
$this->skipIfStartAtOperationTimeNotSupported();
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$operationTime = null; $operationTime = null;
...@@ -150,7 +154,9 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -150,7 +154,9 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertCount(1, $events); $this->assertCount(1, $events);
$this->assertSame('aggregate', $events[0]['started']->getCommandName()); $this->assertSame('aggregate', $events[0]['started']->getCommandName());
$operationTime = $events[0]['succeeded']->getReply()->operationTime; $reply = $events[0]['succeeded']->getReply();
$this->assertObjectHasAttribute('operationTime', $reply);
$operationTime = $reply->operationTime;
$this->assertInstanceOf(TimestampInterface::class, $operationTime); $this->assertInstanceOf(TimestampInterface::class, $operationTime);
$this->assertNull($changeStream->current()); $this->assertNull($changeStream->current());
...@@ -398,20 +404,29 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -398,20 +404,29 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['_id' => 1]); $this->insertDocument(['_id' => 1]);
/* Insert a document and advance the change stream to ensure we capture
* a resume token. This is necessary when startAtOperationTime is not
* supported (i.e. 3.6 server version). */
$changeStream->next();
$this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key());
$this->insertDocument(['_id' => 2]);
/* Killing the cursor and advancing when there is a result will test /* Killing the cursor and advancing when there is a result will test
* that next()'s resume attempt picks up the latest change. */ * that next()'s resume attempt picks up the latest change. */
$this->killChangeStreamCursor($changeStream); $this->killChangeStreamCursor($changeStream);
$changeStream->next(); $changeStream->next();
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key()); $this->assertSame(1, $changeStream->key());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
'operationType' => 'insert', 'operationType' => 'insert',
'fullDocument' => ['_id' => 1], 'fullDocument' => ['_id' => 2],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], 'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 1], 'documentKey' => ['_id' => 2],
]; ];
$this->assertMatchesDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
...@@ -424,48 +439,48 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -424,48 +439,48 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->rewind(); $changeStream->rewind();
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key()); $this->assertSame(1, $changeStream->key());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
'operationType' => 'insert', 'operationType' => 'insert',
'fullDocument' => ['_id' => 1], 'fullDocument' => ['_id' => 2],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], 'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 1], 'documentKey' => ['_id' => 2],
]; ];
$this->assertMatchesDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
$this->insertDocument(['_id' => 2]); $this->insertDocument(['_id' => 3]);
$changeStream->next(); $changeStream->next();
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$this->assertSame(1, $changeStream->key()); $this->assertSame(2, $changeStream->key());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
'operationType' => 'insert', 'operationType' => 'insert',
'fullDocument' => ['_id' => 2], 'fullDocument' => ['_id' => 3],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], 'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2], 'documentKey' => ['_id' => 3],
]; ];
$this->assertMatchesDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
$this->killChangeStreamCursor($changeStream); $this->killChangeStreamCursor($changeStream);
$this->insertDocument(['_id' => 3]); $this->insertDocument(['_id' => 4]);
$changeStream->next(); $changeStream->next();
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$this->assertSame(2, $changeStream->key()); $this->assertSame(3, $changeStream->key());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
'operationType' => 'insert', 'operationType' => 'insert',
'fullDocument' => ['_id' => 3], 'fullDocument' => ['_id' => 4],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], 'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 3], 'documentKey' => ['_id' => 4],
]; ];
$this->assertMatchesDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
...@@ -476,18 +491,18 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -476,18 +491,18 @@ class WatchFunctionalTest extends FunctionalTestCase
* we'll see {_id: 3} returned again. */ * we'll see {_id: 3} returned again. */
$this->killChangeStreamCursor($changeStream); $this->killChangeStreamCursor($changeStream);
$this->insertDocument(['_id' => 4]); $this->insertDocument(['_id' => 5]);
$changeStream->next(); $changeStream->next();
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$this->assertSame(3, $changeStream->key()); $this->assertSame(4, $changeStream->key());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
'operationType' => 'insert', 'operationType' => 'insert',
'fullDocument' => ['_id' => 4], 'fullDocument' => ['_id' => 5],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], 'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 4], 'documentKey' => ['_id' => 5],
]; ];
$this->assertMatchesDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
...@@ -944,4 +959,11 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -944,4 +959,11 @@ class WatchFunctionalTest extends FunctionalTestCase
$operation = new DatabaseCommand($this->getDatabaseName(), $command); $operation = new DatabaseCommand($this->getDatabaseName(), $command);
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
} }
private function skipIfStartAtOperationTimeNotSupported()
{
if (!\MongoDB\server_supports_feature($this->getPrimaryServer(), self::$wireVersionForStartAtOperationTime)) {
$this->markTestSkipped('Operation time is not supported');
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment