Commit af3ec661 authored by Katherine Walker's avatar Katherine Walker

Add rewind tests to WatchFunctionalTest

parent eb2dee4b
......@@ -73,9 +73,6 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSameDocument($expectedResult, $changeStream->current());
}
/**
* @todo test that rewind() also resumes once PHPLIB-322 is implemented
*/
public function testNextResumesAfterConnectionException()
{
/* In order to trigger a dropped connection, we'll use a new client with
......@@ -129,6 +126,56 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSame($expectedCommands, $commands);
}
public function testRewindResumesAfterConnectionException()
{
/* In order to trigger a dropped connection, we'll use a new client with
* a socket timeout that is less than the change stream's maxAwaitTimeMS
* option. */
$manager = new Manager($this->getUri(), ['socketTimeoutMS' => 50]);
$primaryServer = $manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$operation = new Watch($manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($primaryServer);
$commands = [];
try {
(new CommandObserver)->observe(
function() use ($changeStream) {
$changeStream->rewind();
},
function(stdClass $command) use (&$commands) {
$commands[] = key((array) $command);
}
);
$this->fail('ConnectionTimeoutException was not thrown');
} catch (ConnectionTimeoutException $e) {}
$expectedCommands = [
/* The initial aggregate command for change streams returns a cursor
* envelope with an empty initial batch, since there are no changes
* to report at the moment the change stream is created. Therefore,
* we expect a getMore to be issued when we first advance the change
* stream (with either rewind() or next()). */
'getMore',
/* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous
* getMore command encounters a client socket timeout and leaves the
* cursor open on the server. ChangeStream should catch this error
* and resume by issuing a new aggregate command. */
'aggregate',
/* When ChangeStream resumes, it overwrites its original cursor with
* the new cursor resulting from the last aggregate command. This
* removes the last reference to the old cursor, which causes the
* driver to kill it (via mongoc_cursor_destroy()). */
'killCursors',
/* Finally, ChangeStream will rewind the new cursor as the last step
* of the resume process. This results in one last getMore. */
'getMore',
];
$this->assertSame($expectedCommands, $commands);
}
public function testNoChangeAfterResumeBeforeInsert()
{
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
......@@ -260,7 +307,6 @@ class WatchFunctionalTest extends FunctionalTestCase
/**
* @expectedException MongoDB\Exception\ResumeTokenException
* @todo test that rewind() also attempts to extract the resume token once PHPLIB-322 is implemented
*/
public function testNextCannotExtractResumeToken()
{
......@@ -269,13 +315,26 @@ class WatchFunctionalTest extends FunctionalTestCase
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->insertDocument(['x' => 1]);
$changeStream->next();
}
/**
* @expectedException MongoDB\Exception\ResumeTokenException
*/
public function testRewindCannotExtractResumeToken()
{
$pipeline = [['$project' => ['_id' => 0 ]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['x' => 1]);
$changeStream->rewind();
}
public function testMaxAwaitTimeMS()
{
/* On average, an acknowledged write takes about 20 ms to appear in a
......@@ -320,6 +379,54 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertTrue($changeStream->valid());
}
public function testResumeAfterKillThenNoOperations()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->killChangeStreamCursor($changeStream);
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->current());
}
public function testResumeAfterKillThenOperation()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 1],
];
$this->assertSameDocument($expectedResult, $changeStream->current());
$this->killChangeStreamCursor($changeStream);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];
$this->assertSameDocument($expectedResult, $changeStream->current());
}
private function insertDocument($document)
{
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment