Commit 84c365c0 authored by Jeremy Mikola's avatar Jeremy Mikola

PHPLIB-451: ChangeStream::rewind() should never execute getMore

Tests were revised to no longer expect rewind() to encounter an error and resume. In most tests, rewind() is a NOP because the change stream has no events to return upon creation.

Since Watch now always executes aggregate commands with APM, the check for startAtOperationTime support was moved to createResumeCallable(). We continue to only capture operationTime from the very first aggregate command.

Various assertions for current() returning null were also changed to instead check valid(). Both forms are equivalent, but checking valid() is more consistent with our iteration examples.
parent b6c7f117
......@@ -24,7 +24,7 @@ use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Exception\ServerException;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use IteratorIterator;
use MongoDB\Model\TailableCursorIterator;
use Iterator;
/**
......@@ -63,11 +63,12 @@ class ChangeStream implements Iterator
* @internal
* @param Cursor $cursor
* @param callable $resumeCallable
* @param boolean $isFirstBatchEmpty
*/
public function __construct(Cursor $cursor, callable $resumeCallable)
public function __construct(Cursor $cursor, callable $resumeCallable, $isFirstBatchEmpty)
{
$this->resumeCallable = $resumeCallable;
$this->csIt = new IteratorIterator($cursor);
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
}
/**
......@@ -242,17 +243,11 @@ class ChangeStream implements Iterator
*/
private function resume()
{
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
$this->csIt = $newChangeStream->csIt;
list($cursor, $isFirstBatchEmpty) = call_user_func($this->resumeCallable, $this->resumeToken);
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
$this->csIt->rewind();
/* Note: if we are resuming after a call to ChangeStream::rewind(),
* $hasAdvanced will always be false. For it to be true, rewind() would
* need to have thrown a RuntimeException with a resumable error, which
* can only happen during the first call to IteratorIterator::rewind()
* before onIteration() has a chance to set $hasAdvanced to true.
* Otherwise, IteratorIterator::rewind() would either NOP (consecutive
* rewinds) or throw a LogicException (rewind after next), neither of
* which would result in a call to resume(). */
$this->onIteration($this->hasAdvanced);
}
......
......@@ -38,12 +38,12 @@ class TailableCursorIterator extends IteratorIterator
*
* @internal
* @param Cursor $cursor
* @param boolean $isFirstBatchIsEmpty
* @param boolean $isFirstBatchEmpty
*/
public function __construct(Cursor $cursor, $isFirstBatchIsEmpty)
public function __construct(Cursor $cursor, $isFirstBatchEmpty)
{
parent::__construct($cursor);
$this->isRewindNop = $isFirstBatchIsEmpty;
$this->isRewindNop = $isFirstBatchEmpty;
}
/**
......
......@@ -57,6 +57,7 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
private $changeStreamOptions;
private $collectionName;
private $databaseName;
private $isFirstBatchEmpty = false;
private $operationTime;
private $pipeline;
private $resumeCallable;
......@@ -200,6 +201,11 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
/** @internal */
final public function commandStarted(CommandStartedEvent $event)
{
if ($event->getCommandName() !== 'aggregate') {
return;
}
$this->isFirstBatchEmpty = false;
}
/** @internal */
......@@ -211,9 +217,15 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
$reply = $event->getReply();
if (isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
/* Note: the spec only refers to collecting an operation time from the
* "original aggregation", so only capture it if we've not already. */
if (!isset($this->operationTime) && isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
$this->operationTime = $reply->operationTime;
}
if (isset($reply->cursor->firstBatch) && is_array($reply->cursor->firstBatch)) {
$this->isFirstBatchEmpty = empty($reply->cursor->firstBatch);
}
}
/**
......@@ -227,7 +239,9 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
*/
public function execute(Server $server)
{
return new ChangeStream($this->executeAggregate($server), $this->resumeCallable);
$cursor = $this->executeAggregate($server);
return new ChangeStream($cursor, $this->resumeCallable, $this->isFirstBatchEmpty);
}
/**
......@@ -255,40 +269,36 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
unset($this->changeStreamOptions['startAtOperationTime']);
}
// Select a new server using the original read preference
$server = $manager->selectServer($this->aggregateOptions['readPreference']);
/* If we captured an operation time from the first aggregate command
* and there is no "resumeAfter" option, set "startAtOperationTime"
* so that we can resume from the original aggregate's time. */
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter'])) {
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
$this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
}
// Recreate the aggregate command and execute to obtain a new cursor
$this->aggregate = $this->createAggregate();
$cursor = $this->executeAggregate($server);
/* Select a new server using the read preference, execute this
* operation on it, and return the new ChangeStream. */
$server = $manager->selectServer($this->aggregateOptions['readPreference']);
return $this->execute($server);
return [$cursor, $this->isFirstBatchEmpty];
};
}
/**
* Execute the aggregate command and optionally capture its operation time.
* Execute the aggregate command.
*
* The command will be executed using APM so that we can capture its
* operation time and/or firstBatch size.
*
* @param Server $server
* @return Cursor
*/
private function executeAggregate(Server $server)
{
/* If we've already captured an operation time or the server does not
* support resuming from an operation time (e.g. MongoDB 3.6), execute
* the aggregation directly and return its cursor. */
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
return $this->aggregate->execute($server);
}
/* Otherwise, execute the aggregation using command monitoring so that
* we can capture its operation time with commandSucceeded(). */
\MongoDB\Driver\Monitoring\addSubscriber($this);
try {
......
This diff is collapsed.
......@@ -54,10 +54,11 @@ class ChangeStreamsProseTest extends FunctionalTestCase
$this->createCollection();
$changeStream = $this->collection->watch();
$changeStream->rewind();
$this->expectException(ServerException::class);
$this->expectExceptionCode($errorCode);
$changeStream->rewind();
$changeStream->next();
}
public function provideNonResumableErrorCodes()
......
......@@ -233,22 +233,33 @@ class ChangeStreamsSpecTest extends FunctionalTestCase
* Iterate a change stream.
*
* @param ChangeStream $changeStream
* @param integer $limit
* @return BSONDocument[]
*/
private function iterateChangeStream(ChangeStream $changeStream, $limit = 0)
{
if ($limit < 0) {
throw new LogicException('$limit is negative');
}
/* Limit iterations to guard against an infinite loop should a test fail
* to return as many results as are expected. Require at least one
* iteration to allow next() a chance to throw for error tests. */
$maxIterations = $limit + 1;
$events = [];
for ($changeStream->rewind(); count($events) < $limit; $changeStream->next()) {
for ($i = 0, $changeStream->rewind(); $i < $maxIterations; $i++, $changeStream->next()) {
if ( ! $changeStream->valid()) {
continue;
}
$event = $changeStream->current();
$this->assertInstanceOf(BSONDocument::class, $event);
$events[] = $event;
if (count($events) >= $limit) {
break;
}
}
return $events;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment