Remove first batch of functional tests

parent 637e0e9d
......@@ -802,801 +802,6 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->advanceCursorUntilValid($changeStream);
}
/**
* Prose test: "ChangeStream will throw an exception if the server response
* is missing the resume token (if wire version is < 8, this is a driver-
* side error; for 8+, this is a server-side error)"
*/
public function testResumeTokenInvalidTypeClientSideError()
{
if (version_compare($this->getServerVersion(), '4.1.8', '>=')) {
$this->markTestSkipped('Server rejects change streams that modify resume token (SERVER-37786)');
}
$pipeline = [['$project' => ['_id' => ['$literal' => 'foo']]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
/* Insert two documents to ensure the client does not ignore the first
* document's resume token in favor of a postBatchResumeToken */
$this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]);
$this->expectException(ResumeTokenException::class);
$this->expectExceptionMessage('Expected resume token to have type "array or object" but found "string"');
$this->advanceCursorUntilValid($changeStream);
}
/**
* Prose test: "ChangeStream will throw an exception if the server response
* is missing the resume token (if wire version is < 8, this is a driver-
* side error; for 8+, this is a server-side error)"
*/
public function testResumeTokenInvalidTypeServerSideError()
{
if (version_compare($this->getServerVersion(), '4.1.8', '<')) {
$this->markTestSkipped('Server does not reject change streams that modify resume token');
}
$pipeline = [['$project' => ['_id' => ['$literal' => 'foo']]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->insertDocument(['x' => 1]);
$this->expectException(ServerException::class);
$this->advanceCursorUntilValid($changeStream);
}
public function testMaxAwaitTimeMS()
{
/* On average, an acknowledged write takes about 20 ms to appear in a
* change stream on the server so we'll use a higher maxAwaitTimeMS to
* ensure we see the write. */
$maxAwaitTimeMS = 500;
/* Calculate an approximate pivot to use for time assertions. We will
* assert that the duration of blocking responses is greater than this
* value, and vice versa. */
$pivot = $maxAwaitTimeMS * 0.001 * 0.9;
/* Calculate an approximate upper bound to use for time assertions. We
* will assert that the duration of blocking responses is less than this
* value. */
$upperBound = $maxAwaitTimeMS * 0.001 * 1.5;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => $maxAwaitTimeMS]);
$changeStream = $operation->execute($this->getPrimaryServer());
// Rewinding does not issue a getMore, so we should not expect a delay.
$startTime = microtime(true);
$changeStream->rewind();
$duration = microtime(true) - $startTime;
$this->assertLessThan($pivot, $duration);
$this->assertFalse($changeStream->valid());
/* Advancing again on a change stream will issue a getMore, so we should
* expect a delay. Expect to wait at least maxAwaitTimeMS, since no new
* documents will be inserted to wake up the server's query thread. Also
* ensure we don't wait too long (server default is one second). */
$startTime = microtime(true);
$changeStream->next();
$duration = microtime(true) - $startTime;
$this->assertGreaterThan($pivot, $duration);
$this->assertLessThan($upperBound, $duration);
$this->assertFalse($changeStream->valid());
$this->insertDocument(['_id' => 1]);
/* Advancing the change stream again will issue a getMore, but the
* server should not block since a document has been inserted.
* For sharded clusters, we have to repeat the getMore iteration until
* the cursor is valid since the first getMore commands after an insert
* may not return any data. Only the time of the last getMore command is
* taken. */
$attempts = $this->isShardedCluster() ? 5 : 1;
for ($i = 0; $i < $attempts; $i++) {
$startTime = microtime(true);
$changeStream->next();
$duration = microtime(true) - $startTime;
if ($changeStream->valid()) {
break;
}
}
$this->assertTrue($changeStream->valid());
$this->assertLessThan($pivot, $duration);
}
public function testRewindExtractsResumeTokenAndNextResumes()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$this->insertDocument(['_id' => 3, 'x' => 'baz']);
/* Obtain a resume token for the first insert. This will allow us to
* start a change stream from that point and ensure aggregate returns
* the second insert in its first batch, which in turn will serve as a
* resume token for rewind() to extract. */
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->advanceCursorUntilValid($changeStream);
$resumeToken = $changeStream->current()->_id;
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertSameDocument($resumeToken, $changeStream->getResumeToken());
$changeStream->rewind();
if ($this->isShardedCluster()) {
/* aggregate on a sharded cluster may not return any data in the
* initial batch until periodicNoopIntervalSecs has passed. Thus,
* advance the change stream until we've received data. */
$this->advanceCursorUntilValid($changeStream);
} else {
$this->assertTrue($changeStream->valid());
}
$this->assertSame(0, $changeStream->key());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());
$this->killChangeStreamCursor($changeStream);
$this->advanceCursorUntilValid($changeStream);
$this->assertSame(1, $changeStream->key());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 3, 'x' => 'baz'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 3],
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());
}
public function testResumeAfterOption()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$this->advanceCursorUntilValid($changeStream);
$resumeToken = $changeStream->current()->_id;
$options = $this->defaultOptions + ['resumeAfter' => $resumeToken];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertSameDocument($resumeToken, $changeStream->getResumeToken());
$changeStream->rewind();
if ($this->isShardedCluster()) {
/* aggregate on a sharded cluster may not return any data in the
* initial batch until periodicNoopIntervalSecs has passed. Thus,
* advance the change stream until we've received data. */
$this->advanceCursorUntilValid($changeStream);
} else {
$this->assertTrue($changeStream->valid());
}
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());
}
public function testStartAfterOption()
{
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
$this->markTestSkipped('startAfter is not supported');
}
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$this->advanceCursorUntilValid($changeStream);
$resumeToken = $changeStream->current()->_id;
$options = $this->defaultOptions + ['startAfter' => $resumeToken];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertSameDocument($resumeToken, $changeStream->getResumeToken());
$changeStream->rewind();
if ($this->isShardedCluster()) {
/* aggregate on a sharded cluster may not return any data in the
* initial batch until periodicNoopIntervalSecs has passed. Thus,
* advance the change stream until we've received data. */
$this->advanceCursorUntilValid($changeStream);
} else {
$this->assertTrue($changeStream->valid());
}
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());
}
/**
* @dataProvider provideTypeMapOptionsAndExpectedChangeDocument
*/
public function testTypeMapOption(array $typeMap, $expectedChangeDocument)
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['typeMap' => $typeMap] + $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->advanceCursorUntilValid($changeStream);
$this->assertMatchesDocument($expectedChangeDocument, $changeStream->current());
}
public function provideTypeMapOptionsAndExpectedChangeDocument()
{
/* Note: the "_id" and "ns" fields are purposefully omitted because the
* resume token's value cannot be anticipated and the collection name,
* which is generated from the test name, is not available in the data
* provider, respectively. */
return [
[
['root' => 'array', 'document' => 'array'],
[
'operationType' => 'insert',
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
'documentKey' => ['_id' => 1],
],
],
[
['root' => 'object', 'document' => 'array'],
(object) [
'operationType' => 'insert',
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
'documentKey' => ['_id' => 1],
],
],
[
['root' => 'array', 'document' => 'stdClass'],
[
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => 1, 'x' => 'foo'],
'documentKey' => (object) ['_id' => 1],
],
],
];
}
public function testNextAdvancesKey()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]);
/* Note: we intentionally do not start iteration with rewind() to ensure
* that next() behaves identically when called without rewind(). */
$this->advanceCursorUntilValid($changeStream);
$this->assertSame(0, $changeStream->key());
$changeStream->next();
$this->assertSame(1, $changeStream->key());
}
public function testResumeTokenNotFoundDoesNotAdvanceKey()
{
$pipeline = [['$project' => ['_id' => 0 ]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]);
$this->insertDocument(['x' => 3]);
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
try {
$this->advanceCursorUntilValid($changeStream);
$this->fail('Exception for missing resume token was not thrown');
} catch (ResumeTokenException $e) {
/* On server versions < 4.1.8, a client-side error is thrown. */
} catch (ServerException $e) {
/* On server versions >= 4.1.8, the error is thrown server-side. */
}
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
try {
$changeStream->next();
$this->fail('Exception for missing resume token was not thrown');
} catch (ResumeTokenException $e) {
} catch (ServerException $e) {
}
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
}
public function testSessionPersistsAfterResume()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = null;
$originalSession = null;
$sessionAfterResume = [];
$commands = [];
/* We want to ensure that the lsid of the initial aggregate matches the
* lsid of any aggregates after the change stream resumes. After
* PHPC-1152 is complete, we will ensure that the lsid of the initial
* aggregate matches the lsid of any subsequent aggregates and getMores.
*/
(new CommandObserver())->observe(
function () use ($operation, &$changeStream) {
$changeStream = $operation->execute($this->getPrimaryServer());
},
function (array $event) use (&$originalSession) {
$command = $event['started']->getCommand();
if (isset($command->aggregate)) {
$originalSession = bin2hex((string) $command->lsid->id);
}
}
);
$changeStream->rewind();
$this->killChangeStreamCursor($changeStream);
(new CommandObserver())->observe(
function () use (&$changeStream) {
$changeStream->next();
},
function (array $event) use (&$sessionAfterResume, &$commands) {
$commands[] = $event['started']->getCommandName();
$sessionAfterResume[] = bin2hex((string) $event['started']->getCommand()->lsid->id);
}
);
$expectedCommands = [
/* We expect a getMore to be issued because we are calling next(). */
'getMore',
/* Since we have killed the cursor, ChangeStream will resume by
* issuing a new aggregate commmand. */
'aggregate',
/* When ChangeStream resumes, it overwrites its original cursor with
* the new cursor resulting from the last aggregate command. This
* removes the last reference to the old cursor, which causes the
* driver to kill it (via mongoc_cursor_destroy()). */
'killCursors',
];
$this->assertSame($expectedCommands, $commands);
foreach ($sessionAfterResume as $session) {
$this->assertEquals($session, $originalSession);
}
}
public function testSessionFreed()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$rc = new ReflectionClass($changeStream);
$rp = $rc->getProperty('resumeCallable');
$rp->setAccessible(true);
$this->assertIsCallable($rp->getValue($changeStream));
// Invalidate the cursor to verify that resumeCallable is unset when the cursor is exhausted.
$this->dropCollection();
$this->advanceCursorUntilValid($changeStream);
$this->assertNull($rp->getValue($changeStream));
}
/**
* Prose test: "ChangeStream will automatically resume one time on a
* resumable error (including not master) with the initial pipeline and
* options, except for the addition/update of a resumeToken."
*/
public function testResumeRepeatsOriginalPipelineAndOptions()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$aggregateCommands = [];
$this->configureFailPoint([
'configureFailPoint' => 'failCommand',
'mode' => ['times' => 1],
'data' => ['failCommands' => ['getMore'], 'errorCode' => self::NOT_MASTER],
]);
(new CommandObserver())->observe(
function () use ($operation) {
$changeStream = $operation->execute($this->getPrimaryServer());
// The first next will hit the fail point, causing a resume
$changeStream->next();
$changeStream->next();
},
function (array $event) use (&$aggregateCommands) {
$command = $event['started']->getCommand();
if ($event['started']->getCommandName() !== 'aggregate') {
return;
}
$aggregateCommands[] = (array) $command;
}
);
$this->assertCount(2, $aggregateCommands);
$this->assertThat(
$aggregateCommands[0]['pipeline'][0]->{'$changeStream'},
$this->logicalNot(
$this->logicalOr(
$this->objectHasAttribute('resumeAfter'),
$this->objectHasAttribute('startAfter'),
$this->objectHasAttribute('startAtOperationTime')
)
)
);
$this->assertThat(
$aggregateCommands[1]['pipeline'][0]->{'$changeStream'},
$this->logicalOr(
$this->objectHasAttribute('resumeAfter'),
$this->objectHasAttribute('startAfter'),
$this->objectHasAttribute('startAtOperationTime')
)
);
$aggregateCommands = array_map(
function (array $aggregateCommand) {
// Remove resume options from the changestream document
if (isset($aggregateCommand['pipeline'][0]->{'$changeStream'})) {
$aggregateCommand['pipeline'][0]->{'$changeStream'} = array_diff_key(
(array) $aggregateCommand['pipeline'][0]->{'$changeStream'},
['resumeAfter' => false, 'startAfter' => false, 'startAtOperationTime' => false]
);
}
// Remove options we don't want to compare between commands
return array_diff_key($aggregateCommand, ['lsid' => false, '$clusterTime' => false]);
},
$aggregateCommands
);
// Ensure options in original and resuming aggregate command match
$this->assertEquals($aggregateCommands[0], $aggregateCommands[1]);
}
/**
* Prose test: "ChangeStream will not attempt to resume on any error
* encountered while executing an aggregate command."
*/
public function testErrorDuringAggregateCommandDoesNotCauseResume()
{
if (version_compare($this->getServerVersion(), '4.0.0', '<')) {
$this->markTestSkipped('failCommand is not supported');
}
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$commandCount = 0;
$this->configureFailPoint([
'configureFailPoint' => 'failCommand',
'mode' => ['times' => 1],
'data' => ['failCommands' => ['aggregate'], 'errorCode' => self::INTERRUPTED],
]);
$this->expectException(CommandException::class);
(new CommandObserver())->observe(
function () use ($operation) {
$operation->execute($this->getPrimaryServer());
},
function (array $event) use (&$commandCount) {
$commandCount++;
}
);
$this->assertSame(1, $commandCount);
}
/**
* Prose test: "ChangeStream will perform server selection before attempting
* to resume, using initial readPreference"
*/
public function testOriginalReadPreferenceIsPreservedOnResume()
{
if ($this->isShardedCluster()) {
$this->markTestSkipped('Test does not apply to sharded clusters');
}
$readPreference = new ReadPreference('secondary');
$options = ['readPreference' => $readPreference] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
try {
$secondary = $this->manager->selectServer($readPreference);
} catch (ConnectionTimeoutException $e) {
$this->markTestSkipped('Secondary is not available');
}
$changeStream = $operation->execute($secondary);
$previousCursorId = $changeStream->getCursorId();
$this->killChangeStreamCursor($changeStream);
$changeStream->next();
$this->assertNotSame($previousCursorId, $changeStream->getCursorId());
$getCursor = Closure::bind(
function () {
return $this->iterator->getInnerIterator();
},
$changeStream,
ChangeStream::class
);
/** @var Cursor $cursor */
$cursor = $getCursor();
self::assertTrue($cursor->getServer()->isSecondary());
}
/**
* Prose test
* For a ChangeStream under these conditions:
* - Running against a server <4.0.7.
* - The batch is empty or has been iterated to the last document.
* Expected result:
* - getResumeToken must return the _id of the last document returned if one exists.
* - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
* - If resumeAfter was not specified, the getResumeToken result must be empty.
*/
public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch()
{
if ($this->isPostBatchResumeTokenSupported()) {
$this->markTestSkipped('postBatchResumeToken is supported');
}
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertNull($changeStream->getResumeToken());
$this->insertDocument(['x' => 1]);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->getResumeToken();
$this->assertSame($resumeToken, $changeStream->current()->_id);
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertSame($resumeToken, $changeStream->getResumeToken());
}
/**
* For a ChangeStream under these conditions:
* - The batch is not empty.
* - The batch hasn’t been iterated at all.
* - Only the initial aggregate command has been executed.
* Expected result:
* - getResumeToken must return startAfter from the initial aggregate if the option was specified.
* - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
* - If neither the startAfter nor resumeAfter options were specified, the getResumeToken result must be empty.
*/
public function testResumeTokenBehaviour()
{
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
}
$this->skipIfIsShardedCluster('Resume token behaviour can\'t be reliably tested on sharded clusters.');
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$lastOpTime = null;
$changeStream = null;
(new CommandObserver())->observe(function () use ($operation, &$changeStream) {
$changeStream = $operation->execute($this->getPrimaryServer());
}, function ($event) use (&$lastOpTime) {
$this->assertInstanceOf(CommandSucceededEvent::class, $event['succeeded']);
$reply = $event['succeeded']->getReply();
$this->assertObjectHasAttribute('operationTime', $reply);
$lastOpTime = $reply->operationTime;
});
$this->insertDocument(['x' => 1]);
$this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->getResumeToken();
$this->insertDocument(['x' => 2]);
// Test startAfter option
$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertEquals($resumeToken, $changeStream->getResumeToken());
// Test resumeAfter option
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertEquals($resumeToken, $changeStream->getResumeToken());
// Test without option
$options = ['startAtOperationTime' => $lastOpTime] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertNull($changeStream->getResumeToken());
}
/**
* Prose test: "$changeStream stage for ChangeStream started with startAfter
* against a server >=4.1.1 that has not received any results yet MUST
* include a startAfter option and MUST NOT include a resumeAfter option
* when resuming a change stream."
*/
public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfterOption()
{
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
}
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['x' => 1]);
$this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->getResumeToken();
$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->killChangeStreamCursor($changeStream);
$aggregateCommand = null;
(new CommandObserver())->observe(
function () use ($changeStream) {
$changeStream->next();
},
function (array $event) use (&$aggregateCommand) {
if ($event['started']->getCommandName() !== 'aggregate') {
return;
}
$aggregateCommand = $event['started']->getCommand();
}
);
$this->assertNotNull($aggregateCommand);
$this->assertObjectNotHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
$this->assertObjectHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
}
/**
* Prose test: "$changeStream stage for ChangeStream started with startAfter
* against a server >=4.1.1 that has received at least one result MUST
* include a resumeAfter option and MUST NOT include a startAfter option
* when resuming a change stream."
*/
public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOption()
{
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
}
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['x' => 1]);
$this->advanceCursorUntilValid($changeStream);
$resumeToken = $changeStream->getResumeToken();
$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->insertDocument(['x' => 2]);
$this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$this->killChangeStreamCursor($changeStream);
$aggregateCommand = null;
(new CommandObserver())->observe(
function () use ($changeStream) {
$changeStream->next();
},
function (array $event) use (&$aggregateCommand) {
if ($event['started']->getCommandName() !== 'aggregate') {
return;
}
$aggregateCommand = $event['started']->getCommand();
}
);
$this->assertNotNull($aggregateCommand);
$this->assertObjectNotHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
$this->assertObjectHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
}
private function assertNoCommandExecuted(callable $callable)
{
$commands = [];
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment