Unverified Commit ee0b4539 authored by Andreas Braun's avatar Andreas Braun

Merge pull request #659

parents a42e3479 944e7292
...@@ -50,7 +50,11 @@ ...@@ -50,7 +50,11 @@
"journal": true, "journal": true,
"logappend": true, "logappend": true,
"port": 4400, "port": 4400,
"bind_ip_all": true "bind_ip_all": true,
"setParameter": {
"periodicNoopIntervalSecs": 1,
"writePeriodicNoops": true
}
} }, } },
{ "procParams": { { "procParams": {
"dbpath": "/tmp/SHARDED-RS/SHARD1/4401", "dbpath": "/tmp/SHARDED-RS/SHARD1/4401",
...@@ -59,7 +63,11 @@ ...@@ -59,7 +63,11 @@
"journal": true, "journal": true,
"logappend": true, "logappend": true,
"port": 4401, "port": 4401,
"bind_ip_all": true "bind_ip_all": true,
"setParameter": {
"periodicNoopIntervalSecs": 1,
"writePeriodicNoops": true
}
} } } }
] ]
} }
...@@ -76,7 +84,11 @@ ...@@ -76,7 +84,11 @@
"journal": true, "journal": true,
"logappend": true, "logappend": true,
"port": 4410, "port": 4410,
"bind_ip_all": true "bind_ip_all": true,
"setParameter": {
"periodicNoopIntervalSecs": 1,
"writePeriodicNoops": true
}
} }, } },
{ "procParams": { { "procParams": {
"dbpath": "/tmp/SHARDED-RS/SHARD2/4411", "dbpath": "/tmp/SHARDED-RS/SHARD2/4411",
...@@ -85,7 +97,11 @@ ...@@ -85,7 +97,11 @@
"journal": true, "journal": true,
"logappend": true, "logappend": true,
"port": 4411, "port": 4411,
"bind_ip_all": true "bind_ip_all": true,
"setParameter": {
"periodicNoopIntervalSecs": 1,
"writePeriodicNoops": true
}
} } } }
] ]
} }
......
...@@ -935,6 +935,10 @@ class DocumentationExamplesTest extends FunctionalTestCase ...@@ -935,6 +935,10 @@ class DocumentationExamplesTest extends FunctionalTestCase
{ {
$this->skipIfChangeStreamIsNotSupported(); $this->skipIfChangeStreamIsNotSupported();
if ($this->isShardedCluster()) {
$this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
}
$db = new Database($this->manager, $this->getDatabaseName()); $db = new Database($this->manager, $this->getDatabaseName());
$db->dropCollection('inventory'); $db->dropCollection('inventory');
$db->createCollection('inventory'); $db->createCollection('inventory');
......
...@@ -336,9 +336,6 @@ abstract class FunctionalTestCase extends TestCase ...@@ -336,9 +336,6 @@ abstract class FunctionalTestCase extends TestCase
if (! $this->isShardedClusterUsingReplicasets()) { if (! $this->isShardedClusterUsingReplicasets()) {
$this->markTestSkipped('$changeStream is only supported with replicasets'); $this->markTestSkipped('$changeStream is only supported with replicasets');
} }
// Temporarily skip tests because of an issue with change streams in the driver
$this->markTestSkipped('$changeStreams currently don\'t on replica sets');
break; break;
case Server::TYPE_RS_PRIMARY: case Server::TYPE_RS_PRIMARY:
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
namespace MongoDB\Tests\Operation; namespace MongoDB\Tests\Operation;
use Closure; use Closure;
use Iterator;
use MongoDB\BSON\TimestampInterface; use MongoDB\BSON\TimestampInterface;
use MongoDB\ChangeStream; use MongoDB\ChangeStream;
use MongoDB\Driver\Cursor; use MongoDB\Driver\Cursor;
...@@ -19,6 +20,7 @@ use MongoDB\Operation\DatabaseCommand; ...@@ -19,6 +20,7 @@ use MongoDB\Operation\DatabaseCommand;
use MongoDB\Operation\InsertOne; use MongoDB\Operation\InsertOne;
use MongoDB\Operation\Watch; use MongoDB\Operation\Watch;
use MongoDB\Tests\CommandObserver; use MongoDB\Tests\CommandObserver;
use PHPUnit\Framework\ExpectationFailedException;
use ReflectionClass; use ReflectionClass;
use stdClass; use stdClass;
use Symfony\Bridge\PhpUnit\SetUpTearDownTrait; use Symfony\Bridge\PhpUnit\SetUpTearDownTrait;
...@@ -71,8 +73,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -71,8 +73,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]); $this->insertDocument(['x' => 2]);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken()); $this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
$changeStream->next(); $changeStream->next();
...@@ -81,8 +82,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -81,8 +82,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['x' => 3]); $this->insertDocument(['x' => 3]);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken()); $this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
} }
...@@ -134,23 +134,21 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -134,23 +134,21 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]); $this->insertDocument(['x' => 2]);
$events = []; $lastEvent = null;
(new CommandObserver())->observe( (new CommandObserver())->observe(
function () use ($changeStream) { function () use ($changeStream) {
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
}, },
function (array $event) use (&$events) { function (array $event) use (&$lastEvent) {
$events[] = $event; $lastEvent = $event;
} }
); );
$this->assertCount(1, $events); $this->assertNotNull($lastEvent);
$this->assertSame('getMore', $events[0]['started']->getCommandName()); $this->assertSame('getMore', $lastEvent['started']->getCommandName());
$postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply()); $postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($lastEvent['succeeded']->getReply());
$changeStream->next();
$this->assertTrue($changeStream->valid());
$this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken()); $this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
$changeStream->next(); $changeStream->next();
...@@ -171,8 +169,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -171,8 +169,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['_id' => 1, 'x' => 'foo']); $this->insertDocument(['_id' => 1, 'x' => 'foo']);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
...@@ -188,8 +185,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -188,8 +185,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['_id' => 2, 'x' => 'bar']); $this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
...@@ -395,6 +391,8 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -395,6 +391,8 @@ class WatchFunctionalTest extends FunctionalTestCase
public function testRewindMultipleTimesWithResults() public function testRewindMultipleTimesWithResults()
{ {
$this->skipIfIsShardedCluster('Cursor needs to be advanced multiple times and can\'t be rewound afterwards.');
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer()); $changeStream = $operation->execute($this->getPrimaryServer());
...@@ -487,8 +485,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -487,8 +485,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['_id' => 1, 'x' => 'foo']); $this->insertDocument(['_id' => 1, 'x' => 'foo']);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
...@@ -507,8 +504,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -507,8 +504,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['_id' => 2, 'x' => 'bar']); $this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
...@@ -523,6 +519,8 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -523,6 +519,8 @@ class WatchFunctionalTest extends FunctionalTestCase
public function testResumeMultipleTimesInSuccession() public function testResumeMultipleTimesInSuccession()
{ {
$this->skipIfIsShardedCluster('getMore may return empty response before periodicNoopIntervalSecs on sharded clusters.');
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer()); $changeStream = $operation->execute($this->getPrimaryServer());
...@@ -656,8 +654,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -656,8 +654,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['_id' => 1, 'x' => 'foo']); $this->insertDocument(['_id' => 1, 'x' => 'foo']);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key()); $this->assertSame(0, $changeStream->key());
$changeStream->next(); $changeStream->next();
...@@ -676,8 +673,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -676,8 +673,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['_id' => 2, 'x' => 'bar']); $this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$this->assertSame(1, $changeStream->key()); $this->assertSame(1, $changeStream->key());
} }
...@@ -693,8 +689,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -693,8 +689,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->rewind(); $changeStream->rewind();
$this->assertFalse($changeStream->valid()); $this->assertFalse($changeStream->valid());
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
...@@ -765,9 +760,9 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -765,9 +760,9 @@ class WatchFunctionalTest extends FunctionalTestCase
public function provideNonResumableErrorCodes() public function provideNonResumableErrorCodes()
{ {
return [ return [
[136], // CappedPositionLost 'CappedPositionLost' => [136],
[237], // CursorKilled 'CursorKilled' => [237],
[11601], // Interrupted 'Interrupted' => [11601],
]; ];
} }
...@@ -796,7 +791,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -796,7 +791,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->expectException(ResumeTokenException::class); $this->expectException(ResumeTokenException::class);
$this->expectExceptionMessage('Resume token not found in change document'); $this->expectExceptionMessage('Resume token not found in change document');
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
} }
/** /**
...@@ -819,7 +814,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -819,7 +814,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 1]);
$this->expectException(ServerException::class); $this->expectException(ServerException::class);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
} }
/** /**
...@@ -847,7 +842,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -847,7 +842,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->expectException(ResumeTokenException::class); $this->expectException(ResumeTokenException::class);
$this->expectExceptionMessage('Expected resume token to have type "array or object" but found "string"'); $this->expectExceptionMessage('Expected resume token to have type "array or object" but found "string"');
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
} }
/** /**
...@@ -870,7 +865,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -870,7 +865,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 1]);
$this->expectException(ServerException::class); $this->expectException(ServerException::class);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
} }
public function testMaxAwaitTimeMS() public function testMaxAwaitTimeMS()
...@@ -916,12 +911,24 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -916,12 +911,24 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['_id' => 1]); $this->insertDocument(['_id' => 1]);
/* Advancing the change stream again will issue a getMore, but the /* Advancing the change stream again will issue a getMore, but the
* server should not block since a document has been inserted. */ * server should not block since a document has been inserted.
$startTime = microtime(true); * For sharded clusters, we have to repeat the getMore iteration until
$changeStream->next(); * the cursor is valid since the first getMore commands after an insert
$duration = microtime(true) - $startTime; * may not return any data. Only the time of the last getMore command is
$this->assertLessThan($pivot, $duration); * taken. */
$attempts = $this->isShardedCluster() ? 5 : 1;
for ($i = 0; $i < $attempts; $i++) {
$startTime = microtime(true);
$changeStream->next();
$duration = microtime(true) - $startTime;
if ($changeStream->valid()) {
break;
}
}
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$this->assertLessThan($pivot, $duration);
} }
public function testRewindExtractsResumeTokenAndNextResumes() public function testRewindExtractsResumeTokenAndNextResumes()
...@@ -940,17 +947,25 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -940,17 +947,25 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->rewind(); $changeStream->rewind();
$this->assertFalse($changeStream->valid()); $this->assertFalse($changeStream->valid());
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->current()->_id; $resumeToken = $changeStream->current()->_id;
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions; $options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer()); $changeStream = $operation->execute($this->getPrimaryServer());
$this->assertSame($resumeToken, $changeStream->getResumeToken()); $this->assertSameDocument($resumeToken, $changeStream->getResumeToken());
$changeStream->rewind(); $changeStream->rewind();
$this->assertTrue($changeStream->valid());
if ($this->isShardedCluster()) {
/* aggregate on a sharded cluster may not return any data in the
* initial batch until periodicNoopIntervalSecs has passed. Thus,
* advance the change stream until we've received data. */
$this->advanceCursorUntilValid($changeStream);
} else {
$this->assertTrue($changeStream->valid());
}
$this->assertSame(0, $changeStream->key()); $this->assertSame(0, $changeStream->key());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
...@@ -963,8 +978,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -963,8 +978,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->killChangeStreamCursor($changeStream); $this->killChangeStreamCursor($changeStream);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$this->assertSame(1, $changeStream->key()); $this->assertSame(1, $changeStream->key());
$expectedResult = [ $expectedResult = [
...@@ -988,18 +1002,25 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -988,18 +1002,25 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['_id' => 1, 'x' => 'foo']); $this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']); $this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->current()->_id; $resumeToken = $changeStream->current()->_id;
$options = $this->defaultOptions + ['resumeAfter' => $resumeToken]; $options = $this->defaultOptions + ['resumeAfter' => $resumeToken];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer()); $changeStream = $operation->execute($this->getPrimaryServer());
$this->assertSame($resumeToken, $changeStream->getResumeToken()); $this->assertSameDocument($resumeToken, $changeStream->getResumeToken());
$changeStream->rewind(); $changeStream->rewind();
$this->assertTrue($changeStream->valid());
if ($this->isShardedCluster()) {
/* aggregate on a sharded cluster may not return any data in the
* initial batch until periodicNoopIntervalSecs has passed. Thus,
* advance the change stream until we've received data. */
$this->advanceCursorUntilValid($changeStream);
} else {
$this->assertTrue($changeStream->valid());
}
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
...@@ -1027,18 +1048,25 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1027,18 +1048,25 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['_id' => 1, 'x' => 'foo']); $this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']); $this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->current()->_id; $resumeToken = $changeStream->current()->_id;
$options = $this->defaultOptions + ['startAfter' => $resumeToken]; $options = $this->defaultOptions + ['startAfter' => $resumeToken];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer()); $changeStream = $operation->execute($this->getPrimaryServer());
$this->assertSame($resumeToken, $changeStream->getResumeToken()); $this->assertSameDocument($resumeToken, $changeStream->getResumeToken());
$changeStream->rewind(); $changeStream->rewind();
$this->assertTrue($changeStream->valid());
if ($this->isShardedCluster()) {
/* aggregate on a sharded cluster may not return any data in the
* initial batch until periodicNoopIntervalSecs has passed. Thus,
* advance the change stream until we've received data. */
$this->advanceCursorUntilValid($changeStream);
} else {
$this->assertTrue($changeStream->valid());
}
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
...@@ -1064,8 +1092,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1064,8 +1092,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['_id' => 1, 'x' => 'foo']); $this->insertDocument(['_id' => 1, 'x' => 'foo']);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$this->assertMatchesDocument($expectedChangeDocument, $changeStream->current()); $this->assertMatchesDocument($expectedChangeDocument, $changeStream->current());
} }
...@@ -1114,7 +1141,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1114,7 +1141,7 @@ class WatchFunctionalTest extends FunctionalTestCase
/* Note: we intentionally do not start iteration with rewind() to ensure /* Note: we intentionally do not start iteration with rewind() to ensure
* that next() behaves identically when called without rewind(). */ * that next() behaves identically when called without rewind(). */
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertSame(0, $changeStream->key()); $this->assertSame(0, $changeStream->key());
...@@ -1139,7 +1166,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1139,7 +1166,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertNull($changeStream->key()); $this->assertNull($changeStream->key());
try { try {
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->fail('Exception for missing resume token was not thrown'); $this->fail('Exception for missing resume token was not thrown');
} catch (ResumeTokenException $e) { } catch (ResumeTokenException $e) {
/* On server versions < 4.1.8, a client-side error is thrown. */ /* On server versions < 4.1.8, a client-side error is thrown. */
...@@ -1234,7 +1261,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1234,7 +1261,7 @@ class WatchFunctionalTest extends FunctionalTestCase
// Invalidate the cursor to verify that resumeCallable is unset when the cursor is exhausted. // Invalidate the cursor to verify that resumeCallable is unset when the cursor is exhausted.
$this->dropCollection(); $this->dropCollection();
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertNull($rp->getValue($changeStream)); $this->assertNull($rp->getValue($changeStream));
} }
...@@ -1356,6 +1383,10 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1356,6 +1383,10 @@ class WatchFunctionalTest extends FunctionalTestCase
*/ */
public function testOriginalReadPreferenceIsPreservedOnResume() public function testOriginalReadPreferenceIsPreservedOnResume()
{ {
if ($this->isShardedCluster()) {
$this->markTestSkipped('Test does not apply to sharded clusters');
}
$readPreference = new ReadPreference('secondary'); $readPreference = new ReadPreference('secondary');
$options = ['readPreference' => $readPreference] + $this->defaultOptions; $options = ['readPreference' => $readPreference] + $this->defaultOptions;
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
...@@ -1437,6 +1468,8 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1437,6 +1468,8 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1'); $this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
} }
$this->skipIfIsShardedCluster('Resume token behaviour can\'t be reliably tested on sharded clusters.');
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$lastOpTime = null; $lastOpTime = null;
...@@ -1454,7 +1487,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1454,7 +1487,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 1]);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->getResumeToken(); $resumeToken = $changeStream->getResumeToken();
...@@ -1499,7 +1532,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1499,7 +1532,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 1]);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->getResumeToken(); $resumeToken = $changeStream->getResumeToken();
...@@ -1546,8 +1579,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1546,8 +1579,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 1]);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->getResumeToken(); $resumeToken = $changeStream->getResumeToken();
$options = ['startAfter' => $resumeToken] + $this->defaultOptions; $options = ['startAfter' => $resumeToken] + $this->defaultOptions;
...@@ -1556,7 +1588,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1556,7 +1588,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->rewind(); $changeStream->rewind();
$this->insertDocument(['x' => 2]); $this->insertDocument(['x' => 2]);
$changeStream->next(); $this->advanceCursorUntilValid($changeStream);
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$this->killChangeStreamCursor($changeStream); $this->killChangeStreamCursor($changeStream);
...@@ -1637,4 +1669,32 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -1637,4 +1669,32 @@ class WatchFunctionalTest extends FunctionalTestCase
$operation = new DatabaseCommand($this->getDatabaseName(), $command); $operation = new DatabaseCommand($this->getDatabaseName(), $command);
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
} }
private function advanceCursorUntilValid(Iterator $iterator, $limitOnShardedClusters = 5)
{
if (! $this->isShardedCluster()) {
$iterator->next();
$this->assertTrue($iterator->valid());
return;
}
for ($i = 0; $i < $limitOnShardedClusters; $i++) {
$iterator->next();
if ($iterator->valid()) {
return;
}
}
throw new ExpectationFailedException(sprintf('Expected cursor to return an element but none was found after %d attempts.', $limitOnShardedClusters));
}
private function skipIfIsShardedCluster($message)
{
if (! $this->isShardedCluster()) {
return;
}
$this->markTestSkipped(sprintf('Test does not apply on sharded clusters: %s', $message));
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment