Commit 516e56b5 authored by Jeremy Mikola's avatar Jeremy Mikola

Merge branch 'v1.4'

parents aded75ea b19328ac
...@@ -50,6 +50,11 @@ class ChangeStream implements Iterator ...@@ -50,6 +50,11 @@ class ChangeStream implements Iterator
private $resumeCallable; private $resumeCallable;
private $csIt; private $csIt;
private $key = 0; private $key = 0;
/**
* Whether the change stream has advanced to its first result. This is used
* to determine whether $key should be incremented after an iteration event.
*/
private $hasAdvanced = false; private $hasAdvanced = false;
/** /**
...@@ -103,7 +108,7 @@ class ChangeStream implements Iterator ...@@ -103,7 +108,7 @@ class ChangeStream implements Iterator
{ {
try { try {
$this->csIt->next(); $this->csIt->next();
$this->onIteration(true); $this->onIteration($this->hasAdvanced);
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
$this->resumeOrThrow($e); $this->resumeOrThrow($e);
} }
...@@ -118,6 +123,9 @@ class ChangeStream implements Iterator ...@@ -118,6 +123,9 @@ class ChangeStream implements Iterator
{ {
try { try {
$this->csIt->rewind(); $this->csIt->rewind();
/* Unlike next() and resume(), the decision to increment the key
* does not depend on whether the change stream has advanced. This
* ensures that multiple calls to rewind() do not alter state. */
$this->onIteration(false); $this->onIteration(false);
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
$this->resumeOrThrow($e); $this->resumeOrThrow($e);
...@@ -195,12 +203,12 @@ class ChangeStream implements Iterator ...@@ -195,12 +203,12 @@ class ChangeStream implements Iterator
} }
/** /**
* Perform housekeeping after an iteration event (i.e. next or rewind). * Perform housekeeping after an iteration event.
* *
* @param boolean $isNext Whether the iteration event was a call to next() * @param boolean $incrementKey Increment $key if there is a current result
* @throws ResumeTokenException * @throws ResumeTokenException
*/ */
private function onIteration($isNext) private function onIteration($incrementKey)
{ {
/* If the cursorId is 0, the server has invalidated the cursor and we /* If the cursorId is 0, the server has invalidated the cursor and we
* will never perform another getMore nor need to resume since any * will never perform another getMore nor need to resume since any
...@@ -212,15 +220,15 @@ class ChangeStream implements Iterator ...@@ -212,15 +220,15 @@ class ChangeStream implements Iterator
$this->resumeCallable = null; $this->resumeCallable = null;
} }
/* Return early if there is not a current result. Avoid any attempt to
* increment the iterator's key or extract a resume token */
if (!$this->valid()) { if (!$this->valid()) {
return; return;
} }
$this->resumeToken = $this->extractResumeToken($this->csIt->current()); $this->resumeToken = $this->extractResumeToken($this->csIt->current());
/* Increment the key if the iteration event was a call to next() and we if ($incrementKey) {
* have already advanced past the first result. */
if ($isNext && $this->hasAdvanced) {
$this->key++; $this->key++;
} }
...@@ -237,7 +245,15 @@ class ChangeStream implements Iterator ...@@ -237,7 +245,15 @@ class ChangeStream implements Iterator
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken); $newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
$this->csIt = $newChangeStream->csIt; $this->csIt = $newChangeStream->csIt;
$this->csIt->rewind(); $this->csIt->rewind();
$this->onIteration(false); /* Note: if we are resuming after a call to ChangeStream::rewind(),
* $hasAdvanced will always be false. For it to be true, rewind() would
* need to have thrown a RuntimeException with a resumable error, which
* can only happen during the first call to IteratorIterator::rewind()
* before onIteration() has a chance to set $hasAdvanced to true.
* Otherwise, IteratorIterator::rewind() would either NOP (consecutive
* rewinds) or throw a LogicException (rewind after next), neither of
* which would result in a call to resume(). */
$this->onIteration($this->hasAdvanced);
} }
/** /**
......
...@@ -8,6 +8,7 @@ use MongoDB\Driver\Manager; ...@@ -8,6 +8,7 @@ use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference; use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
use MongoDB\Driver\Exception\ConnectionTimeoutException; use MongoDB\Driver\Exception\ConnectionTimeoutException;
use MongoDB\Driver\Exception\LogicException;
use MongoDB\Exception\ResumeTokenException; use MongoDB\Exception\ResumeTokenException;
use MongoDB\Operation\DatabaseCommand; use MongoDB\Operation\DatabaseCommand;
use MongoDB\Operation\InsertOne; use MongoDB\Operation\InsertOne;
...@@ -217,6 +218,62 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -217,6 +218,62 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertEquals($expectedOperationTime, $command->pipeline[0]->{'$changeStream'}->startAtOperationTime); $this->assertEquals($expectedOperationTime, $command->pipeline[0]->{'$changeStream'}->startAtOperationTime);
} }
public function testRewindMultipleTimesWithResults()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]);
$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key());
$this->assertNotNull($changeStream->current());
// Subsequent rewind does not change iterator state
$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key());
$this->assertNotNull($changeStream->current());
$changeStream->next();
$this->assertTrue($changeStream->valid());
$this->assertSame(1, $changeStream->key());
$this->assertNotNull($changeStream->current());
// Rewinding after advancing the iterator is an error
$this->expectException(LogicException::class);
$changeStream->rewind();
}
public function testRewindMultipleTimesWithNoResults()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$this->assertNull($changeStream->current());
// Subsequent rewind does not change iterator state
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$this->assertNull($changeStream->current());
$changeStream->next();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$this->assertNull($changeStream->current());
// Rewinding after advancing the iterator is an error
$this->expectException(LogicException::class);
$changeStream->rewind();
}
public function testRewindResumesAfterConnectionException() public function testRewindResumesAfterConnectionException()
{ {
/* In order to trigger a dropped connection, we'll use a new client with /* In order to trigger a dropped connection, we'll use a new client with
...@@ -314,20 +371,67 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -314,20 +371,67 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertMatchesDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
} }
public function testResumeTokenIsUpdatedAfterResuming() public function testResumeMultipleTimesInSuccession()
{ {
$this->insertDocument(['_id' => 1]); $operation = new CreateCollection($this->getDatabaseName(), $this->getCollectionName());
$operation->execute($this->getPrimaryServer());
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer()); $changeStream = $operation->execute($this->getPrimaryServer());
/* Killing the cursor when there are no results will test that neither
* the initial rewind() nor its resume attempt incremented the key. */
$this->killChangeStreamCursor($changeStream);
$changeStream->rewind(); $changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$this->assertNull($changeStream->current()); $this->assertNull($changeStream->current());
$this->insertDocument(['_id' => 1]);
/* Killing the cursor a second time when there is a result will test
* that the resume attempt picks up the latest change. */
$this->killChangeStreamCursor($changeStream);
$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 1],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 1],
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());
/* Killing the cursor a second time will not trigger a resume until
* ChangeStream::next() is called. A successive call to rewind() should
* not change the iterator's state and preserve the current result. */
$this->killChangeStreamCursor($changeStream);
$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 1],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 1],
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());
$this->insertDocument(['_id' => 2]); $this->insertDocument(['_id' => 2]);
$changeStream->next(); $changeStream->next();
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$this->assertSame(1, $changeStream->key());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
...@@ -345,6 +449,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -345,6 +449,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->next(); $changeStream->next();
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$this->assertSame(2, $changeStream->key());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
...@@ -366,6 +471,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -366,6 +471,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->next(); $changeStream->next();
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$this->assertSame(3, $changeStream->key());
$expectedResult = [ $expectedResult = [
'_id' => $changeStream->current()->_id, '_id' => $changeStream->current()->_id,
...@@ -697,6 +803,8 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -697,6 +803,8 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]); $this->insertDocument(['x' => 2]);
/* Note: we intentionally do not start iteration with rewind() to ensure
* that next() behaves identically when called without rewind(). */
$changeStream->next(); $changeStream->next();
$this->assertSame(0, $changeStream->key()); $this->assertSame(0, $changeStream->key());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment