Commit b3da293b authored by Jeremy Mikola's avatar Jeremy Mikola

PHPLIB-423: Add additional change stream prose tests

parent 975ff03e
...@@ -4,6 +4,7 @@ namespace MongoDB\Tests\Operation; ...@@ -4,6 +4,7 @@ namespace MongoDB\Tests\Operation;
use MongoDB\ChangeStream; use MongoDB\ChangeStream;
use MongoDB\BSON\TimestampInterface; use MongoDB\BSON\TimestampInterface;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Manager; use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference; use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
...@@ -33,6 +34,98 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -33,6 +34,98 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->createCollection(); $this->createCollection();
} }
/**
* Prose test: "ChangeStream must continuously track the last seen
* resumeToken"
*/
public function testGetResumeToken()
{
if ($this->isPostBatchResumeTokenSupported()) {
$this->markTestSkipped('postBatchResumeToken is supported');
}
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->getResumeToken());
$this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]);
$changeStream->next();
$this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
$changeStream->next();
$this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
$this->insertDocument(['x' => 3]);
$changeStream->next();
$this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
}
/**
* Prose test: "ChangeStream must continuously track the last seen
* resumeToken"
*/
public function testGetResumeTokenWithPostBatchResumeToken()
{
if ( ! $this->isPostBatchResumeTokenSupported()) {
$this->markTestSkipped('postBatchResumeToken is not supported');
}
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$events = [];
(new CommandObserver)->observe(
function() use ($operation, &$changeStream) {
$changeStream = $operation->execute($this->getPrimaryServer());
},
function (array $event) use (&$events) {
$events[] = $event;
}
);
$this->assertCount(1, $events);
$this->assertSame('aggregate', $events[0]['started']->getCommandName());
$postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply());
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertSameDocument($postBatchResumeToken, $changeStream->getResumeToken());
$this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]);
$events = [];
(new CommandObserver)->observe(
function() use ($changeStream) {
$changeStream->next();
},
function (array $event) use (&$events) {
$events[] = $event;
}
);
$this->assertCount(1, $events);
$this->assertSame('getMore', $events[0]['started']->getCommandName());
$postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply());
$changeStream->next();
$this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
$changeStream->next();
$this->assertSameDocument($postBatchResumeToken, $changeStream->getResumeToken());
}
/**
* Prose test: "ChangeStream will resume after a killCursors command is
* issued for its child cursor."
*/
public function testNextResumesAfterCursorNotFound() public function testNextResumesAfterCursorNotFound()
{ {
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
...@@ -127,7 +220,6 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -127,7 +220,6 @@ class WatchFunctionalTest extends FunctionalTestCase
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$operationTime = null;
$events = []; $events = [];
(new CommandObserver)->observe( (new CommandObserver)->observe(
...@@ -141,12 +233,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -141,12 +233,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertCount(1, $events); $this->assertCount(1, $events);
$this->assertSame('aggregate', $events[0]['started']->getCommandName()); $this->assertSame('aggregate', $events[0]['started']->getCommandName());
$reply = $events[0]['succeeded']->getReply(); $postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply());
$this->assertObjectHasAttribute('cursor', $reply);
$this->assertInternalType('object', $reply->cursor);
$this->assertObjectHasAttribute('postBatchResumeToken', $reply->cursor);
$postBatchResumeToken = $reply->cursor->postBatchResumeToken;
$this->assertInternalType('object', $postBatchResumeToken);
$this->assertFalse($changeStream->valid()); $this->assertFalse($changeStream->valid());
$this->killChangeStreamCursor($changeStream); $this->killChangeStreamCursor($changeStream);
...@@ -190,6 +277,11 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -190,6 +277,11 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertEquals($expectedResumeToken, $command->pipeline[0]->{'$changeStream'}->resumeAfter); $this->assertEquals($expectedResumeToken, $command->pipeline[0]->{'$changeStream'}->resumeAfter);
} }
/**
* Prose test: "$changeStream stage for ChangeStream against a server >=4.0
* and <4.0.7 that has not received any results yet MUST include a
* startAtOperationTime option when resuming a changestream."
*/
public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime() public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime()
{ {
if ( ! $this->isStartAtOperationTimeSupported()) { if ( ! $this->isStartAtOperationTimeSupported()) {
...@@ -202,7 +294,6 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -202,7 +294,6 @@ class WatchFunctionalTest extends FunctionalTestCase
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$operationTime = null;
$events = []; $events = [];
(new CommandObserver)->observe( (new CommandObserver)->observe(
...@@ -554,6 +645,10 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -554,6 +645,10 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSameDocument($expectedResult, $changeStream->current()); $this->assertSameDocument($expectedResult, $changeStream->current());
} }
/**
* Prose test: "Ensure that a cursor returned from an aggregate command with
* a cursor id and an initial empty batch is not closed on the driver side."
*/
public function testInitialCursorIsNotClosed() public function testInitialCursorIsNotClosed()
{ {
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
...@@ -567,7 +662,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -567,7 +662,7 @@ class WatchFunctionalTest extends FunctionalTestCase
* internal Cursor and call isDead(). */ * internal Cursor and call isDead(). */
$this->assertNotEquals('0', (string) $changeStream->getCursorId()); $this->assertNotEquals('0', (string) $changeStream->getCursorId());
$rc = new ReflectionClass('MongoDB\ChangeStream'); $rc = new ReflectionClass(ChangeStream::class);
$rp = $rc->getProperty('iterator'); $rp = $rc->getProperty('iterator');
$rp->setAccessible(true); $rp->setAccessible(true);
...@@ -577,7 +672,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -577,7 +672,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$cursor = $iterator->getInnerIterator(); $cursor = $iterator->getInnerIterator();
$this->assertInstanceOf('MongoDB\Driver\Cursor', $cursor); $this->assertInstanceOf(Cursor::class, $cursor);
$this->assertFalse($cursor->isDead()); $this->assertFalse($cursor->isDead());
} }
...@@ -1109,6 +1204,16 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1109,6 +1204,16 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertEmpty($commands); $this->assertEmpty($commands);
} }
private function getPostBatchResumeTokenFromReply(stdClass $reply)
{
$this->assertObjectHasAttribute('cursor', $reply);
$this->assertInternalType('object', $reply->cursor);
$this->assertObjectHasAttribute('postBatchResumeToken', $reply->cursor);
$this->assertInternalType('object', $reply->cursor->postBatchResumeToken);
return $reply->cursor->postBatchResumeToken;
}
private function insertDocument($document) private function insertDocument($document)
{ {
$insertOne = new InsertOne( $insertOne = new InsertOne(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment