Commit a3a14c26 authored by Jeremy Mikola's avatar Jeremy Mikola

PHPLIB-442: Ensure change stream resume token is updated after resume

This extracts common logic in next() and rewind() to a new method, which is now also called by resume() after it rewinds the internal iterator.
parent aac8e540
...@@ -102,22 +102,7 @@ class ChangeStream implements Iterator ...@@ -102,22 +102,7 @@ class ChangeStream implements Iterator
{ {
try { try {
$this->csIt->next(); $this->csIt->next();
if ($this->valid()) { $this->onIteration(true);
if ($this->hasAdvanced) {
$this->key++;
}
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
/* If the cursorId is 0, the server has invalidated the cursor so we
* will never perform another getMore. This means that we cannot
* resume and we can therefore unset the resumeCallable, which will
* free any reference to Watch. This will also free the only
* reference to an implicit session, since any such reference
* belongs to Watch. */
if ((string) $this->getCursorId() === '0') {
$this->resumeCallable = null;
}
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
if ($this->isResumableError($e)) { if ($this->isResumableError($e)) {
$this->resume(); $this->resume();
...@@ -133,14 +118,7 @@ class ChangeStream implements Iterator ...@@ -133,14 +118,7 @@ class ChangeStream implements Iterator
{ {
try { try {
$this->csIt->rewind(); $this->csIt->rewind();
if ($this->valid()) { $this->onIteration(false);
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
// As with next(), free the callable once we know it will never be used.
if ((string) $this->getCursorId() === '0') {
$this->resumeCallable = null;
}
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
if ($this->isResumableError($e)) { if ($this->isResumableError($e)) {
$this->resume(); $this->resume();
...@@ -214,6 +192,38 @@ class ChangeStream implements Iterator ...@@ -214,6 +192,38 @@ class ChangeStream implements Iterator
return true; return true;
} }
/**
* Perform housekeeping after an iteration event (i.e. next or rewind).
*
* @param boolean $isNext Whether the iteration event was a call to next()
* @throws ResumeTokenException
*/
private function onIteration($isNext)
{
/* If the cursorId is 0, the server has invalidated the cursor and we
* will never perform another getMore nor need to resume since any
* remaining results (up to and including the invalidate event) will
* have been received in the last response. Therefore, we can unset the
* resumeCallable. This will free any reference to Watch as well as the
* only reference to any implicit session created therein. */
if ((string) $this->getCursorId() === '0') {
$this->resumeCallable = null;
}
if (!$this->valid()) {
return;
}
/* Increment the key if the iteration event was a call to next() and we
* have already advanced past the first result. */
if ($isNext && $this->hasAdvanced) {
$this->key++;
}
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
/** /**
* Creates a new changeStream after a resumable server error. * Creates a new changeStream after a resumable server error.
* *
...@@ -224,5 +234,6 @@ class ChangeStream implements Iterator ...@@ -224,5 +234,6 @@ class ChangeStream implements Iterator
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken); $newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
$this->csIt = $newChangeStream->csIt; $this->csIt = $newChangeStream->csIt;
$this->csIt->rewind(); $this->csIt->rewind();
$this->onIteration(false);
} }
} }
...@@ -322,6 +322,70 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -322,6 +322,70 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertMatchesDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
} }
public function testResumeTokenIsUpdatedAfterResuming()
{
$this->insertDocument(['_id' => 1]);
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertNull($changeStream->current());
$this->insertDocument(['_id' => 2]);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());
$this->killChangeStreamCursor($changeStream);
$this->insertDocument(['_id' => 3]);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 3],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 3],
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());
/* Triggering a consecutive failure will allow us to test whether the
* resume token was properly updated after the last resume. If the
* resume token updated, the next result will be {_id: 4}; otherwise,
* we'll see {_id: 3} returned again. */
$this->killChangeStreamCursor($changeStream);
$this->insertDocument(['_id' => 4]);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 4],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 4],
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());
}
public function testKey() public function testKey()
{ {
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment