Commit 971545cc authored by Katherine Walker's avatar Katherine Walker

Merge pull request #479

parents de2e3310 42bf8258
...@@ -117,7 +117,26 @@ class ChangeStream implements Iterator ...@@ -117,7 +117,26 @@ class ChangeStream implements Iterator
*/ */
public function rewind() public function rewind()
{ {
$this->csIt->rewind(); $resumable = false;
try {
$this->csIt->rewind();
if ($this->valid()) {
$this->extractResumeToken($this->csIt->current());
}
} catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) {
$resumable = true;
}
if ($e->getCode() === self::CURSOR_NOT_FOUND) {
$resumable = true;
}
if ($e instanceof ConnectionTimeoutException) {
$resumable = true;
}
}
if ($resumable) {
$this->resume();
}
} }
/** /**
......
...@@ -73,9 +73,6 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -73,9 +73,6 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSameDocument($expectedResult, $changeStream->current()); $this->assertSameDocument($expectedResult, $changeStream->current());
} }
/**
* @todo test that rewind() also resumes once PHPLIB-322 is implemented
*/
public function testNextResumesAfterConnectionException() public function testNextResumesAfterConnectionException()
{ {
/* In order to trigger a dropped connection, we'll use a new client with /* In order to trigger a dropped connection, we'll use a new client with
...@@ -129,6 +126,56 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -129,6 +126,56 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSame($expectedCommands, $commands); $this->assertSame($expectedCommands, $commands);
} }
public function testRewindResumesAfterConnectionException()
{
/* In order to trigger a dropped connection, we'll use a new client with
* a socket timeout that is less than the change stream's maxAwaitTimeMS
* option. */
$manager = new Manager($this->getUri(), ['socketTimeoutMS' => 50]);
$primaryServer = $manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$operation = new Watch($manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($primaryServer);
$commands = [];
try {
(new CommandObserver)->observe(
function() use ($changeStream) {
$changeStream->rewind();
},
function(stdClass $command) use (&$commands) {
$commands[] = key((array) $command);
}
);
$this->fail('ConnectionTimeoutException was not thrown');
} catch (ConnectionTimeoutException $e) {}
$expectedCommands = [
/* The initial aggregate command for change streams returns a cursor
* envelope with an empty initial batch, since there are no changes
* to report at the moment the change stream is created. Therefore,
* we expect a getMore to be issued when we first advance the change
* stream (with either rewind() or next()). */
'getMore',
/* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous
* getMore command encounters a client socket timeout and leaves the
* cursor open on the server. ChangeStream should catch this error
* and resume by issuing a new aggregate command. */
'aggregate',
/* When ChangeStream resumes, it overwrites its original cursor with
* the new cursor resulting from the last aggregate command. This
* removes the last reference to the old cursor, which causes the
* driver to kill it (via mongoc_cursor_destroy()). */
'killCursors',
/* Finally, ChangeStream will rewind the new cursor as the last step
* of the resume process. This results in one last getMore. */
'getMore',
];
$this->assertSame($expectedCommands, $commands);
}
public function testNoChangeAfterResumeBeforeInsert() public function testNoChangeAfterResumeBeforeInsert()
{ {
$this->insertDocument(['_id' => 1, 'x' => 'foo']); $this->insertDocument(['_id' => 1, 'x' => 'foo']);
...@@ -260,7 +307,6 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -260,7 +307,6 @@ class WatchFunctionalTest extends FunctionalTestCase
/** /**
* @expectedException MongoDB\Exception\ResumeTokenException * @expectedException MongoDB\Exception\ResumeTokenException
* @todo test that rewind() also attempts to extract the resume token once PHPLIB-322 is implemented
*/ */
public function testNextCannotExtractResumeToken() public function testNextCannotExtractResumeToken()
{ {
...@@ -269,13 +315,28 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -269,13 +315,28 @@ class WatchFunctionalTest extends FunctionalTestCase
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer()); $changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind(); /* Note: we intentionally do not start iteration with rewind() to ensure
* that we test extraction functionality within next(). */
$this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 1]);
$changeStream->next(); $changeStream->next();
} }
/**
* @expectedException MongoDB\Exception\ResumeTokenException
*/
public function testRewindCannotExtractResumeToken()
{
$pipeline = [['$project' => ['_id' => 0 ]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['x' => 1]);
$changeStream->rewind();
}
public function testMaxAwaitTimeMS() public function testMaxAwaitTimeMS()
{ {
/* On average, an acknowledged write takes about 20 ms to appear in a /* On average, an acknowledged write takes about 20 ms to appear in a
...@@ -320,6 +381,52 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -320,6 +381,52 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
} }
public function testRewindResumesAfterCursorNotFound()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->killChangeStreamCursor($changeStream);
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->current());
}
public function testRewindExtractsResumeTokenAndNextResumes()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 1],
];
$this->assertSameDocument($expectedResult, $changeStream->current());
$this->killChangeStreamCursor($changeStream);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];
$this->assertSameDocument($expectedResult, $changeStream->current());
}
private function insertDocument($document) private function insertDocument($document)
{ {
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document); $insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment