Commit d38c095f authored by Jeremy Mikola's avatar Jeremy Mikola

Merge pull request #639

parents bfcda32f b3da293b
...@@ -17,14 +17,12 @@ ...@@ -17,14 +17,12 @@
namespace MongoDB; namespace MongoDB;
use MongoDB\BSON\Serializable; use MongoDB\Driver\CursorId;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\ConnectionException; use MongoDB\Driver\Exception\ConnectionException;
use MongoDB\Driver\Exception\RuntimeException; use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Exception\ServerException; use MongoDB\Driver\Exception\ServerException;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException; use MongoDB\Exception\ResumeTokenException;
use MongoDB\Model\TailableCursorIterator; use MongoDB\Model\ChangeStreamIterator;
use Iterator; use Iterator;
/** /**
...@@ -42,13 +40,14 @@ class ChangeStream implements Iterator ...@@ -42,13 +40,14 @@ class ChangeStream implements Iterator
*/ */
const CURSOR_NOT_FOUND = 43; const CURSOR_NOT_FOUND = 43;
private static $errorCodeCappedPositionLost = 136; private static $nonResumableErrorCodes = [
private static $errorCodeInterrupted = 11601; 136, // CappedPositionLost
private static $errorCodeCursorKilled = 237; 237, // CursorKilled
11601, // Interrupted
];
private $resumeToken;
private $resumeCallable; private $resumeCallable;
private $csIt; private $iterator;
private $key = 0; private $key = 0;
/** /**
...@@ -61,14 +60,13 @@ class ChangeStream implements Iterator ...@@ -61,14 +60,13 @@ class ChangeStream implements Iterator
* Constructor. * Constructor.
* *
* @internal * @internal
* @param Cursor $cursor * @param ChangeStreamIterator $iterator
* @param callable $resumeCallable * @param callable $resumeCallable
* @param boolean $isFirstBatchEmpty
*/ */
public function __construct(Cursor $cursor, callable $resumeCallable, $isFirstBatchEmpty) public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable)
{ {
$this->iterator = $iterator;
$this->resumeCallable = $resumeCallable; $this->resumeCallable = $resumeCallable;
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
} }
/** /**
...@@ -77,15 +75,29 @@ class ChangeStream implements Iterator ...@@ -77,15 +75,29 @@ class ChangeStream implements Iterator
*/ */
public function current() public function current()
{ {
return $this->csIt->current(); return $this->iterator->current();
} }
/** /**
* @return \MongoDB\Driver\CursorId * @return CursorId
*/ */
public function getCursorId() public function getCursorId()
{ {
return $this->csIt->getInnerIterator()->getId(); return $this->iterator->getInnerIterator()->getId();
}
/**
* Returns the resume token for the iterator's current position.
*
* Null may be returned if no change documents have been iterated and the
* server did not include a postBatchResumeToken in its aggregate or getMore
* command response.
*
* @return array|object|null
*/
public function getResumeToken()
{
return $this->iterator->getResumeToken();
} }
/** /**
...@@ -108,7 +120,7 @@ class ChangeStream implements Iterator ...@@ -108,7 +120,7 @@ class ChangeStream implements Iterator
public function next() public function next()
{ {
try { try {
$this->csIt->next(); $this->iterator->next();
$this->onIteration($this->hasAdvanced); $this->onIteration($this->hasAdvanced);
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
$this->resumeOrThrow($e); $this->resumeOrThrow($e);
...@@ -123,7 +135,7 @@ class ChangeStream implements Iterator ...@@ -123,7 +135,7 @@ class ChangeStream implements Iterator
public function rewind() public function rewind()
{ {
try { try {
$this->csIt->rewind(); $this->iterator->rewind();
/* Unlike next() and resume(), the decision to increment the key /* Unlike next() and resume(), the decision to increment the key
* does not depend on whether the change stream has advanced. This * does not depend on whether the change stream has advanced. This
* ensures that multiple calls to rewind() do not alter state. */ * ensures that multiple calls to rewind() do not alter state. */
...@@ -139,40 +151,7 @@ class ChangeStream implements Iterator ...@@ -139,40 +151,7 @@ class ChangeStream implements Iterator
*/ */
public function valid() public function valid()
{ {
return $this->csIt->valid(); return $this->iterator->valid();
}
/**
* Extracts the resume token (i.e. "_id" field) from the change document.
*
* @param array|object $document Change document
* @return mixed
* @throws InvalidArgumentException
* @throws ResumeTokenException if the resume token is not found or invalid
*/
private function extractResumeToken($document)
{
if ( ! is_array($document) && ! is_object($document)) {
throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
}
if ($document instanceof Serializable) {
return $this->extractResumeToken($document->bsonSerialize());
}
$resumeToken = is_array($document)
? (isset($document['_id']) ? $document['_id'] : null)
: (isset($document->_id) ? $document->_id : null);
if ( ! isset($resumeToken)) {
throw ResumeTokenException::notFound();
}
if ( ! is_array($resumeToken) && ! is_object($resumeToken)) {
throw ResumeTokenException::invalidType($resumeToken);
}
return $resumeToken;
} }
/** /**
...@@ -196,7 +175,7 @@ class ChangeStream implements Iterator ...@@ -196,7 +175,7 @@ class ChangeStream implements Iterator
return false; return false;
} }
if (in_array($exception->getCode(), [self::$errorCodeCappedPositionLost, self::$errorCodeCursorKilled, self::$errorCodeInterrupted])) { if (in_array($exception->getCode(), self::$nonResumableErrorCodes)) {
return false; return false;
} }
...@@ -222,13 +201,11 @@ class ChangeStream implements Iterator ...@@ -222,13 +201,11 @@ class ChangeStream implements Iterator
} }
/* Return early if there is not a current result. Avoid any attempt to /* Return early if there is not a current result. Avoid any attempt to
* increment the iterator's key or extract a resume token */ * increment the iterator's key. */
if (!$this->valid()) { if (!$this->valid()) {
return; return;
} }
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
if ($incrementKey) { if ($incrementKey) {
$this->key++; $this->key++;
} }
...@@ -237,16 +214,14 @@ class ChangeStream implements Iterator ...@@ -237,16 +214,14 @@ class ChangeStream implements Iterator
} }
/** /**
* Creates a new changeStream after a resumable server error. * Recreates the ChangeStreamIterator after a resumable server error.
* *
* @return void * @return void
*/ */
private function resume() private function resume()
{ {
list($cursor, $isFirstBatchEmpty) = call_user_func($this->resumeCallable, $this->resumeToken); $this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken());
$this->iterator->rewind();
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
$this->csIt->rewind();
$this->onIteration($this->hasAdvanced); $this->onIteration($this->hasAdvanced);
} }
......
<?php
/*
* Copyright 2019 MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace MongoDB\Model;
use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Exception\UnexpectedValueException;
use IteratorIterator;
/**
* ChangeStreamIterator wraps a change stream's tailable cursor.
*
* This iterator tracks the size of each batch in order to determine when the
* postBatchResumeToken is applicable. It also ensures that initial calls to
* rewind() do not execute getMore commands.
*
* @internal
*/
class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
{
private $batchPosition = 0;
private $batchSize;
private $isRewindNop;
private $postBatchResumeToken;
private $resumeToken;
/**
* Constructor.
*
* @internal
* @param Cursor $cursor
* @param integer $firstBatchSize
* @param array|object|null $initialResumeToken
* @param object|null $postBatchResumeToken
*/
public function __construct(Cursor $cursor, $firstBatchSize, $initialResumeToken, $postBatchResumeToken)
{
if ( ! is_integer($firstBatchSize)) {
throw InvalidArgumentException::invalidType('$firstBatchSize', $firstBatchSize, 'integer');
}
if (isset($initialResumeToken) && ! is_array($initialResumeToken) && ! is_object($initialResumeToken)) {
throw InvalidArgumentException::invalidType('$initialResumeToken', $initialResumeToken, 'array or object');
}
if (isset($postBatchResumeToken) && ! is_object($postBatchResumeToken)) {
throw InvalidArgumentException::invalidType('$postBatchResumeToken', $postBatchResumeToken, 'object');
}
parent::__construct($cursor);
$this->batchSize = $firstBatchSize;
$this->isRewindNop = ($firstBatchSize === 0);
$this->postBatchResumeToken = $postBatchResumeToken;
$this->resumeToken = $initialResumeToken;
}
/** @internal */
final public function commandFailed(CommandFailedEvent $event)
{
}
/** @internal */
final public function commandStarted(CommandStartedEvent $event)
{
if ($event->getCommandName() !== 'getMore') {
return;
}
$this->batchPosition = 0;
$this->batchSize = null;
$this->postBatchResumeToken = null;
}
/** @internal */
final public function commandSucceeded(CommandSucceededEvent $event)
{
if ($event->getCommandName() !== 'getMore') {
return;
}
$reply = $event->getReply();
if ( ! isset($reply->cursor->nextBatch) || ! is_array($reply->cursor->nextBatch)) {
throw new UnexpectedValueException('getMore command did not return a "cursor.nextBatch" array');
}
$this->batchSize = count($reply->cursor->nextBatch);
if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
$this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
}
}
/**
* Returns the resume token for the iterator's current position.
*
* Null may be returned if no change documents have been iterated and the
* server did not include a postBatchResumeToken in its aggregate or getMore
* command response.
*
* @return array|object|null
*/
public function getResumeToken()
{
return $this->resumeToken;
}
/**
* @see https://php.net/iteratoriterator.rewind
* @return void
*/
public function next()
{
/* Determine if advancing the iterator will execute a getMore command
* (i.e. we are already positioned at the end of the current batch). If
* so, rely on the APM callbacks to reset $batchPosition and update
* $batchSize. Otherwise, we can forgo APM and manually increment
* $batchPosition after calling next(). */
$getMore = $this->isAtEndOfBatch();
if ($getMore) {
\MongoDB\Driver\Monitoring\addSubscriber($this);
}
try {
parent::next();
$this->onIteration(!$getMore);
} finally {
if ($getMore) {
\MongoDB\Driver\Monitoring\removeSubscriber($this);
}
}
}
/**
* @see https://php.net/iteratoriterator.rewind
* @return void
*/
public function rewind()
{
if ($this->isRewindNop) {
return;
}
parent::rewind();
$this->onIteration(false);
}
/**
* Extracts the resume token (i.e. "_id" field) from a change document.
*
* @param array|object $document Change document
* @return array|object
* @throws InvalidArgumentException
* @throws ResumeTokenException if the resume token is not found or invalid
*/
private function extractResumeToken($document)
{
if ( ! is_array($document) && ! is_object($document)) {
throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
}
if ($document instanceof Serializable) {
return $this->extractResumeToken($document->bsonSerialize());
}
$resumeToken = is_array($document)
? (isset($document['_id']) ? $document['_id'] : null)
: (isset($document->_id) ? $document->_id : null);
if ( ! isset($resumeToken)) {
throw ResumeTokenException::notFound();
}
if ( ! is_array($resumeToken) && ! is_object($resumeToken)) {
throw ResumeTokenException::invalidType($resumeToken);
}
return $resumeToken;
}
/**
* Return whether the iterator is positioned at the end of the batch.
*
* @return boolean
*/
private function isAtEndOfBatch()
{
return ($this->batchPosition + 1 >= $this->batchSize);
}
/**
* Perform housekeeping after an iteration event.
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
* @param boolean $incrementBatchPosition
*/
private function onIteration($incrementBatchPosition)
{
$isValid = $this->valid();
/* Disable rewind()'s NOP behavior once we advance to a valid position.
* This will allow the driver to throw a LogicException if rewind() is
* called after the cursor has advanced past its first element. */
if ($this->isRewindNop && $isValid) {
$this->isRewindNop = false;
}
if ($incrementBatchPosition && $isValid) {
$this->batchPosition++;
}
/* If the iterator is positioned at the end of the batch, apply the
* postBatchResumeToken if it's available. This handles both the case
* where the current batch is empty (since onIteration() will be called
* after a successful getMore) and when the iterator has advanced to the
* last document in its current batch. Otherwise, extract a resume token
* from the current document if possible. */
if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
$this->resumeToken = $this->postBatchResumeToken;
} elseif ($isValid) {
$this->resumeToken = $this->extractResumeToken($this->current());
}
}
}
<?php
/*
* Copyright 2019 MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace MongoDB\Model;
use MongoDB\Driver\Cursor;
use IteratorIterator;
/**
* Iterator for tailable cursors.
*
* This iterator may be used to wrap a tailable cursor. By indicating whether
* the cursor's first batch of results is empty, this iterator can NOP initial
* calls to rewind() and prevent it from executing a getMore command.
*
* @internal
*/
class TailableCursorIterator extends IteratorIterator
{
private $isRewindNop;
/**
* Constructor.
*
* @internal
* @param Cursor $cursor
* @param boolean $isFirstBatchEmpty
*/
public function __construct(Cursor $cursor, $isFirstBatchEmpty)
{
parent::__construct($cursor);
$this->isRewindNop = $isFirstBatchEmpty;
}
/**
* @see https://php.net/iteratoriterator.rewind
* @return void
*/
public function next()
{
try {
parent::next();
} finally {
/* If the cursor ever advances to a valid position, do not prevent
* future attempts to rewind the cursor. This will allow the driver
* to throw a LogicException if the cursor has been advanced past
* its first element. */
if ($this->valid()) {
$this->isRewindNop = false;
}
}
}
/**
* @see https://php.net/iteratoriterator.rewind
* @return void
*/
public function rewind()
{
if ($this->isRewindNop) {
return;
}
parent::rewind();
}
}
...@@ -19,6 +19,7 @@ namespace MongoDB\Operation; ...@@ -19,6 +19,7 @@ namespace MongoDB\Operation;
use MongoDB\ChangeStream; use MongoDB\ChangeStream;
use MongoDB\BSON\TimestampInterface; use MongoDB\BSON\TimestampInterface;
use MongoDB\Model\ChangeStreamIterator;
use MongoDB\Driver\Command; use MongoDB\Driver\Command;
use MongoDB\Driver\Cursor; use MongoDB\Driver\Cursor;
use MongoDB\Driver\Manager; use MongoDB\Driver\Manager;
...@@ -57,10 +58,12 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -57,10 +58,12 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
private $changeStreamOptions; private $changeStreamOptions;
private $collectionName; private $collectionName;
private $databaseName; private $databaseName;
private $isFirstBatchEmpty = false; private $firstBatchSize;
private $hasResumed = false;
private $manager;
private $operationTime; private $operationTime;
private $pipeline; private $pipeline;
private $resumeCallable; private $postBatchResumeToken;
/** /**
* Constructs an aggregate command for creating a change stream. * Constructs an aggregate command for creating a change stream.
...@@ -185,12 +188,12 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -185,12 +188,12 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
$this->changeStreamOptions['allChangesForCluster'] = true; $this->changeStreamOptions['allChangesForCluster'] = true;
} }
$this->manager = $manager;
$this->databaseName = (string) $databaseName; $this->databaseName = (string) $databaseName;
$this->collectionName = isset($collectionName) ? (string) $collectionName : null; $this->collectionName = isset($collectionName) ? (string) $collectionName : null;
$this->pipeline = $pipeline; $this->pipeline = $pipeline;
$this->aggregate = $this->createAggregate(); $this->aggregate = $this->createAggregate();
$this->resumeCallable = $this->createResumeCallable($manager);
} }
/** @internal */ /** @internal */
...@@ -205,7 +208,8 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -205,7 +208,8 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
return; return;
} }
$this->isFirstBatchEmpty = false; $this->firstBatchSize = null;
$this->postBatchResumeToken = null;
} }
/** @internal */ /** @internal */
...@@ -217,14 +221,19 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -217,14 +221,19 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
$reply = $event->getReply(); $reply = $event->getReply();
/* Note: the spec only refers to collecting an operation time from the if ( ! isset($reply->cursor->firstBatch) || ! is_array($reply->cursor->firstBatch)) {
* "original aggregation", so only capture it if we've not already. */ throw new UnexpectedValueException('aggregate command did not return a "cursor.firstBatch" array');
if (!isset($this->operationTime) && isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) { }
$this->operationTime = $reply->operationTime;
$this->firstBatchSize = count($reply->cursor->firstBatch);
if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
$this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
} }
if (isset($reply->cursor->firstBatch) && is_array($reply->cursor->firstBatch)) { if ($this->shouldCaptureOperationTime($event->getServer()) &&
$this->isFirstBatchEmpty = empty($reply->cursor->firstBatch); isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
$this->operationTime = $reply->operationTime;
} }
} }
...@@ -239,13 +248,14 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -239,13 +248,14 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
*/ */
public function execute(Server $server) public function execute(Server $server)
{ {
$cursor = $this->executeAggregate($server); return new ChangeStream(
$this->createChangeStreamIterator($server),
return new ChangeStream($cursor, $this->resumeCallable, $this->isFirstBatchEmpty); function($resumeToken) { return $this->resume($resumeToken); }
);
} }
/** /**
* Create the aggregate command for creating a change stream. * Create the aggregate command for a change stream.
* *
* This method is also used to recreate the aggregate command when resuming. * This method is also used to recreate the aggregate command when resuming.
* *
...@@ -259,40 +269,27 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -259,40 +269,27 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions); return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions);
} }
private function createResumeCallable(Manager $manager) /**
* Create a ChangeStreamIterator by executing the aggregate command.
*
* @param Server $server
* @return ChangeStreamIterator
*/
private function createChangeStreamIterator(Server $server)
{ {
return function($resumeToken = null) use ($manager) { return new ChangeStreamIterator(
/* If a resume token was provided, update the "resumeAfter" option $this->executeAggregate($server),
* and ensure that "startAtOperationTime" is no longer set. */ $this->firstBatchSize,
if ($resumeToken !== null) { $this->getInitialResumeToken(),
$this->changeStreamOptions['resumeAfter'] = $resumeToken; $this->postBatchResumeToken
unset($this->changeStreamOptions['startAtOperationTime']); );
}
// Select a new server using the original read preference
$server = $manager->selectServer($this->aggregateOptions['readPreference']);
/* If we captured an operation time from the first aggregate command
* and there is no "resumeAfter" option, set "startAtOperationTime"
* so that we can resume from the original aggregate's time. */
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
$this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
}
// Recreate the aggregate command and execute to obtain a new cursor
$this->aggregate = $this->createAggregate();
$cursor = $this->executeAggregate($server);
return [$cursor, $this->isFirstBatchEmpty];
};
} }
/** /**
* Execute the aggregate command. * Execute the aggregate command.
* *
* The command will be executed using APM so that we can capture its * The command will be executed using APM so that we can capture data from
* operation time and/or firstBatch size. * its response (e.g. firstBatch size, postBatchResumeToken).
* *
* @param Server $server * @param Server $server
* @return Cursor * @return Cursor
...@@ -307,4 +304,98 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -307,4 +304,98 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
\MongoDB\Driver\Monitoring\removeSubscriber($this); \MongoDB\Driver\Monitoring\removeSubscriber($this);
} }
} }
/**
* Return the initial resume token for creating the ChangeStreamIterator.
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
* @return array|object|null
*/
private function getInitialResumeToken()
{
if ($this->firstBatchSize === 0 && isset($this->postBatchResumeToken)) {
return $this->postBatchResumeToken;
}
if (isset($this->changeStreamOptions['startAfter'])) {
return $this->changeStreamOptions['startAfter'];
}
if (isset($this->changeStreamOptions['resumeAfter'])) {
return $this->changeStreamOptions['resumeAfter'];
}
return null;
}
/**
* Resumes a change stream.
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
* @param array|object|null $resumeToken
* @return ChangeStreamIterator
* @throws InvalidArgumentException
*/
private function resume($resumeToken = null)
{
if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) {
throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
}
$this->hasResumed = true;
// Select a new server using the original read preference
$server = $this->manager->selectServer($this->aggregateOptions['readPreference']);
unset($this->changeStreamOptions['resumeAfter']);
unset($this->changeStreamOptions['startAfter']);
unset($this->changeStreamOptions['startAtOperationTime']);
if ($resumeToken !== null) {
$this->changeStreamOptions['resumeAfter'] = $resumeToken;
}
if ($resumeToken === null && $this->operationTime !== null) {
$this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
}
// Recreate the aggregate command and return a new ChangeStreamIterator
$this->aggregate = $this->createAggregate();
return $this->createChangeStreamIterator($server);
}
/**
* Determine whether to capture operation time from an aggregate response.
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#startatoperationtime
* @param Server $server
* @return boolean
*/
private function shouldCaptureOperationTime(Server $server)
{
if ($this->hasResumed) {
return false;
}
if (isset($this->changeStreamOptions['resumeAfter']) ||
isset($this->changeStreamOptions['startAfter']) ||
isset($this->changeStreamOptions['startAtOperationTime'])) {
return false;
}
if ($this->firstBatchSize > 0) {
return false;
}
if ($this->postBatchResumeToken !== null) {
return false;
}
if ( ! \MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
return false;
}
return true;
}
} }
...@@ -930,6 +930,7 @@ class DocumentationExamplesTest extends FunctionalTestCase ...@@ -930,6 +930,7 @@ class DocumentationExamplesTest extends FunctionalTestCase
$db = new Database($this->manager, $this->getDatabaseName()); $db = new Database($this->manager, $this->getDatabaseName());
$db->dropCollection('inventory'); $db->dropCollection('inventory');
$db->createCollection('inventory');
// Start Changestream Example 1 // Start Changestream Example 1
$changeStream = $db->inventory->watch(); $changeStream = $db->inventory->watch();
......
...@@ -11,15 +11,31 @@ use MongoDB\Driver\Server; ...@@ -11,15 +11,31 @@ use MongoDB\Driver\Server;
use MongoDB\Driver\WriteConcern; use MongoDB\Driver\WriteConcern;
use MongoDB\Driver\Exception\CommandException; use MongoDB\Driver\Exception\CommandException;
use MongoDB\Operation\CreateCollection; use MongoDB\Operation\CreateCollection;
use MongoDB\Operation\DatabaseCommand;
use MongoDB\Operation\DropCollection; use MongoDB\Operation\DropCollection;
use InvalidArgumentException;
use stdClass; use stdClass;
use UnexpectedValueException; use UnexpectedValueException;
abstract class FunctionalTestCase extends TestCase abstract class FunctionalTestCase extends TestCase
{ {
protected $manager;
private $configuredFailPoints = [];
public function setUp() public function setUp()
{ {
parent::setUp();
$this->manager = new Manager(static::getUri()); $this->manager = new Manager(static::getUri());
$this->configuredFailPoints = [];
}
public function tearDown()
{
$this->disableFailPoints();
parent::tearDown();
} }
protected function assertCollectionCount($namespace, $count) protected function assertCollectionCount($namespace, $count)
...@@ -49,6 +65,39 @@ abstract class FunctionalTestCase extends TestCase ...@@ -49,6 +65,39 @@ abstract class FunctionalTestCase extends TestCase
$this->assertEquals((string) $expectedObjectId, (string) $actualObjectId); $this->assertEquals((string) $expectedObjectId, (string) $actualObjectId);
} }
/**
* Configure a fail point for the test.
*
* The fail point will automatically be disabled during tearDown() to avoid
* affecting a subsequent test.
*
* @param array|stdClass $command configureFailPoint command document
* @throws InvalidArgumentException if $command is not a configureFailPoint command
*/
protected function configureFailPoint($command)
{
if (is_array($command)) {
$command = (object) $command;
}
if ( ! $command instanceof stdClass) {
throw new InvalidArgumentException('$command is not an array or stdClass instance');
}
if (key($command) !== 'configureFailPoint') {
throw new InvalidArgumentException('$command is not a configureFailPoint command');
}
$operation = new DatabaseCommand('admin', $command);
$cursor = $operation->execute($this->getPrimaryServer());
$result = $cursor->toArray()[0];
$this->assertCommandSucceeded($result);
// Record the fail point so it can be disabled during tearDown()
$this->configuredFailPoints[] = $command->configureFailPoint;
}
/** /**
* Creates the test collection with the specified options. * Creates the test collection with the specified options.
* *
...@@ -260,4 +309,24 @@ abstract class FunctionalTestCase extends TestCase ...@@ -260,4 +309,24 @@ abstract class FunctionalTestCase extends TestCase
$this->markTestSkipped('Transactions require WiredTiger storage engine'); $this->markTestSkipped('Transactions require WiredTiger storage engine');
} }
} }
/**
* Disables any fail points that were configured earlier in the test.
*
* This tracks fail points set via configureFailPoint() and should be called
* during tearDown().
*/
private function disableFailPoints()
{
if (empty($this->configuredFailPoints)) {
return;
}
$server = $this->getPrimaryServer();
foreach ($this->configuredFailPoints as $failPoint) {
$operation = new DatabaseCommand('admin', ['configureFailPoint' => $failPoint, 'mode' => 'off']);
$operation->execute($server);
}
}
} }
...@@ -4,14 +4,15 @@ namespace MongoDB\Tests\Model; ...@@ -4,14 +4,15 @@ namespace MongoDB\Tests\Model;
use MongoDB\Collection; use MongoDB\Collection;
use MongoDB\Driver\Exception\LogicException; use MongoDB\Driver\Exception\LogicException;
use MongoDB\Model\TailableCursorIterator; use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Model\ChangeStreamIterator;
use MongoDB\Operation\Find; use MongoDB\Operation\Find;
use MongoDB\Operation\CreateCollection; use MongoDB\Operation\CreateCollection;
use MongoDB\Operation\DropCollection; use MongoDB\Operation\DropCollection;
use MongoDB\Tests\CommandObserver; use MongoDB\Tests\CommandObserver;
use MongoDB\Tests\FunctionalTestCase; use MongoDB\Tests\FunctionalTestCase;
class TailableCursorIteratorTest extends FunctionalTestCase class ChangeStreamIteratorTest extends FunctionalTestCase
{ {
private $collection; private $collection;
...@@ -28,42 +29,111 @@ class TailableCursorIteratorTest extends FunctionalTestCase ...@@ -28,42 +29,111 @@ class TailableCursorIteratorTest extends FunctionalTestCase
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName()); $this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
} }
public function testFirstBatchIsEmpty() /**
* @dataProvider provideInvalidIntegerValues
*/
public function testFirstBatchArgumentTypeCheck($firstBatchSize)
{ {
$this->collection->insertOne(['x' => 1]); $this->expectException(InvalidArgumentException::class);
new ChangeStreamIterator($this->collection->find(), $firstBatchSize, null, null);
}
public function provideInvalidIntegerValues()
{
return $this->wrapValuesForDataProvider($this->getInvalidIntegerValues());
}
public function testInitialResumeToken()
{
$iterator = new ChangeStreamIterator($this->collection->find(), 0, null, null);
$this->assertNull($iterator->getResumeToken());
$iterator = new ChangeStreamIterator($this->collection->find(), 0, ['resumeToken' => 1], null);
$this->assertSameDocument(['resumeToken' => 1], $iterator->getResumeToken());
$iterator = new ChangeStreamIterator($this->collection->find(), 0, (object) ['resumeToken' => 2], null);
$this->assertSameDocument((object) ['resumeToken' => 2], $iterator->getResumeToken());
}
/**
* @dataProvider provideInvalidDocumentValues
*/
public function testInitialResumeTokenArgumentTypeCheck($initialResumeToken)
{
$this->expectException(InvalidArgumentException::class);
new ChangeStreamIterator($this->collection->find(), 0, $initialResumeToken, null);
}
/**
* @dataProvider provideInvalidObjectValues
*/
public function testPostBatchResumeTokenArgumentTypeCheck($postBatchResumeToken)
{
$this->expectException(InvalidArgumentException::class);
new ChangeStreamIterator($this->collection->find(), 0, null, $postBatchResumeToken);
}
public function provideInvalidObjectValues()
{
return $this->wrapValuesForDataProvider(array_merge($this->getInvalidDocumentValues(), [[]]));
}
public function testPostBatchResumeTokenIsReturnedForLastElementInFirstBatch()
{
$this->collection->insertOne(['_id' => ['resumeToken' => 1], 'x' => 1]);
$this->collection->insertOne(['_id' => ['resumeToken' => 2], 'x' => 2]);
$postBatchResumeToken = (object) ['resumeToken' => 'pb'];
$cursor = $this->collection->find([], ['cursorType' => Find::TAILABLE]);
$iterator = new ChangeStreamIterator($cursor, 2, null, $postBatchResumeToken);
$this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); });
$this->assertTrue($iterator->valid());
$this->assertSameDocument(['resumeToken' => 1], $iterator->getResumeToken());
$this->assertSameDocument(['_id' => ['resumeToken' => 1], 'x' => 1], $iterator->current());
$iterator->next();
$this->assertTrue($iterator->valid());
$this->assertSameDocument($postBatchResumeToken, $iterator->getResumeToken());
$this->assertSameDocument(['_id' => ['resumeToken' => 2], 'x' => 2], $iterator->current());
}
public function testRewindIsNopWhenFirstBatchIsEmpty()
{
$this->collection->insertOne(['_id' => ['resumeToken' => 1], 'x' => 1]);
$cursor = $this->collection->find(['x' => ['$gt' => 1]], ['cursorType' => Find::TAILABLE]); $cursor = $this->collection->find(['x' => ['$gt' => 1]], ['cursorType' => Find::TAILABLE]);
$iterator = new TailableCursorIterator($cursor, true); $iterator = new ChangeStreamIterator($cursor, 0, null, null);
$this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); }); $this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); });
$this->assertFalse($iterator->valid()); $this->assertFalse($iterator->valid());
$this->collection->insertOne(['x' => 2]); $this->collection->insertOne(['_id' => ['resumeToken' => 2], 'x' => 2]);
$iterator->next(); $iterator->next();
$this->assertTrue($iterator->valid()); $this->assertTrue($iterator->valid());
$this->assertMatchesDocument(['x' => 2], $iterator->current()); $this->assertSameDocument(['_id' => ['resumeToken' => 2], 'x' => 2], $iterator->current());
$this->expectException(LogicException::class); $this->expectException(LogicException::class);
$iterator->rewind(); $iterator->rewind();
} }
public function testFirstBatchIsNotEmpty() public function testRewindAdvancesWhenFirstBatchIsNotEmpty()
{ {
$this->collection->insertOne(['x' => 1]); $this->collection->insertOne(['_id' => ['resumeToken' => 1], 'x' => 1]);
$cursor = $this->collection->find([], ['cursorType' => Find::TAILABLE]); $cursor = $this->collection->find([], ['cursorType' => Find::TAILABLE]);
$iterator = new TailableCursorIterator($cursor, false); $iterator = new ChangeStreamIterator($cursor, 1, null, null);
$this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); }); $this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); });
$this->assertTrue($iterator->valid()); $this->assertTrue($iterator->valid());
$this->assertMatchesDocument(['x' => 1], $iterator->current()); $this->assertSameDocument(['_id' => ['resumeToken' => 1], 'x' => 1], $iterator->current());
$this->collection->insertOne(['x' => 2]); $this->collection->insertOne(['_id' => ['resumeToken' => 2], 'x' => 2]);
$iterator->next(); $iterator->next();
$this->assertTrue($iterator->valid()); $this->assertTrue($iterator->valid());
$this->assertMatchesDocument(['x' => 2], $iterator->current()); $this->assertSameDocument(['_id' => ['resumeToken' => 2], 'x' => 2], $iterator->current());
$this->expectException(LogicException::class); $this->expectException(LogicException::class);
$iterator->rewind(); $iterator->rewind();
......
This diff is collapsed.
<?php
namespace MongoDB\Tests\SpecTests;
use MongoDB\Collection;
use MongoDB\Driver\Exception\ServerException;
/**
* Change Streams spec prose tests.
*
* @see https://github.com/mongodb/specifications/tree/master/source/change-streams
*/
class ChangeStreamsProseTest extends FunctionalTestCase
{
private $collection;
public function setUp()
{
parent::setUp();
$this->skipIfChangeStreamIsNotSupported();
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$this->dropCollection();
}
public function tearDown()
{
if (!$this->hasFailed()) {
$this->dropCollection();
}
parent::tearDown();
}
/**
* ChangeStream will not attempt to resume after encountering error code
* 11601 (Interrupted), 136 (CappedPositionLost), or 237 (CursorKilled)
* while executing a getMore command.
*
* @dataProvider provideNonResumableErrorCodes
*/
public function testProseTest5($errorCode)
{
if (version_compare($this->getServerVersion(), '4.0.0', '<')) {
$this->markTestSkipped('failCommand is not supported');
}
$this->configureFailPoint([
'configureFailPoint' => 'failCommand',
'mode' => ['times' => 1],
'data' => ['failCommands' => ['getMore'], 'errorCode' => $errorCode],
]);
$this->createCollection();
$changeStream = $this->collection->watch();
$changeStream->rewind();
$this->expectException(ServerException::class);
$this->expectExceptionCode($errorCode);
$changeStream->next();
}
public function provideNonResumableErrorCodes()
{
return [
[136], // CappedPositionLost
[237], // CursorKilled
[11601], // Interrupted
];
}
}
...@@ -4,7 +4,6 @@ namespace MongoDB\Tests\SpecTests; ...@@ -4,7 +4,6 @@ namespace MongoDB\Tests\SpecTests;
use MongoDB\Client; use MongoDB\Client;
use MongoDB\Collection; use MongoDB\Collection;
use MongoDB\Database;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
use MongoDB\Driver\WriteConcern; use MongoDB\Driver\WriteConcern;
use MongoDB\Driver\Exception\BulkWriteException; use MongoDB\Driver\Exception\BulkWriteException;
...@@ -14,7 +13,6 @@ use MongoDB\Tests\FunctionalTestCase as BaseFunctionalTestCase; ...@@ -14,7 +13,6 @@ use MongoDB\Tests\FunctionalTestCase as BaseFunctionalTestCase;
use MongoDB\Tests\TestCase; use MongoDB\Tests\TestCase;
use PHPUnit\Framework\SkippedTest; use PHPUnit\Framework\SkippedTest;
use ArrayIterator; use ArrayIterator;
use InvalidArgumentException;
use IteratorIterator; use IteratorIterator;
use LogicException; use LogicException;
use MultipleIterator; use MultipleIterator;
...@@ -32,21 +30,18 @@ class FunctionalTestCase extends BaseFunctionalTestCase ...@@ -32,21 +30,18 @@ class FunctionalTestCase extends BaseFunctionalTestCase
const TOPOLOGY_REPLICASET = 'replicaset'; const TOPOLOGY_REPLICASET = 'replicaset';
const TOPOLOGY_SHARDED = 'sharded'; const TOPOLOGY_SHARDED = 'sharded';
private $configuredFailPoints = [];
private $context; private $context;
public function setUp() public function setUp()
{ {
parent::setUp(); parent::setUp();
$this->configuredFailPoints = [];
$this->context = null; $this->context = null;
} }
public function tearDown() public function tearDown()
{ {
$this->context = null; $this->context = null;
$this->disableFailPoints();
parent::tearDown(); parent::tearDown();
} }
...@@ -141,39 +136,6 @@ class FunctionalTestCase extends BaseFunctionalTestCase ...@@ -141,39 +136,6 @@ class FunctionalTestCase extends BaseFunctionalTestCase
$this->markTestSkipped(sprintf('Server version "%s" and topology "%s" do not meet test requirements: %s', $serverVersion, $topology, json_encode($runOn))); $this->markTestSkipped(sprintf('Server version "%s" and topology "%s" do not meet test requirements: %s', $serverVersion, $topology, json_encode($runOn)));
} }
/**
* Configure a fail point for the test.
*
* The fail point will automatically be disabled during tearDown() to avoid
* affecting a subsequent test.
*
* @param array|stdClass $command configureFailPoint command document
* @throws InvalidArgumentException if $command is not a configureFailPoint command
*/
protected function configureFailPoint($command)
{
if (is_array($command)) {
$command = (object) $command;
}
if ( ! $command instanceof stdClass) {
throw new InvalidArgumentException('$command is not an array or stdClass instance');
}
if (key($command) !== 'configureFailPoint') {
throw new InvalidArgumentException('$command is not a configureFailPoint command');
}
$database = new Database($this->manager, 'admin');
$cursor = $database->command($command);
$result = $cursor->toArray()[0];
$this->assertCommandSucceeded($result);
// Record the fail point so it can be disabled during tearDown()
$this->configuredFailPoints[] = $command->configureFailPoint;
}
/** /**
* Decode a JSON spec test. * Decode a JSON spec test.
* *
...@@ -255,21 +217,6 @@ class FunctionalTestCase extends BaseFunctionalTestCase ...@@ -255,21 +217,6 @@ class FunctionalTestCase extends BaseFunctionalTestCase
return new Collection($this->manager, $context->databaseName, $context->outcomeCollectionName); return new Collection($this->manager, $context->databaseName, $context->outcomeCollectionName);
} }
/**
* Disables any fail points that were configured earlier in the test.
*
* This tracks fail points set via configureFailPoint() and should be called
* during tearDown().
*/
private function disableFailPoints()
{
$database = new Database($this->manager, 'admin');
foreach ($this->configuredFailPoints as $failPoint) {
$database->command(['configureFailPoint' => $failPoint, 'mode' => 'off']);
}
}
/** /**
* Return the corresponding topology constants for the current topology. * Return the corresponding topology constants for the current topology.
* *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment