Commit 39057724 authored by Jeremy Mikola's avatar Jeremy Mikola

Merge branch 'v1.3'

parents c662aa19 99453efa
......@@ -38,7 +38,8 @@ class ChangeStream implements Iterator
private $resumeToken;
private $resumeCallable;
private $csIt;
private $key;
private $key = 0;
private $hasAdvanced = false;
const CURSOR_NOT_FOUND = 43;
......@@ -53,8 +54,6 @@ class ChangeStream implements Iterator
{
$this->resumeCallable = $resumeCallable;
$this->csIt = new IteratorIterator($cursor);
$this->key = 0;
}
/**
......@@ -96,8 +95,11 @@ class ChangeStream implements Iterator
try {
$this->csIt->next();
if ($this->valid()) {
if ($this->hasAdvanced) {
$this->key++;
}
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
$this->key++;
}
} catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) {
......@@ -125,6 +127,7 @@ class ChangeStream implements Iterator
try {
$this->csIt->rewind();
if ($this->valid()) {
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
} catch (RuntimeException $e) {
......
......@@ -3,7 +3,6 @@
namespace MongoDB\Tests\Operation;
use MongoDB\ChangeStream;
use MongoDB\Client;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
......@@ -529,6 +528,58 @@ class WatchFunctionalTest extends FunctionalTestCase
];
}
public function testNextAdvancesKey()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]);
$changeStream->next();
$this->assertSame(0, $changeStream->key());
$changeStream->next();
$this->assertSame(1, $changeStream->key());
}
public function testResumeTokenNotFoundAdvancesKey()
{
$pipeline = [['$project' => ['_id' => 0 ]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
/* Note: we intentionally do not start iteration with rewind() to ensure
* that we test extraction functionality within next(). */
$this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]);
$this->insertDocument(['x' => 3]);
try {
$changeStream->rewind();
$this->fail('ResumeTokenException was not thrown');
} catch (ResumeTokenException $e) {}
$this->assertSame(0, $changeStream->key());
try {
$changeStream->next();
$this->fail('ResumeTokenException was not thrown');
} catch (ResumeTokenException $e) {}
$this->assertSame(1, $changeStream->key());
try {
$changeStream->next();
$this->fail('ResumeTokenException was not thrown');
} catch (ResumeTokenException $e) {}
$this->assertSame(2, $changeStream->key());
}
private function insertDocument($document)
{
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment