Unverified Commit e0a2b899 authored by Andreas Braun's avatar Andreas Braun

Merge pull request #674

parents a800cdef 7f20ffb6
......@@ -54,6 +54,9 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
/** @var boolean */
private $isRewindNop;
/** @var boolean */
private $isValid = false;
/** @var object|null */
private $postBatchResumeToken;
......@@ -126,6 +129,15 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
}
}
/**
* @see https://php.net/iteratoriterator.current
* @return mixed
*/
public function current()
{
return $this->isValid ? parent::current() : null;
}
/**
* Returns the resume token for the iterator's current position.
*
......@@ -140,6 +152,15 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
return $this->resumeToken;
}
/**
* @see https://php.net/iteratoriterator.key
* @return mixed
*/
public function key()
{
return $this->isValid ? parent::key() : null;
}
/**
* @see https://php.net/iteratoriterator.rewind
* @return void
......@@ -181,6 +202,15 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
$this->onIteration(false);
}
/**
* @see https://php.net/iteratoriterator.valid
* @return boolean
*/
public function valid()
{
return $this->isValid;
}
/**
* Extracts the resume token (i.e. "_id" field) from a change document.
*
......@@ -204,10 +234,12 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
: (isset($document->_id) ? $document->_id : null);
if (! isset($resumeToken)) {
$this->isValid = false;
throw ResumeTokenException::notFound();
}
if (! is_array($resumeToken) && ! is_object($resumeToken)) {
$this->isValid = false;
throw ResumeTokenException::invalidType($resumeToken);
}
......@@ -232,16 +264,16 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
*/
private function onIteration($incrementBatchPosition)
{
$isValid = $this->valid();
$this->isValid = parent::valid();
/* Disable rewind()'s NOP behavior once we advance to a valid position.
* This will allow the driver to throw a LogicException if rewind() is
* called after the cursor has advanced past its first element. */
if ($this->isRewindNop && $isValid) {
if ($this->isRewindNop && $this->isValid) {
$this->isRewindNop = false;
}
if ($incrementBatchPosition && $isValid) {
if ($incrementBatchPosition && $this->isValid) {
$this->batchPosition++;
}
......@@ -253,7 +285,7 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
* from the current document if possible. */
if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
$this->resumeToken = $this->postBatchResumeToken;
} elseif ($isValid) {
} elseif ($this->isValid) {
$this->resumeToken = $this->extractResumeToken($this->current());
}
}
......
......@@ -1125,34 +1125,23 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->next();
$this->fail('Exception for missing resume token was not thrown');
} catch (ResumeTokenException $e) {
/* If a client-side error is thrown (server < 4.1.8), the tailable
* cursor's position is still valid. This may change once PHPLIB-456
* is implemented. */
$expectedValid = true;
$expectedKey = 0;
/* On server versions < 4.1.8, a client-side error is thrown. */
} catch (ServerException $e) {
/* If a server-side error is thrown (server >= 4.1.8), the tailable
* cursor's position is not valid. */
$expectedValid = false;
$expectedKey = null;
/* On server versions >= 4.1.8, the error is thrown server-side. */
}
$this->assertSame($expectedValid, $changeStream->valid());
$this->assertSame($expectedKey, $changeStream->key());
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
try {
$changeStream->next();
$this->fail('Exception for missing resume token was not thrown');
} catch (ResumeTokenException $e) {
$expectedValid = true;
$expectedKey = 0;
} catch (ServerException $e) {
$expectedValid = false;
$expectedKey = null;
}
$this->assertSame($expectedValid, $changeStream->valid());
$this->assertSame($expectedKey, $changeStream->key());
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
}
public function testSessionPersistsAfterResume()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment