Commit 428d2172 authored by Katherine Walker's avatar Katherine Walker

PHPLIB-335: ChangeStream::next() should increment key even if ResumeTokenException is thrown

parent e847b06c
......@@ -95,11 +95,11 @@ class ChangeStream implements Iterator
try {
$this->csIt->next();
if ($this->valid()) {
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
if ($this->hasAdvanced) {
$this->key++;
}
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
} catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) {
......@@ -127,8 +127,8 @@ class ChangeStream implements Iterator
try {
$this->csIt->rewind();
if ($this->valid()) {
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
} catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) {
......
......@@ -7,6 +7,7 @@ use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Driver\Exception\ConnectionTimeoutException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Operation\DatabaseCommand;
use MongoDB\Operation\InsertOne;
use MongoDB\Operation\Watch;
......@@ -552,6 +553,41 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSame(1, $changeStream->key());
}
public function testResumeTokenNotFoundAdvancesKey()
{
$pipeline = [['$project' => ['_id' => 0 ]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
/* Note: we intentionally do not start iteration with rewind() to ensure
* that we test extraction functionality within next(). */
$this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]);
$this->insertDocument(['x' => 3]);
try {
$changeStream->rewind();
$this->fail('ResumeTokenException was not thrown');
} catch (ResumeTokenException $e) {}
$this->assertSame(0, $changeStream->key());
try {
$changeStream->next();
$this->fail('ResumeTokenException was not thrown');
} catch (ResumeTokenException $e) {}
$this->assertSame(1, $changeStream->key());
try {
$changeStream->next();
$this->fail('ResumeTokenException was not thrown');
} catch (ResumeTokenException $e) {}
$this->assertSame(2, $changeStream->key());
}
private function insertDocument($document)
{
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment