Commit 75f0fb80 authored by Jeremy Mikola's avatar Jeremy Mikola

Merge pull request #622

parents 401496aa d13ab854
...@@ -212,6 +212,8 @@ class ChangeStream implements Iterator ...@@ -212,6 +212,8 @@ class ChangeStream implements Iterator
return; return;
} }
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
/* Increment the key if the iteration event was a call to next() and we /* Increment the key if the iteration event was a call to next() and we
* have already advanced past the first result. */ * have already advanced past the first result. */
if ($isNext && $this->hasAdvanced) { if ($isNext && $this->hasAdvanced) {
...@@ -219,7 +221,6 @@ class ChangeStream implements Iterator ...@@ -219,7 +221,6 @@ class ChangeStream implements Iterator
} }
$this->hasAdvanced = true; $this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
} }
/** /**
......
...@@ -706,7 +706,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -706,7 +706,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSame(1, $changeStream->key()); $this->assertSame(1, $changeStream->key());
} }
public function testResumeTokenNotFoundAdvancesKey() public function testResumeTokenNotFoundDoesNotAdvanceKey()
{ {
if (version_compare($this->getServerVersion(), '4.1.8', '>=')) { if (version_compare($this->getServerVersion(), '4.1.8', '>=')) {
$this->markTestSkipped('Server rejects change streams that modify resume token (SERVER-37786)'); $this->markTestSkipped('Server rejects change streams that modify resume token (SERVER-37786)');
...@@ -717,8 +717,6 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -717,8 +717,6 @@ class WatchFunctionalTest extends FunctionalTestCase
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer()); $changeStream = $operation->execute($this->getPrimaryServer());
/* Note: we intentionally do not start iteration with rewind() to ensure
* that we test extraction functionality within next(). */
$this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]); $this->insertDocument(['x' => 2]);
$this->insertDocument(['x' => 3]); $this->insertDocument(['x' => 3]);
...@@ -735,14 +733,14 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -735,14 +733,14 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->fail('ResumeTokenException was not thrown'); $this->fail('ResumeTokenException was not thrown');
} catch (ResumeTokenException $e) {} } catch (ResumeTokenException $e) {}
$this->assertSame(1, $changeStream->key()); $this->assertSame(0, $changeStream->key());
try { try {
$changeStream->next(); $changeStream->next();
$this->fail('ResumeTokenException was not thrown'); $this->fail('ResumeTokenException was not thrown');
} catch (ResumeTokenException $e) {} } catch (ResumeTokenException $e) {}
$this->assertSame(2, $changeStream->key()); $this->assertSame(0, $changeStream->key());
} }
public function testSessionPersistsAfterResume() public function testSessionPersistsAfterResume()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment