PHPLIB-457: Add missing change stream prose test 14

parent 6a860132
......@@ -11,6 +11,7 @@ use MongoDB\Driver\Exception\ConnectionTimeoutException;
use MongoDB\Driver\Exception\LogicException;
use MongoDB\Driver\Exception\ServerException;
use MongoDB\Driver\Manager;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\ResumeTokenException;
......@@ -1404,6 +1405,67 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSame($resumeToken, $changeStream->getResumeToken());
}
/**
* For a ChangeStream under these conditions:
* - The batch is not empty.
* - The batch hasn’t been iterated at all.
* - Only the initial aggregate command has been executed.
* Expected result:
* - getResumeToken must return startAfter from the initial aggregate if the option was specified.
* - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
* - If neither the startAfter nor resumeAfter options were specified, the getResumeToken result must be empty.
*/
public function testResumeTokenBehaviour()
{
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
}
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$lastOpTime = null;
$changeStream = null;
(new CommandObserver())->observe(function () use ($operation, &$changeStream) {
$changeStream = $operation->execute($this->getPrimaryServer());
}, function ($event) use (&$lastOpTime) {
$this->assertInstanceOf(CommandSucceededEvent::class, $event['succeeded']);
$reply = $event['succeeded']->getReply();
$this->assertObjectHasAttribute('operationTime', $reply);
$lastOpTime = $reply->operationTime;
});
$this->insertDocument(['x' => 1]);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->getResumeToken();
$this->insertDocument(['x' => 2]);
// Test startAfter option
$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertEquals($resumeToken, $changeStream->getResumeToken());
// Test resumeAfter option
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertEquals($resumeToken, $changeStream->getResumeToken());
// Test without option
$options = ['startAtOperationTime' => $lastOpTime] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertNull($changeStream->getResumeToken());
}
/**
* Prose test: "$changeStream stage for ChangeStream started with startAfter
* against a server >=4.1.1 that has not received any results yet MUST
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment