Commit 4c6f7ceb authored by Jeremy Mikola's avatar Jeremy Mikola

PHPLIB-411: postBatchResumeToken, getResumeToken(), and resume improvements

Added ChangeStream::getResumeToken() method (PHPLIB-435), which returns the cached resume token.

postBatchResumeToken and startAfter (introduced in PHPLIB-407) can now be used for resuming.

Replaced TailableCursorIterator with ChangeStreamIterator. In addition to avoiding getMore commands when calling rewind(), the new class also tracks the size of each cursor batch so it can capture the postBatchResumeToken from getMore commands.

Refactored the resume process such that Watch now exclusively constructs the inner iterator for ChangeStream.

UnexpectedValueException is now thrown if firstBatch and nextBatch fields are not found in aggregate and getMore responses, respectively.
parent bfcda32f
......@@ -17,14 +17,12 @@
namespace MongoDB;
use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\CursorId;
use MongoDB\Driver\Exception\ConnectionException;
use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Exception\ServerException;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Model\TailableCursorIterator;
use MongoDB\Model\ChangeStreamIterator;
use Iterator;
/**
......@@ -46,9 +44,8 @@ class ChangeStream implements Iterator
private static $errorCodeInterrupted = 11601;
private static $errorCodeCursorKilled = 237;
private $resumeToken;
private $resumeCallable;
private $csIt;
private $iterator;
private $key = 0;
/**
......@@ -61,14 +58,13 @@ class ChangeStream implements Iterator
* Constructor.
*
* @internal
* @param Cursor $cursor
* @param callable $resumeCallable
* @param boolean $isFirstBatchEmpty
* @param ChangeStreamIterator $iterator
* @param callable $resumeCallable
*/
public function __construct(Cursor $cursor, callable $resumeCallable, $isFirstBatchEmpty)
public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable)
{
$this->iterator = $iterator;
$this->resumeCallable = $resumeCallable;
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
}
/**
......@@ -77,15 +73,29 @@ class ChangeStream implements Iterator
*/
public function current()
{
return $this->csIt->current();
return $this->iterator->current();
}
/**
* @return \MongoDB\Driver\CursorId
* @return CursorId
*/
public function getCursorId()
{
return $this->csIt->getInnerIterator()->getId();
return $this->iterator->getInnerIterator()->getId();
}
/**
* Returns the resume token for the iterator's current position.
*
* Null may be returned if no change documents have been iterated and the
* server did not include a postBatchResumeToken in its aggregate or getMore
* command response.
*
* @return array|object|null
*/
public function getResumeToken()
{
return $this->iterator->getResumeToken();
}
/**
......@@ -108,7 +118,7 @@ class ChangeStream implements Iterator
public function next()
{
try {
$this->csIt->next();
$this->iterator->next();
$this->onIteration($this->hasAdvanced);
} catch (RuntimeException $e) {
$this->resumeOrThrow($e);
......@@ -123,7 +133,7 @@ class ChangeStream implements Iterator
public function rewind()
{
try {
$this->csIt->rewind();
$this->iterator->rewind();
/* Unlike next() and resume(), the decision to increment the key
* does not depend on whether the change stream has advanced. This
* ensures that multiple calls to rewind() do not alter state. */
......@@ -139,40 +149,7 @@ class ChangeStream implements Iterator
*/
public function valid()
{
return $this->csIt->valid();
}
/**
* Extracts the resume token (i.e. "_id" field) from the change document.
*
* @param array|object $document Change document
* @return mixed
* @throws InvalidArgumentException
* @throws ResumeTokenException if the resume token is not found or invalid
*/
private function extractResumeToken($document)
{
if ( ! is_array($document) && ! is_object($document)) {
throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
}
if ($document instanceof Serializable) {
return $this->extractResumeToken($document->bsonSerialize());
}
$resumeToken = is_array($document)
? (isset($document['_id']) ? $document['_id'] : null)
: (isset($document->_id) ? $document->_id : null);
if ( ! isset($resumeToken)) {
throw ResumeTokenException::notFound();
}
if ( ! is_array($resumeToken) && ! is_object($resumeToken)) {
throw ResumeTokenException::invalidType($resumeToken);
}
return $resumeToken;
return $this->iterator->valid();
}
/**
......@@ -222,13 +199,11 @@ class ChangeStream implements Iterator
}
/* Return early if there is not a current result. Avoid any attempt to
* increment the iterator's key or extract a resume token */
* increment the iterator's key. */
if (!$this->valid()) {
return;
}
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
if ($incrementKey) {
$this->key++;
}
......@@ -237,16 +212,14 @@ class ChangeStream implements Iterator
}
/**
* Creates a new changeStream after a resumable server error.
* Recreates the ChangeStreamIterator after a resumable server error.
*
* @return void
*/
private function resume()
{
list($cursor, $isFirstBatchEmpty) = call_user_func($this->resumeCallable, $this->resumeToken);
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
$this->csIt->rewind();
$this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken());
$this->iterator->rewind();
$this->onIteration($this->hasAdvanced);
}
......
<?php
/*
* Copyright 2019 MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace MongoDB\Model;
use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Exception\UnexpectedValueException;
use IteratorIterator;
/**
* ChangeStreamIterator wraps a change stream's tailable cursor.
*
* This iterator tracks the size of each batch in order to determine when the
* postBatchResumeToken is applicable. It also ensures that initial calls to
* rewind() do not execute getMore commands.
*
* @internal
*/
class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
{
private $batchPosition = 0;
private $batchSize;
private $isRewindNop;
private $postBatchResumeToken;
private $resumeToken;
/**
* Constructor.
*
* @internal
* @param Cursor $cursor
* @param integer $firstBatchSize
* @param array|object|null $initialResumeToken
* @param object|null $postBatchResumeToken
*/
public function __construct(Cursor $cursor, $firstBatchSize, $initialResumeToken, $postBatchResumeToken)
{
if ( ! is_integer($firstBatchSize)) {
throw InvalidArgumentException::invalidType('$firstBatchSize', $firstBatchSize, 'integer');
}
if (isset($initialResumeToken) && ! is_array($initialResumeToken) && ! is_object($initialResumeToken)) {
throw InvalidArgumentException::invalidType('$initialResumeToken', $initialResumeToken, 'array or object');
}
if (isset($postBatchResumeToken) && ! is_object($postBatchResumeToken)) {
throw InvalidArgumentException::invalidType('$postBatchResumeToken', $postBatchResumeToken, 'object');
}
parent::__construct($cursor);
$this->batchSize = $firstBatchSize;
$this->isRewindNop = ($firstBatchSize === 0);
$this->postBatchResumeToken = $postBatchResumeToken;
$this->resumeToken = $initialResumeToken;
}
/** @internal */
final public function commandFailed(CommandFailedEvent $event)
{
}
/** @internal */
final public function commandStarted(CommandStartedEvent $event)
{
if ($event->getCommandName() !== 'getMore') {
return;
}
$this->batchPosition = 0;
$this->batchSize = null;
$this->postBatchResumeToken = null;
}
/** @internal */
final public function commandSucceeded(CommandSucceededEvent $event)
{
if ($event->getCommandName() !== 'getMore') {
return;
}
$reply = $event->getReply();
if ( ! isset($reply->cursor->nextBatch) || ! is_array($reply->cursor->nextBatch)) {
throw new UnexpectedValueException('getMore command did not return a "cursor.nextBatch" array');
}
$this->batchSize = count($reply->cursor->nextBatch);
if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
$this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
}
}
/**
* Returns the resume token for the iterator's current position.
*
* Null may be returned if no change documents have been iterated and the
* server did not include a postBatchResumeToken in its aggregate or getMore
* command response.
*
* @return array|object|null
*/
public function getResumeToken()
{
return $this->resumeToken;
}
/**
* @see https://php.net/iteratoriterator.rewind
* @return void
*/
public function next()
{
/* Determine if advancing the iterator will execute a getMore command
* (i.e. we are already positioned at the end of the current batch). If
* so, rely on the APM callbacks to reset $batchPosition and update
* $batchSize. Otherwise, we can forgo APM and manually increment
* $batchPosition after calling next(). */
$getMore = $this->isAtEndOfBatch();
if ($getMore) {
\MongoDB\Driver\Monitoring\addSubscriber($this);
}
try {
parent::next();
$this->onIteration(!$getMore);
} finally {
if ($getMore) {
\MongoDB\Driver\Monitoring\removeSubscriber($this);
}
}
}
/**
* @see https://php.net/iteratoriterator.rewind
* @return void
*/
public function rewind()
{
if ($this->isRewindNop) {
return;
}
parent::rewind();
$this->onIteration(false);
}
/**
* Extracts the resume token (i.e. "_id" field) from a change document.
*
* @param array|object $document Change document
* @return array|object
* @throws InvalidArgumentException
* @throws ResumeTokenException if the resume token is not found or invalid
*/
private function extractResumeToken($document)
{
if ( ! is_array($document) && ! is_object($document)) {
throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
}
if ($document instanceof Serializable) {
return $this->extractResumeToken($document->bsonSerialize());
}
$resumeToken = is_array($document)
? (isset($document['_id']) ? $document['_id'] : null)
: (isset($document->_id) ? $document->_id : null);
if ( ! isset($resumeToken)) {
throw ResumeTokenException::notFound();
}
if ( ! is_array($resumeToken) && ! is_object($resumeToken)) {
throw ResumeTokenException::invalidType($resumeToken);
}
return $resumeToken;
}
/**
* Return whether the iterator is positioned at the end of the batch.
*
* @return boolean
*/
private function isAtEndOfBatch()
{
return ($this->batchPosition + 1 >= $this->batchSize);
}
/**
* Perform housekeeping after an iteration event.
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
* @param boolean $incrementBatchPosition
*/
private function onIteration($incrementBatchPosition)
{
$isValid = $this->valid();
/* Disable rewind()'s NOP behavior once we advance to a valid position.
* This will allow the driver to throw a LogicException if rewind() is
* called after the cursor has advanced past its first element. */
if ($this->isRewindNop && $isValid) {
$this->isRewindNop = false;
}
if ($incrementBatchPosition && $isValid) {
$this->batchPosition++;
}
/* If the iterator is positioned at the end of the batch, apply the
* postBatchResumeToken if it's available. This handles both the case
* where the current batch is empty (since onIteration() will be called
* after a successful getMore) and when the iterator has advanced to the
* last document in its current batch. Otherwise, extract a resume token
* from the current document if possible. */
if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
$this->resumeToken = $this->postBatchResumeToken;
} elseif ($isValid) {
$this->resumeToken = $this->extractResumeToken($this->current());
}
}
}
<?php
/*
* Copyright 2019 MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace MongoDB\Model;
use MongoDB\Driver\Cursor;
use IteratorIterator;
/**
* Iterator for tailable cursors.
*
* This iterator may be used to wrap a tailable cursor. By indicating whether
* the cursor's first batch of results is empty, this iterator can NOP initial
* calls to rewind() and prevent it from executing a getMore command.
*
* @internal
*/
class TailableCursorIterator extends IteratorIterator
{
private $isRewindNop;
/**
* Constructor.
*
* @internal
* @param Cursor $cursor
* @param boolean $isFirstBatchEmpty
*/
public function __construct(Cursor $cursor, $isFirstBatchEmpty)
{
parent::__construct($cursor);
$this->isRewindNop = $isFirstBatchEmpty;
}
/**
* @see https://php.net/iteratoriterator.rewind
* @return void
*/
public function next()
{
try {
parent::next();
} finally {
/* If the cursor ever advances to a valid position, do not prevent
* future attempts to rewind the cursor. This will allow the driver
* to throw a LogicException if the cursor has been advanced past
* its first element. */
if ($this->valid()) {
$this->isRewindNop = false;
}
}
}
/**
* @see https://php.net/iteratoriterator.rewind
* @return void
*/
public function rewind()
{
if ($this->isRewindNop) {
return;
}
parent::rewind();
}
}
......@@ -19,6 +19,7 @@ namespace MongoDB\Operation;
use MongoDB\ChangeStream;
use MongoDB\BSON\TimestampInterface;
use MongoDB\Model\ChangeStreamIterator;
use MongoDB\Driver\Command;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Manager;
......@@ -57,10 +58,12 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
private $changeStreamOptions;
private $collectionName;
private $databaseName;
private $isFirstBatchEmpty = false;
private $firstBatchSize;
private $hasResumed = false;
private $manager;
private $operationTime;
private $pipeline;
private $resumeCallable;
private $postBatchResumeToken;
/**
* Constructs an aggregate command for creating a change stream.
......@@ -185,12 +188,12 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
$this->changeStreamOptions['allChangesForCluster'] = true;
}
$this->manager = $manager;
$this->databaseName = (string) $databaseName;
$this->collectionName = isset($collectionName) ? (string) $collectionName : null;
$this->pipeline = $pipeline;
$this->aggregate = $this->createAggregate();
$this->resumeCallable = $this->createResumeCallable($manager);
}
/** @internal */
......@@ -205,7 +208,8 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
return;
}
$this->isFirstBatchEmpty = false;
$this->firstBatchSize = null;
$this->postBatchResumeToken = null;
}
/** @internal */
......@@ -217,14 +221,19 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
$reply = $event->getReply();
/* Note: the spec only refers to collecting an operation time from the
* "original aggregation", so only capture it if we've not already. */
if (!isset($this->operationTime) && isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
$this->operationTime = $reply->operationTime;
if ( ! isset($reply->cursor->firstBatch) || ! is_array($reply->cursor->firstBatch)) {
throw new UnexpectedValueException('aggregate command did not return a "cursor.firstBatch" array');
}
$this->firstBatchSize = count($reply->cursor->firstBatch);
if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
$this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
}
if (isset($reply->cursor->firstBatch) && is_array($reply->cursor->firstBatch)) {
$this->isFirstBatchEmpty = empty($reply->cursor->firstBatch);
if ($this->shouldCaptureOperationTime($event->getServer()) &&
isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
$this->operationTime = $reply->operationTime;
}
}
......@@ -239,13 +248,14 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
*/
public function execute(Server $server)
{
$cursor = $this->executeAggregate($server);
return new ChangeStream($cursor, $this->resumeCallable, $this->isFirstBatchEmpty);
return new ChangeStream(
$this->createChangeStreamIterator($server),
function($resumeToken) { return $this->resume($resumeToken); }
);
}
/**
* Create the aggregate command for creating a change stream.
* Create the aggregate command for a change stream.
*
* This method is also used to recreate the aggregate command when resuming.
*
......@@ -259,40 +269,27 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions);
}
private function createResumeCallable(Manager $manager)
/**
* Create a ChangeStreamIterator by executing the aggregate command.
*
* @param Server $server
* @return ChangeStreamIterator
*/
private function createChangeStreamIterator(Server $server)
{
return function($resumeToken = null) use ($manager) {
/* If a resume token was provided, update the "resumeAfter" option
* and ensure that "startAtOperationTime" is no longer set. */
if ($resumeToken !== null) {
$this->changeStreamOptions['resumeAfter'] = $resumeToken;
unset($this->changeStreamOptions['startAtOperationTime']);
}
// Select a new server using the original read preference
$server = $manager->selectServer($this->aggregateOptions['readPreference']);
/* If we captured an operation time from the first aggregate command
* and there is no "resumeAfter" option, set "startAtOperationTime"
* so that we can resume from the original aggregate's time. */
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
$this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
}
// Recreate the aggregate command and execute to obtain a new cursor
$this->aggregate = $this->createAggregate();
$cursor = $this->executeAggregate($server);
return [$cursor, $this->isFirstBatchEmpty];
};
return new ChangeStreamIterator(
$this->executeAggregate($server),
$this->firstBatchSize,
$this->getInitialResumeToken(),
$this->postBatchResumeToken
);
}
/**
* Execute the aggregate command.
*
* The command will be executed using APM so that we can capture its
* operation time and/or firstBatch size.
* The command will be executed using APM so that we can capture data from
* its response (e.g. firstBatch size, postBatchResumeToken).
*
* @param Server $server
* @return Cursor
......@@ -307,4 +304,98 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
\MongoDB\Driver\Monitoring\removeSubscriber($this);
}
}
/**
* Return the initial resume token for creating the ChangeStreamIterator.
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
* @return array|object|null
*/
private function getInitialResumeToken()
{
if ($this->firstBatchSize === 0 && isset($this->postBatchResumeToken)) {
return $this->postBatchResumeToken;
}
if (isset($this->changeStreamOptions['startAfter'])) {
return $this->changeStreamOptions['startAfter'];
}
if (isset($this->changeStreamOptions['resumeAfter'])) {
return $this->changeStreamOptions['resumeAfter'];
}
return null;
}
/**
* Resumes a change stream.
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
* @param array|object|null $resumeToken
* @return ChangeStreamIterator
* @throws InvalidArgumentException
*/
private function resume($resumeToken = null)
{
if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) {
throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
}
$this->hasResumed = true;
// Select a new server using the original read preference
$server = $this->manager->selectServer($this->aggregateOptions['readPreference']);
unset($this->changeStreamOptions['resumeAfter']);
unset($this->changeStreamOptions['startAfter']);
unset($this->changeStreamOptions['startAtOperationTime']);
if ($resumeToken !== null) {
$this->changeStreamOptions['resumeAfter'] = $resumeToken;
}
if ($resumeToken === null && $this->operationTime !== null) {
$this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
}
// Recreate the aggregate command and return a new ChangeStreamIterator
$this->aggregate = $this->createAggregate();
return $this->createChangeStreamIterator($server);
}
/**
* Determine whether to capture operation time from an aggregate response.
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#startatoperationtime
* @param Server $server
* @return boolean
*/
private function shouldCaptureOperationTime(Server $server)
{
if ($this->hasResumed) {
return false;
}
if (isset($this->changeStreamOptions['resumeAfter']) ||
isset($this->changeStreamOptions['startAfter']) ||
isset($this->changeStreamOptions['startAtOperationTime'])) {
return false;
}
if ($this->firstBatchSize > 0) {
return false;
}
if ($this->postBatchResumeToken !== null) {
return false;
}
if ( ! \MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
return false;
}
return true;
}
}
......@@ -4,14 +4,15 @@ namespace MongoDB\Tests\Model;
use MongoDB\Collection;
use MongoDB\Driver\Exception\LogicException;
use MongoDB\Model\TailableCursorIterator;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Model\ChangeStreamIterator;
use MongoDB\Operation\Find;
use MongoDB\Operation\CreateCollection;
use MongoDB\Operation\DropCollection;
use MongoDB\Tests\CommandObserver;
use MongoDB\Tests\FunctionalTestCase;
class TailableCursorIteratorTest extends FunctionalTestCase
class ChangeStreamIteratorTest extends FunctionalTestCase
{
private $collection;
......@@ -28,42 +29,111 @@ class TailableCursorIteratorTest extends FunctionalTestCase
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
}
public function testFirstBatchIsEmpty()
/**
* @dataProvider provideInvalidIntegerValues
*/
public function testFirstBatchArgumentTypeCheck($firstBatchSize)
{
$this->collection->insertOne(['x' => 1]);
$this->expectException(InvalidArgumentException::class);
new ChangeStreamIterator($this->collection->find(), $firstBatchSize, null, null);
}
public function provideInvalidIntegerValues()
{
return $this->wrapValuesForDataProvider($this->getInvalidIntegerValues());
}
public function testInitialResumeToken()
{
$iterator = new ChangeStreamIterator($this->collection->find(), 0, null, null);
$this->assertNull($iterator->getResumeToken());
$iterator = new ChangeStreamIterator($this->collection->find(), 0, ['resumeToken' => 1], null);
$this->assertSameDocument(['resumeToken' => 1], $iterator->getResumeToken());
$iterator = new ChangeStreamIterator($this->collection->find(), 0, (object) ['resumeToken' => 2], null);
$this->assertSameDocument((object) ['resumeToken' => 2], $iterator->getResumeToken());
}
/**
* @dataProvider provideInvalidDocumentValues
*/
public function testInitialResumeTokenArgumentTypeCheck($initialResumeToken)
{
$this->expectException(InvalidArgumentException::class);
new ChangeStreamIterator($this->collection->find(), 0, $initialResumeToken, null);
}
/**
* @dataProvider provideInvalidObjectValues
*/
public function testPostBatchResumeTokenArgumentTypeCheck($postBatchResumeToken)
{
$this->expectException(InvalidArgumentException::class);
new ChangeStreamIterator($this->collection->find(), 0, null, $postBatchResumeToken);
}
public function provideInvalidObjectValues()
{
return $this->wrapValuesForDataProvider(array_merge($this->getInvalidDocumentValues(), [[]]));
}
public function testPostBatchResumeTokenIsReturnedForLastElementInFirstBatch()
{
$this->collection->insertOne(['_id' => ['resumeToken' => 1], 'x' => 1]);
$this->collection->insertOne(['_id' => ['resumeToken' => 2], 'x' => 2]);
$postBatchResumeToken = (object) ['resumeToken' => 'pb'];
$cursor = $this->collection->find([], ['cursorType' => Find::TAILABLE]);
$iterator = new ChangeStreamIterator($cursor, 2, null, $postBatchResumeToken);
$this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); });
$this->assertTrue($iterator->valid());
$this->assertSameDocument(['resumeToken' => 1], $iterator->getResumeToken());
$this->assertSameDocument(['_id' => ['resumeToken' => 1], 'x' => 1], $iterator->current());
$iterator->next();
$this->assertTrue($iterator->valid());
$this->assertSameDocument($postBatchResumeToken, $iterator->getResumeToken());
$this->assertSameDocument(['_id' => ['resumeToken' => 2], 'x' => 2], $iterator->current());
}
public function testRewindIsNopWhenFirstBatchIsEmpty()
{
$this->collection->insertOne(['_id' => ['resumeToken' => 1], 'x' => 1]);
$cursor = $this->collection->find(['x' => ['$gt' => 1]], ['cursorType' => Find::TAILABLE]);
$iterator = new TailableCursorIterator($cursor, true);
$iterator = new ChangeStreamIterator($cursor, 0, null, null);
$this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); });
$this->assertFalse($iterator->valid());
$this->collection->insertOne(['x' => 2]);
$this->collection->insertOne(['_id' => ['resumeToken' => 2], 'x' => 2]);
$iterator->next();
$this->assertTrue($iterator->valid());
$this->assertMatchesDocument(['x' => 2], $iterator->current());
$this->assertSameDocument(['_id' => ['resumeToken' => 2], 'x' => 2], $iterator->current());
$this->expectException(LogicException::class);
$iterator->rewind();
}
public function testFirstBatchIsNotEmpty()
public function testRewindAdvancesWhenFirstBatchIsNotEmpty()
{
$this->collection->insertOne(['x' => 1]);
$this->collection->insertOne(['_id' => ['resumeToken' => 1], 'x' => 1]);
$cursor = $this->collection->find([], ['cursorType' => Find::TAILABLE]);
$iterator = new TailableCursorIterator($cursor, false);
$iterator = new ChangeStreamIterator($cursor, 1, null, null);
$this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); });
$this->assertTrue($iterator->valid());
$this->assertMatchesDocument(['x' => 1], $iterator->current());
$this->assertSameDocument(['_id' => ['resumeToken' => 1], 'x' => 1], $iterator->current());
$this->collection->insertOne(['x' => 2]);
$this->collection->insertOne(['_id' => ['resumeToken' => 2], 'x' => 2]);
$iterator->next();
$this->assertTrue($iterator->valid());
$this->assertMatchesDocument(['x' => 2], $iterator->current());
$this->assertSameDocument(['_id' => ['resumeToken' => 2], 'x' => 2], $iterator->current());
$this->expectException(LogicException::class);
$iterator->rewind();
......
......@@ -119,9 +119,86 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSame($expectedCommands, $commands);
}
public function testResumeBeforeReceivingAnyResultsIncludesPostBatchResumeToken()
{
if ( ! $this->isPostBatchResumeTokenSupported()) {
$this->markTestSkipped('postBatchResumeToken is not supported');
}
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$operationTime = null;
$events = [];
(new CommandObserver)->observe(
function() use ($operation, &$changeStream) {
$changeStream = $operation->execute($this->getPrimaryServer());
},
function (array $event) use (&$events) {
$events[] = $event;
}
);
$this->assertCount(1, $events);
$this->assertSame('aggregate', $events[0]['started']->getCommandName());
$reply = $events[0]['succeeded']->getReply();
$this->assertObjectHasAttribute('cursor', $reply);
$this->assertInternalType('object', $reply->cursor);
$this->assertObjectHasAttribute('postBatchResumeToken', $reply->cursor);
$postBatchResumeToken = $reply->cursor->postBatchResumeToken;
$this->assertInternalType('object', $postBatchResumeToken);
$this->assertFalse($changeStream->valid());
$this->killChangeStreamCursor($changeStream);
$this->assertNoCommandExecuted(function() use ($changeStream) { $changeStream->rewind(); });
$events = [];
(new CommandObserver)->observe(
function() use ($changeStream) {
$changeStream->next();
},
function (array $event) use (&$events) {
$events[] = $event;
}
);
$this->assertCount(3, $events);
$this->assertSame('getMore', $events[0]['started']->getCommandName());
$this->arrayHasKey('failed', $events[0]);
$this->assertSame('aggregate', $events[1]['started']->getCommandName());
$this->assertResumeAfter($postBatchResumeToken, $events[1]['started']->getCommand());
$this->arrayHasKey('succeeded', $events[1]);
// Original cursor is freed immediately after the change stream resumes
$this->assertSame('killCursors', $events[2]['started']->getCommandName());
$this->arrayHasKey('succeeded', $events[2]);
$this->assertFalse($changeStream->valid());
}
private function assertResumeAfter($expectedResumeToken, stdClass $command)
{
$this->assertObjectHasAttribute('pipeline', $command);
$this->assertInternalType('array', $command->pipeline);
$this->assertArrayHasKey(0, $command->pipeline);
$this->assertObjectHasAttribute('$changeStream', $command->pipeline[0]);
$this->assertObjectHasAttribute('resumeAfter', $command->pipeline[0]->{'$changeStream'});
$this->assertEquals($expectedResumeToken, $command->pipeline[0]->{'$changeStream'}->resumeAfter);
}
public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime()
{
$this->skipIfStartAtOperationTimeNotSupported();
if ( ! $this->isStartAtOperationTimeSupported()) {
$this->markTestSkipped('startAtOperationTime is not supported');
}
if ($this->isPostBatchResumeTokenSupported()) {
$this->markTestSkipped('postBatchResumeToken takes precedence over startAtOperationTime');
}
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
......@@ -496,7 +573,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertNotEquals('0', (string) $changeStream->getCursorId());
$rc = new ReflectionClass('MongoDB\ChangeStream');
$rp = $rc->getProperty('csIt');
$rp = $rc->getProperty('iterator');
$rp->setAccessible(true);
$iterator = $rp->getValue($changeStream);
......@@ -935,6 +1012,16 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertEquals(1, $writeResult->getInsertedCount());
}
private function isPostBatchResumeTokenSupported()
{
return version_compare($this->getServerVersion(), '4.0.7', '>=');
}
private function isStartAtOperationTimeSupported()
{
return \MongoDB\server_supports_feature($this->getPrimaryServer(), self::$wireVersionForStartAtOperationTime);
}
private function killChangeStreamCursor(ChangeStream $changeStream)
{
$command = [
......@@ -945,11 +1032,4 @@ class WatchFunctionalTest extends FunctionalTestCase
$operation = new DatabaseCommand($this->getDatabaseName(), $command);
$operation->execute($this->getPrimaryServer());
}
private function skipIfStartAtOperationTimeNotSupported()
{
if (!\MongoDB\server_supports_feature($this->getPrimaryServer(), self::$wireVersionForStartAtOperationTime)) {
$this->markTestSkipped('startAtOperationTime is not supported');
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment