Commit 1dc5478f authored by Jeremy Mikola's avatar Jeremy Mikola

PHPLIB-366: Change streams test runner

Support placeholders in DocumentsMatchConstraint and allow CommandExpectations to ignore extra events.

Also removes abstract from base spec test class.

Spec tests synced with mongodb/specifications@a1c9d3c5a7b938f595bb2547c07a238da21263f3
parent fd53a1a2
<?php
namespace MongoDB\Tests\SpecTests;
use MongoDB\ChangeStream;
use MongoDB\Client;
use MongoDB\Driver\Exception\Exception;
use MongoDB\Model\BSONDocument;
use ArrayIterator;
use LogicException;
use MultipleIterator;
use stdClass;
/**
* Change Streams spec tests.
*
* @see https://github.com/mongodb/specifications/tree/master/source/change-streams
*/
class ChangeStreamsSpecTest extends FunctionalTestCase
{
/* These should all pass before the driver can be considered compatible with
* MongoDB 4.2. */
private static $incompleteTests = [
'change-streams: Test consecutive resume' => 'PHPLIB-442, PHPLIB-416',
];
/**
* Assert that the expected and actual command documents match.
*
* Note: this method may modify the $expected object.
*
* @param stdClass $expected Expected command document
* @param stdClass $actual Actual command document
*/
public static function assertCommandMatches(stdClass $expected, stdClass $actual)
{
static::assertDocumentsMatch($expected, $actual);
}
/**
* Assert that the expected and actual documents match.
*
* @param array $expectedDocuments Expected documents
* @param array $actualDocuments Actual documents
*/
public static function assertResult(array $expectedDocuments, array $actualDocuments)
{
static::assertCount(count($expectedDocuments), $actualDocuments);
$mi = new MultipleIterator(MultipleIterator::MIT_NEED_ANY);
$mi->attachIterator(new ArrayIterator($expectedDocuments));
$mi->attachIterator(new ArrayIterator($actualDocuments));
foreach ($mi as $documents) {
list($expectedDocument, $actualDocument) = $documents;
$constraint = new DocumentsMatchConstraint($expectedDocument, true, true, ['42']);
static::assertThat($actualDocument, $constraint);
}
}
/**
* Execute an individual test case from the specification.
*
* @dataProvider provideTests
* @param string $name Test name
* @param stdClass $test Individual "tests[]" document
* @param string $databaseName Name of database under test
* @param string $collectionName Name of collection under test
* @param string $database2Name Name of alternate database under test
* @param string $collection2Name Name of alternate collection under test
*/
public function testChangeStreams($name, stdClass $test, $databaseName = null, $collectionName = null, $database2Name = null, $collection2Name = null)
{
$this->setName($name);
if (isset(self::$incompleteTests[$name])) {
$this->markTestIncomplete(self::$incompleteTests[$name]);
}
$this->checkServerRequirements($this->createRunOn($test));
if (!isset($databaseName, $collectionName, $database2Name, $collection2Name)) {
$this->fail('Required database and collection names are unset');
}
$context = Context::fromChangeStreams($test, $databaseName, $collectionName);
$this->setContext($context);
$this->dropDatabasesAndCreateCollection($databaseName, $collectionName);
$this->dropDatabasesAndCreateCollection($database2Name, $collection2Name);
if (isset($test->failPoint)) {
$this->configureFailPoint($test->failPoint);
}
if (isset($test->expectations)) {
$commandExpectations = CommandExpectations::fromChangeStreams($test->expectations);
$commandExpectations->startMonitoring();
}
$errorExpectation = ErrorExpectation::fromChangeStreams($test->result);
$resultExpectation = ResultExpectation::fromChangeStreams($test->result, [$this, 'assertResult']);
$result = null;
$exception = null;
try {
$changeStream = $this->createChangeStream($test);
} catch (Exception $e) {
$exception = $e;
}
if (isset($commandExpectations)) {
$commandExpectations->stopMonitoring();
}
foreach ($test->operations as $operation) {
Operation::fromChangeStreams($operation)->assert($this, $context);
}
if (isset($commandExpectations)) {
$commandExpectations->startMonitoring();
}
/* If the change stream was successfully created (i.e. $exception is
* null), attempt to iterate up to the expected number of results. It's
* possible that some errors (e.g. projecting out _id) will only be
* thrown during iteration, so we must also try/catch here. */
try {
if (isset($changeStream)) {
$limit = isset($test->result->success) ? count($test->result->success) : 0;
$result = $this->iterateChangeStream($changeStream, $limit);
}
} catch (Exception $e) {
$this->assertNull($exception);
$exception = $e;
}
$errorExpectation->assert($this, $exception);
$resultExpectation->assert($this, $result);
if (isset($commandExpectations)) {
$commandExpectations->stopMonitoring();
$commandExpectations->assert($this, $context);
}
}
public function provideTests()
{
$testArgs = [];
foreach (glob(__DIR__ . '/change-streams/*.json') as $filename) {
$json = $this->decodeJson(file_get_contents($filename));
$group = basename($filename, '.json');
$databaseName = isset($json->database_name) ? $json->database_name : null;
$database2Name = isset($json->database2_name) ? $json->database2_name : null;
$collectionName = isset($json->collection_name) ? $json->collection_name : null;
$collection2Name = isset($json->collection2_name) ? $json->collection2_name : null;
foreach ($json->tests as $test) {
$name = $group . ': ' . $test->description;
$testArgs[] = [$name, $test, $databaseName, $collectionName, $database2Name, $collection2Name];
}
}
return $testArgs;
}
/**
* Create a change stream.
*
* @param stdClass $test
* @return ChangeStream
* @throws LogicException if the target is unsupported
*/
private function createChangeStream(stdClass $test)
{
$context = $this->getContext();
$pipeline = isset($test->changeStreamPipeline) ? $test->changeStreamPipeline : [];
$options = isset($test->changeStreamOptions) ? (array) $test->changeStreamOptions : [];
switch ($test->target) {
case 'client':
return $context->client->watch($pipeline, $options);
case 'database':
return $context->getDatabase()->watch($pipeline, $options);
case 'collection':
return $context->getCollection()->watch($pipeline, $options);
default:
throw new LogicException('Unsupported target: ' . $test->target);
}
}
/**
* Convert the server requirements to a standard "runOn" array used by other
* specifications.
*
* @param stdClass $test
* @return array
*/
private function createRunOn(stdClass $test)
{
$req = new stdClass;
/* Append ".99" as patch version, since command monitoring tests expect
* the minor version to be an inclusive upper bound. */
if (isset($test->maxServerVersion)) {
$req->maxServerVersion = $test->maxServerVersion;
}
if (isset($test->minServerVersion)) {
$req->minServerVersion = $test->minServerVersion;
}
if (isset($test->topology)) {
$req->topology = $test->topology;
}
return [$req];
}
/**
* Drop the database and create the collection.
*
* @param string $databaseName
* @param string $collectionName
*/
private function dropDatabasesAndCreateCollection($databaseName, $collectionName)
{
$context = $this->getContext();
$database = $context->client->selectDatabase($databaseName);
$database->drop($context->defaultWriteOptions);
$database->createCollection($collectionName, $context->defaultWriteOptions);
}
/**
* Iterate a change stream.
*
* @param ChangeStream $changeStream
* @return BSONDocument[]
*/
private function iterateChangeStream(ChangeStream $changeStream, $limit = 0)
{
$events = [];
for ($changeStream->rewind(); count($events) < $limit; $changeStream->next()) {
if ( ! $changeStream->valid()) {
continue;
}
$event = $changeStream->current();
$this->assertInstanceOf(BSONDocument::class, $event);
$events[] = $event;
}
return $events;
}
}
...@@ -21,6 +21,7 @@ class CommandExpectations implements CommandSubscriber ...@@ -21,6 +21,7 @@ class CommandExpectations implements CommandSubscriber
private $ignoreCommandFailed = false; private $ignoreCommandFailed = false;
private $ignoreCommandStarted = false; private $ignoreCommandStarted = false;
private $ignoreCommandSucceeded = false; private $ignoreCommandSucceeded = false;
private $ignoreExtraEvents = false;
private function __construct(array $events) private function __construct(array $events)
{ {
...@@ -44,6 +45,20 @@ class CommandExpectations implements CommandSubscriber ...@@ -44,6 +45,20 @@ class CommandExpectations implements CommandSubscriber
} }
} }
public static function fromChangeStreams(array $expectedEvents)
{
$o = new self($expectedEvents);
$o->ignoreCommandFailed = true;
$o->ignoreCommandSucceeded = true;
/* Change Streams spec tests do not include getMore commands in the
* list of expected events, so ignore any observed events beyond the
* number that are expected. */
$o->ignoreExtraEvents = true;;
return $o;
}
public static function fromCommandMonitoring(array $expectedEvents) public static function fromCommandMonitoring(array $expectedEvents)
{ {
return new self($expectedEvents); return new self($expectedEvents);
...@@ -125,11 +140,15 @@ class CommandExpectations implements CommandSubscriber ...@@ -125,11 +140,15 @@ class CommandExpectations implements CommandSubscriber
*/ */
public function assert(FunctionalTestCase $test, Context $context) public function assert(FunctionalTestCase $test, Context $context)
{ {
$test->assertCount(count($this->expectedEvents), $this->actualEvents); $actualEvents = $this->ignoreExtraEvents
? array_slice($this->actualEvents, 0, count($this->expectedEvents))
: $this->actualEvents;
$test->assertCount(count($this->expectedEvents), $actualEvents);
$mi = new MultipleIterator(MultipleIterator::MIT_NEED_ANY); $mi = new MultipleIterator(MultipleIterator::MIT_NEED_ANY);
$mi->attachIterator(new ArrayIterator($this->expectedEvents)); $mi->attachIterator(new ArrayIterator($this->expectedEvents));
$mi->attachIterator(new ArrayIterator($this->actualEvents)); $mi->attachIterator(new ArrayIterator($actualEvents));
foreach ($mi as $events) { foreach ($mi as $events) {
list($expectedEventAndClass, $actualEvent) = $events; list($expectedEventAndClass, $actualEvent) = $events;
......
...@@ -35,6 +35,15 @@ final class Context ...@@ -35,6 +35,15 @@ final class Context
$this->outcomeCollectionName = $collectionName; $this->outcomeCollectionName = $collectionName;
} }
public static function fromChangeStreams(stdClass $test, $databaseName, $collectionName)
{
$o = new self($databaseName, $collectionName);
$o->client = new Client(FunctionalTestCase::getUri());
return $o;
}
public static function fromCommandMonitoring(stdClass $test, $databaseName, $collectionName) public static function fromCommandMonitoring(stdClass $test, $databaseName, $collectionName)
{ {
$o = new self($databaseName, $collectionName); $o = new self($databaseName, $collectionName);
...@@ -98,7 +107,7 @@ final class Context ...@@ -98,7 +107,7 @@ final class Context
public function getCollection(array $collectionOptions = []) public function getCollection(array $collectionOptions = [])
{ {
return $this->client->selectCollection( return $this->selectCollection(
$this->databaseName, $this->databaseName,
$this->collectionName, $this->collectionName,
$this->prepareOptions($collectionOptions) $this->prepareOptions($collectionOptions)
...@@ -107,10 +116,7 @@ final class Context ...@@ -107,10 +116,7 @@ final class Context
public function getDatabase(array $databaseOptions = []) public function getDatabase(array $databaseOptions = [])
{ {
return $this->client->selectDatabase( return $this->selectDatabase($this->databaseName, $databaseOptions);
$this->databaseName,
$this->prepareOptions($databaseOptions)
);
} }
/** /**
...@@ -221,6 +227,23 @@ final class Context ...@@ -221,6 +227,23 @@ final class Context
} }
} }
public function selectCollection($databaseName, $collectionName, array $collectionOptions = [])
{
return $this->client->selectCollection(
$databaseName,
$collectionName,
$this->prepareOptions($collectionOptions)
);
}
public function selectDatabase($databaseName, array $databaseOptions = [])
{
return $this->client->selectDatabase(
$databaseName,
$this->prepareOptions($databaseOptions)
);
}
private function prepareSessionOptions(array $options) private function prepareSessionOptions(array $options)
{ {
if (isset($options['defaultTransactionOptions'])) { if (isset($options['defaultTransactionOptions'])) {
......
...@@ -19,6 +19,7 @@ class DocumentsMatchConstraint extends Constraint ...@@ -19,6 +19,7 @@ class DocumentsMatchConstraint extends Constraint
{ {
private $ignoreExtraKeysInRoot = false; private $ignoreExtraKeysInRoot = false;
private $ignoreExtraKeysInEmbedded = false; private $ignoreExtraKeysInEmbedded = false;
private $placeholders = [];
/* TODO: This is not currently used, but was preserved from the design of /* TODO: This is not currently used, but was preserved from the design of
* TestCase::assertMatchesDocument(), which would sort keys and then compare * TestCase::assertMatchesDocument(), which would sort keys and then compare
* documents as JSON strings. If the TODO item in matches() is implemented * documents as JSON strings. If the TODO item in matches() is implemented
...@@ -33,14 +34,15 @@ class DocumentsMatchConstraint extends Constraint ...@@ -33,14 +34,15 @@ class DocumentsMatchConstraint extends Constraint
* @param array|object $value * @param array|object $value
* @param boolean $ignoreExtraKeysInRoot If true, ignore extra keys within the root document * @param boolean $ignoreExtraKeysInRoot If true, ignore extra keys within the root document
* @param boolean $ignoreExtraKeysInEmbedded If true, ignore extra keys within embedded documents * @param boolean $ignoreExtraKeysInEmbedded If true, ignore extra keys within embedded documents
* * @param array $placeholders Placeholders for any value
*/ */
public function __construct($value, $ignoreExtraKeysInRoot = false, $ignoreExtraKeysInEmbedded = false) public function __construct($value, $ignoreExtraKeysInRoot = false, $ignoreExtraKeysInEmbedded = false, array $placeholders = [])
{ {
parent::__construct(); parent::__construct();
$this->value = $this->prepareBSON($value, true, $this->sortKeys); $this->value = $this->prepareBSON($value, true, $this->sortKeys);
$this->ignoreExtraKeysInRoot = $ignoreExtraKeysInRoot; $this->ignoreExtraKeysInRoot = $ignoreExtraKeysInRoot;
$this->ignoreExtraKeysInEmbedded = $ignoreExtraKeysInEmbedded; $this->ignoreExtraKeysInEmbedded = $ignoreExtraKeysInEmbedded;
$this->placeholders = $placeholders;
} }
/** /**
...@@ -101,6 +103,10 @@ class DocumentsMatchConstraint extends Constraint ...@@ -101,6 +103,10 @@ class DocumentsMatchConstraint extends Constraint
throw new RuntimeException('$actual is missing key: ' . $key); throw new RuntimeException('$actual is missing key: ' . $key);
} }
if (in_array($expectedValue, $this->placeholders, true)) {
continue;
}
$actualValue = $actual[$key]; $actualValue = $actual[$key];
if (($expectedValue instanceof BSONArray && $actualValue instanceof BSONArray) || if (($expectedValue instanceof BSONArray && $actualValue instanceof BSONArray) ||
......
...@@ -46,6 +46,15 @@ class DocumentsMatchConstraintTest extends TestCase ...@@ -46,6 +46,15 @@ class DocumentsMatchConstraintTest extends TestCase
$this->assertResult(false, $c, [1, ['a' => 2]], 'Keys must have the correct value'); $this->assertResult(false, $c, [1, ['a' => 2]], 'Keys must have the correct value');
} }
public function testPlaceholders()
{
$c = new DocumentsMatchConstraint(['x' => '42', 'y' => 42, 'z' => ['a' => 24]], false, false, [24, 42]);
$this->assertResult(true, $c, ['x' => '42', 'y' => 'foo', 'z' => ['a' => 1]], 'Placeholders accept any value');
$this->assertResult(false, $c, ['x' => 42, 'y' => 'foo', 'z' => ['a' => 1]], 'Placeholder type must match');
$this->assertResult(true, $c, ['x' => '42', 'y' => 42, 'z' => ['a' => 24]], 'Exact match');
}
private function assertResult($expectedResult, DocumentsMatchConstraint $constraint, $value, $message) private function assertResult($expectedResult, DocumentsMatchConstraint $constraint, $value, $message)
{ {
$this->assertSame($expectedResult, $constraint->evaluate($value, '', true), $message); $this->assertSame($expectedResult, $constraint->evaluate($value, '', true), $message);
......
...@@ -25,6 +25,7 @@ final class ErrorExpectation ...@@ -25,6 +25,7 @@ final class ErrorExpectation
'OperationNotSupportedInTransaction' => 263, 'OperationNotSupportedInTransaction' => 263,
]; ];
private $code;
private $codeName; private $codeName;
private $isExpected = false; private $isExpected = false;
private $excludedLabels = []; private $excludedLabels = [];
...@@ -35,6 +36,26 @@ final class ErrorExpectation ...@@ -35,6 +36,26 @@ final class ErrorExpectation
{ {
} }
public static function fromChangeStreams(stdClass $result)
{
$o = new self;
if (isset($result->error->code)) {
$o->code = $result->error->code;
$o->isExpected = true;
}
if (isset($result->error->errorLabels)) {
if (!self::isArrayOfStrings($result->error->errorLabels)) {
throw InvalidArgumentException::invalidType('errorLabels', $result->error->errorLabels, 'string[]');
}
$o->includedLabels = $result->error->errorLabels;
$o->isExpected = true;
}
return $o;
}
public static function fromRetryableWrites(stdClass $outcome) public static function fromRetryableWrites(stdClass $outcome)
{ {
$o = new self; $o = new self;
...@@ -88,6 +109,11 @@ final class ErrorExpectation ...@@ -88,6 +109,11 @@ final class ErrorExpectation
return $o; return $o;
} }
public static function noError()
{
return new self();
}
/** /**
* Assert that the error expectation matches the actual outcome. * Assert that the error expectation matches the actual outcome.
* *
......
...@@ -26,7 +26,7 @@ use UnexpectedValueException; ...@@ -26,7 +26,7 @@ use UnexpectedValueException;
* *
* @see https://github.com/mongodb/specifications * @see https://github.com/mongodb/specifications
*/ */
abstract class FunctionalTestCase extends BaseFunctionalTestCase class FunctionalTestCase extends BaseFunctionalTestCase
{ {
const TOPOLOGY_SINGLE = 'single'; const TOPOLOGY_SINGLE = 'single';
const TOPOLOGY_REPLICASET = 'replicaset'; const TOPOLOGY_REPLICASET = 'replicaset';
...@@ -60,7 +60,10 @@ abstract class FunctionalTestCase extends BaseFunctionalTestCase ...@@ -60,7 +60,10 @@ abstract class FunctionalTestCase extends BaseFunctionalTestCase
* @param stdClass $expectedCommand Expected command document * @param stdClass $expectedCommand Expected command document
* @param stdClass $actualCommand Actual command document * @param stdClass $actualCommand Actual command document
*/ */
abstract public static function assertCommandMatches(stdClass $expected, stdClass $actual); public static function assertCommandMatches(stdClass $expected, stdClass $actual)
{
throw new LogicException(sprintf('%s does not assert CommandStartedEvents', get_called_class()));
}
/** /**
* Assert that the expected and actual command reply documents match. * Assert that the expected and actual command reply documents match.
...@@ -71,7 +74,10 @@ abstract class FunctionalTestCase extends BaseFunctionalTestCase ...@@ -71,7 +74,10 @@ abstract class FunctionalTestCase extends BaseFunctionalTestCase
* @param stdClass $expected Expected command reply document * @param stdClass $expected Expected command reply document
* @param stdClass $actual Actual command reply document * @param stdClass $actual Actual command reply document
*/ */
abstract public static function assertCommandReplyMatches(stdClass $expected, stdClass $actual); public static function assertCommandReplyMatches(stdClass $expected, stdClass $actual)
{
throw new LogicException(sprintf('%s does not assert CommandSucceededEvents', get_called_class()));
}
/** /**
* Asserts that two given documents match. * Asserts that two given documents match.
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
namespace MongoDB\Tests\SpecTests; namespace MongoDB\Tests\SpecTests;
use MongoDB\Collection; use MongoDB\Collection;
use MongoDB\Database;
use MongoDB\Driver\Cursor; use MongoDB\Driver\Cursor;
use MongoDB\Driver\Session; use MongoDB\Driver\Session;
use MongoDB\Driver\Exception\BulkWriteException; use MongoDB\Driver\Exception\BulkWriteException;
...@@ -19,6 +20,8 @@ final class Operation ...@@ -19,6 +20,8 @@ final class Operation
{ {
const OBJECT_COLLECTION = 'collection'; const OBJECT_COLLECTION = 'collection';
const OBJECT_DATABASE = 'database'; const OBJECT_DATABASE = 'database';
const OBJECT_SELECT_COLLECTION = 'selectCollection';
const OBJECT_SELECT_DATABASE = 'selectDatabase';
const OBJECT_SESSION0 = 'session0'; const OBJECT_SESSION0 = 'session0';
const OBJECT_SESSION1 = 'session1'; const OBJECT_SESSION1 = 'session1';
...@@ -26,7 +29,9 @@ final class Operation ...@@ -26,7 +29,9 @@ final class Operation
public $resultExpectation; public $resultExpectation;
private $arguments = []; private $arguments = [];
private $collectionName;
private $collectionOptions = []; private $collectionOptions = [];
private $databaseName;
private $databaseOptions = []; private $databaseOptions = [];
private $name; private $name;
private $object = self::OBJECT_COLLECTION; private $object = self::OBJECT_COLLECTION;
...@@ -44,6 +49,35 @@ final class Operation ...@@ -44,6 +49,35 @@ final class Operation
} }
} }
public static function fromChangeStreams(stdClass $operation)
{
$o = new self($operation);
// Expect all operations to succeed
$o->errorExpectation = ErrorExpectation::noError();
/* The Change Streams spec tests include a unique "rename" operation,
* which we should convert to a renameCollection command to be run
* against the admin database. */
if ($operation->name === 'rename') {
$o->object = self::OBJECT_SELECT_DATABASE;
$o->databaseName = 'admin';
$o->name = 'runCommand';
$o->arguments = ['command' => [
'renameCollection' => $operation->database . '.' . $operation->collection,
'to' => $operation->database . '.' . $operation->arguments->to,
]];
return $o;
}
$o->databaseName = $operation->database;
$o->collectionName = $operation->collection;
$o->object = self::OBJECT_SELECT_COLLECTION;
return $o;
}
public static function fromCommandMonitoring(stdClass $operation) public static function fromCommandMonitoring(stdClass $operation)
{ {
$o = new self($operation); $o = new self($operation);
...@@ -52,6 +86,9 @@ final class Operation ...@@ -52,6 +86,9 @@ final class Operation
$o->collectionOptions = (array) $operation->collectionOptions; $o->collectionOptions = (array) $operation->collectionOptions;
} }
/* We purposefully avoid setting a default error expectation, because
* some tests may trigger a write or command error. */
return $o; return $o;
} }
...@@ -134,16 +171,26 @@ final class Operation ...@@ -134,16 +171,26 @@ final class Operation
{ {
switch ($this->object) { switch ($this->object) {
case self::OBJECT_COLLECTION: case self::OBJECT_COLLECTION:
return $this->executeForCollection($context); $collection = $context->getCollection($this->collectionOptions);
return $this->executeForCollection($collection, $context);
case self::OBJECT_DATABASE: case self::OBJECT_DATABASE:
return $this->executeForDatabase($context); $database = $context->getDatabase($this->databaseOptions);
return $this->executeForDatabase($database, $context);
case self::OBJECT_SELECT_COLLECTION:
$collection = $context->selectCollection($this->databaseName, $this->collectionName, $this->collectionOptions);
return $this->executeForCollection($collection, $context);
case self::OBJECT_SELECT_DATABASE:
$database = $context->selectDatabase($this->databaseName, $this->databaseOptions);
return $this->executeForDatabase($database, $context);
case self::OBJECT_SESSION0: case self::OBJECT_SESSION0:
return $this->executeForSession($context); return $this->executeForSession($context->session0, $context);
case self::OBJECT_SESSION1: case self::OBJECT_SESSION1:
return $this->executeForSession($context); return $this->executeForSession($context->session1, $context);
default: default:
throw new LogicException('Unsupported object: ' . $this->object); throw new LogicException('Unsupported object: ' . $this->object);
...@@ -153,13 +200,13 @@ final class Operation ...@@ -153,13 +200,13 @@ final class Operation
/** /**
* Executes the collection operation and return its result. * Executes the collection operation and return its result.
* *
* @param Context $context Execution context * @param Collection $collection
* @param Context $context Execution context
* @return mixed * @return mixed
* @throws LogicException if the collection operation is unsupported * @throws LogicException if the collection operation is unsupported
*/ */
private function executeForCollection(Context $context) private function executeForCollection(Collection $collection, Context $context)
{ {
$collection = $context->getCollection($this->collectionOptions);
$args = $context->prepareOptions($this->arguments); $args = $context->prepareOptions($this->arguments);
$context->replaceArgumentSessionPlaceholder($args); $context->replaceArgumentSessionPlaceholder($args);
...@@ -207,6 +254,9 @@ final class Operation ...@@ -207,6 +254,9 @@ final class Operation
array_diff_key($args, ['fieldName' => 1, 'filter' => 1]) array_diff_key($args, ['fieldName' => 1, 'filter' => 1])
); );
case 'drop':
return $collection->drop($args);
case 'findOneAndReplace': case 'findOneAndReplace':
if (isset($args['returnDocument'])) { if (isset($args['returnDocument'])) {
$args['returnDocument'] = ('after' === strtolower($args['returnDocument'])) $args['returnDocument'] = ('after' === strtolower($args['returnDocument']))
...@@ -262,13 +312,13 @@ final class Operation ...@@ -262,13 +312,13 @@ final class Operation
/** /**
* Executes the database operation and return its result. * Executes the database operation and return its result.
* *
* @param Context $context Execution context * @param Database $database
* @param Context $context Execution context
* @return mixed * @return mixed
* @throws LogicException if the database operation is unsupported * @throws LogicException if the database operation is unsupported
*/ */
private function executeForDatabase(Context $context) private function executeForDatabase(Database $database, Context $context)
{ {
$database = $context->getDatabase($this->databaseOptions);
$args = $context->prepareOptions($this->arguments); $args = $context->prepareOptions($this->arguments);
$context->replaceArgumentSessionPlaceholder($args); $context->replaceArgumentSessionPlaceholder($args);
...@@ -287,14 +337,13 @@ final class Operation ...@@ -287,14 +337,13 @@ final class Operation
/** /**
* Executes the session operation and return its result. * Executes the session operation and return its result.
* *
* @param Session $session
* @param Context $context Execution context * @param Context $context Execution context
* @return mixed * @return mixed
* @throws LogicException if the session operation is unsupported * @throws LogicException if the session operation is unsupported
*/ */
private function executeForSession(Context $context) private function executeForSession(Session $session, Context $context)
{ {
$session = $context->{$this->object};
switch ($this->name) { switch ($this->name) {
case 'abortTransaction': case 'abortTransaction':
return $session->abortTransaction(); return $session->abortTransaction();
...@@ -362,6 +411,9 @@ final class Operation ...@@ -362,6 +411,9 @@ final class Operation
case 'deleteOne': case 'deleteOne':
return ResultExpectation::ASSERT_DELETE; return ResultExpectation::ASSERT_DELETE;
case 'drop':
return ResultExpectation::ASSERT_NOTHING;
case 'findOneAndDelete': case 'findOneAndDelete':
case 'findOneAndReplace': case 'findOneAndReplace':
case 'findOneAndUpdate': case 'findOneAndUpdate':
......
...@@ -29,9 +29,11 @@ final class ResultExpectation ...@@ -29,9 +29,11 @@ final class ResultExpectation
const ASSERT_SAME_DOCUMENTS = 8; const ASSERT_SAME_DOCUMENTS = 8;
const ASSERT_MATCHES_DOCUMENT = 9; const ASSERT_MATCHES_DOCUMENT = 9;
const ASSERT_NULL = 10; const ASSERT_NULL = 10;
const ASSERT_CALLABLE = 11;
private $assertionType = self::ASSERT_NOTHING; private $assertionType = self::ASSERT_NOTHING;
private $expectedValue; private $expectedValue;
private $assertionCallable;
private function __construct($assertionType, $expectedValue) private function __construct($assertionType, $expectedValue)
{ {
...@@ -57,6 +59,19 @@ final class ResultExpectation ...@@ -57,6 +59,19 @@ final class ResultExpectation
$this->expectedValue = $expectedValue; $this->expectedValue = $expectedValue;
} }
public static function fromChangeStreams(stdClass $result, callable $assertionCallable)
{
if (!property_exists($result, 'success')) {
return new self(self::ASSERT_NOTHING, null);
}
$o = new self(self::ASSERT_CALLABLE, $result->success);
$o->assertionCallable = $assertionCallable;
return $o;
}
public static function fromRetryableWrites(stdClass $outcome, $defaultAssertionType) public static function fromRetryableWrites(stdClass $outcome, $defaultAssertionType)
{ {
if (property_exists($outcome, 'result')) { if (property_exists($outcome, 'result')) {
...@@ -138,6 +153,10 @@ final class ResultExpectation ...@@ -138,6 +153,10 @@ final class ResultExpectation
} }
break; break;
case self::ASSERT_CALLABLE:
call_user_func($this->assertionCallable, $expected, $actual);
break;
case self::ASSERT_DELETE: case self::ASSERT_DELETE:
$test->assertInstanceOf(DeleteResult::class, $actual); $test->assertInstanceOf(DeleteResult::class, $actual);
......
...@@ -12,16 +12,6 @@ use stdClass; ...@@ -12,16 +12,6 @@ use stdClass;
*/ */
class RetryableWritesSpecTest extends FunctionalTestCase class RetryableWritesSpecTest extends FunctionalTestCase
{ {
public static function assertCommandMatches(stdClass $expected, stdClass $actual)
{
throw new LogicException('Retryable writes spec tests do not assert CommandStartedEvents');
}
public static function assertCommandReplyMatches(stdClass $expected, stdClass $actual)
{
throw new LogicException('Retryable writes spec tests do not assert CommandSucceededEvents');
}
/** /**
* Execute an individual test case from the specification. * Execute an individual test case from the specification.
* *
......
...@@ -109,11 +109,6 @@ class TransactionsSpecTest extends FunctionalTestCase ...@@ -109,11 +109,6 @@ class TransactionsSpecTest extends FunctionalTestCase
static::assertDocumentsMatch($expected, $actual); static::assertDocumentsMatch($expected, $actual);
} }
public static function assertCommandReplyMatches(stdClass $expected, stdClass $actual)
{
throw new LogicException('Transactions spec tests do not assert CommandSucceededEvents');
}
/** /**
* Execute an individual test case from the specification. * Execute an individual test case from the specification.
* *
......
.. role:: javascript(code)
:language: javascript
==============
Change Streams
==============
.. contents::
--------
Introduction
============
The YAML and JSON files in this directory are platform-independent tests that
drivers can use to prove their conformance to the Change Streams Spec.
Several prose tests, which are not easily expressed in YAML, are also presented
in this file. Those tests will need to be manually implemented by each driver.
Spec Test Format
================
Each YAML file has the following keys:
- ``database_name``: The default database
- ``collection_name``: The default collection
- ``database2_name``: Another database
- ``collection2_name``: Another collection
- ``tests``: An array of tests that are to be run independently of each other.
Each test will have some of the following fields:
- ``description``: The name of the test.
- ``minServerVersion``: The minimum server version to run this test against. If not present, assume there is no minimum server version.
- ``maxServerVersion``: Reserved for later use
- ``failPoint``(optional): The configureFailPoint command document to run to configure a fail point on the primary server.
- ``target``: The entity on which to run the change stream. Valid values are:
- ``collection``: Watch changes on collection ``database_name.collection_name``
- ``database``: Watch changes on database ``database_name``
- ``client``: Watch changes on entire clusters
- ``topology``: An array of server topologies against which to run the test.
Valid topologies are ``single``, ``replicaset``, and ``sharded``.
- ``changeStreamPipeline``: An array of additional aggregation pipeline stages to add to the change stream
- ``changeStreamOptions``: Additional options to add to the changeStream
- ``operations``: Array of documents, each describing an operation. Each document has the following fields:
- ``database``: Database against which to run the operation
- ``collection``: Collection against which to run the operation
- ``name``: Name of the command to run
- ``arguments`` (optional): Object of arguments for the command (ex: document to insert)
- ``expectations``: Optional list of command-started events in Extended JSON format
- ``result``: Document with ONE of the following fields:
- ``error``: Describes an error received during the test
- ``success``: An Extended JSON array of documents expected to be received from the changeStream
Spec Test Match Function
========================
The definition of MATCH or MATCHES in the Spec Test Runner is as follows:
- MATCH takes two values, ``expected`` and ``actual``
- Notation is "Assert [actual] MATCHES [expected]
- Assertion passes if ``expected`` is a subset of ``actual``, with the values ``42`` and ``"42"`` acting as placeholders for "any value"
Pseudocode implementation of ``actual`` MATCHES ``expected``:
::
If expected is "42" or 42:
Assert that actual exists (is not null or undefined)
Else:
Assert that actual is of the same JSON type as expected
If expected is a JSON array:
For every idx/value in expected:
Assert that actual[idx] MATCHES value
Else if expected is a JSON object:
For every key/value in expected
Assert that actual[key] MATCHES value
Else:
Assert that expected equals actual
The expected values for ``result.success`` and ``expectations`` are written in Extended JSON. Drivers may adopt any of the following approaches to comparisons, as long as they are consistent:
- Convert ``actual`` to Extended JSON and compare to ``expected``
- Convert ``expected`` and ``actual`` to BSON, and compare them
- Convert ``expected`` and ``actual`` to native equivalents of JSON, and compare them
Spec Test Runner
================
Before running the tests
- Create a MongoClient ``globalClient``, and connect to the server
For each YAML file, for each element in ``tests``:
- If ``topology`` does not include the topology of the server instance(s), skip this test.
- Use ``globalClient`` to
- Drop the database ``database_name``
- Drop the database ``database2_name``
- Create the database ``database_name`` and the collection ``database_name.collection_name``
- Create the database ``database2_name`` and the collection ``database2_name.collection2_name``
- If the the ``failPoint`` field is present, configure the fail point on the primary server. See
`Server Fail Point <../../transactions/tests#server-fail-point>`_ in the
Transactions spec test documentation for more information.
- Create a new MongoClient ``client``
- Begin monitoring all APM events for ``client``. (If the driver uses global listeners, filter out all events that do not originate with ``client``). Filter out any "internal" commands (e.g. ``isMaster``)
- Using ``client``, create a changeStream ``changeStream`` against the specified ``target``. Use ``changeStreamPipeline`` and ``changeStreamOptions`` if they are non-empty
- Using ``globalClient``, run every operation in ``operations`` in serial against the server
- Wait until either:
- An error occurs
- All operations have been successful AND the changeStream has received as many changes as there are in ``result.success``
- Close ``changeStream``
- If there was an error:
- Assert that an error was expected for the test.
- Assert that the error MATCHES ``result.error``
- Else:
- Assert that no error was expected for the test
- Assert that the changes received from ``changeStream`` MATCH the results in ``result.success``
- If there are any ``expectations``
- For each (``expected``, ``idx``) in ``expectations``
- Assert that ``actual[idx]`` MATCHES ``expected``
- Close the MongoClient ``client``
After running all tests
- Close the MongoClient ``globalClient``
- Drop database ``database_name``
- Drop database ``database2_name``
Prose Tests
===========
The following tests have not yet been automated, but MUST still be tested
#. ``ChangeStream`` must continuously track the last seen ``resumeToken``
#. ``ChangeStream`` will throw an exception if the server response is missing the resume token (if wire version is < 8, this is a driver-side error; for 8+, this is a server-side error)
#. ``ChangeStream`` will automatically resume one time on a resumable error (including `not master`) with the initial pipeline and options, except for the addition/update of a ``resumeToken``.
#. ``ChangeStream`` will not attempt to resume on any error encountered while executing an ``aggregate`` command.
#. ``ChangeStream`` will not attempt to resume after encountering error code 11601 (Interrupted), 136 (CappedPositionLost), or 237 (CursorKilled) while executing a ``getMore`` command.
#. ``ChangeStream`` will perform server selection before attempting to resume, using initial ``readPreference``
#. Ensure that a cursor returned from an aggregate command with a cursor id and an initial empty batch is not closed on the driver side.
#. The ``killCursors`` command sent during the "Resume Process" must not be allowed to throw an exception.
#. ``$changeStream`` stage for ``ChangeStream`` against a server ``>=4.0`` and ``<4.0.7`` that has not received any results yet MUST include a ``startAtOperationTime`` option when resuming a changestream.
#. ``ChangeStream`` will resume after a ``killCursors`` command is issued for its child cursor.
#. - For a ``ChangeStream`` under these conditions:
- Running against a server ``>=4.0.7``.
- The batch is empty or has been iterated to the last document.
- Expected result:
- ``getResumeToken`` must return the ``postBatchResumeToken`` from the current command response.
#. - For a ``ChangeStream`` under these conditions:
- Running against a server ``<4.0.7``.
- The batch is empty or has been iterated to the last document.
- Expected result:
- ``getResumeToken`` must return the ``_id`` of the last document returned if one exists.
- ``getResumeToken`` must return ``startAfter`` from the initial aggregate if the option was specified.
- ``getResumeToken`` must return ``resumeAfter`` from the initial aggregate if the option was specified.
- If neither the ``startAfter`` nor ``resumeAfter`` options were specified, the ``getResumeToken`` result must be empty.
#. - For a ``ChangeStream`` under these conditions:
- The batch is not empty.
- The batch has been iterated up to but not including the last element.
- Expected result:
- ``getResumeToken`` must return the ``_id`` of the previous document returned.
#. - For a ``ChangeStream`` under these conditions:
- The batch is not empty.
- The batch hasn’t been iterated at all.
- Only the initial ``aggregate`` command has been executed.
- Expected result:
- ``getResumeToken`` must return ``startAfter`` from the initial aggregate if the option was specified.
- ``getResumeToken`` must return ``resumeAfter`` from the initial aggregate if the option was specified.
- If neither the ``startAfter`` nor ``resumeAfter`` options were specified, the ``getResumeToken`` result must be empty.
#. - For a ``ChangeStream`` under these conditions:
- Running against a server ``>=4.0.7``.
- The batch is not empty.
- The batch hasn’t been iterated at all.
- The stream has iterated beyond a previous batch and a ``getMore`` command has just been executed.
- Expected result:
- ``getResumeToken`` must return the ``postBatchResumeToken`` from the previous command response.
#. - For a ``ChangeStream`` under these conditions:
- Running against a server ``<4.0.7``.
- The batch is not empty.
- The batch hasn’t been iterated at all.
- The stream has iterated beyond a previous batch and a ``getMore`` command has just been executed.
- Expected result:
- ``getResumeToken`` must return the ``_id`` of the previous document returned if one exists.
- ``getResumeToken`` must return ``startAfter`` from the initial aggregate if the option was specified.
- ``getResumeToken`` must return ``resumeAfter`` from the initial aggregate if the option was specified.
- If neither the ``startAfter`` nor ``resumeAfter`` options were specified, the ``getResumeToken`` result must be empty.
\ No newline at end of file
{
"collection_name": "test",
"database_name": "change-stream-tests",
"collection2_name": "test2",
"database2_name": "change-stream-tests-2",
"tests": [
{
"description": "The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"single"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [],
"expectations": [],
"result": {
"error": {
"code": 40573
}
}
},
{
"description": "Change Stream should error when an invalid aggregation stage is passed in",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [
{
"$unsupported": "foo"
}
],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {
"fullDocument": "default"
}
},
{
"$unsupported": "foo"
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"error": {
"code": 40324
}
}
},
{
"description": "Change Stream should error when _id is projected out",
"minServerVersion": "4.1.11",
"target": "collection",
"topology": [
"replicaset",
"sharded"
],
"changeStreamPipeline": [
{
"$project": {
"_id": 0
}
}
],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"result": {
"error": {
"code": 280,
"errorLabels": [
"NonResumableChangeStreamError"
]
}
}
}
]
}
collection_name: &collection_name "test"
database_name: &database_name "change-stream-tests"
collection2_name: &collection2_name "test2"
database2_name: &database2_name "change-stream-tests-2"
tests:
-
description: The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error
minServerVersion: "3.6.0"
target: collection
topology:
- single
changeStreamPipeline: []
changeStreamOptions: {}
operations: []
expectations: []
result:
error:
code: 40573
-
description: Change Stream should error when an invalid aggregation stage is passed in
minServerVersion: "3.6.0"
target: collection
topology:
- replicaset
changeStreamPipeline:
-
$unsupported: foo
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
z: 3
expectations:
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream:
fullDocument: default
-
$unsupported: foo
command_name: aggregate
database_name: *database_name
result:
error:
code: 40324
-
description: Change Stream should error when _id is projected out
minServerVersion: "4.1.11"
target: collection
topology:
- replicaset
- sharded
changeStreamPipeline:
-
$project: { _id: 0 }
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
z: 3
result:
error:
code: 280
errorLabels: [ "NonResumableChangeStreamError" ]
{
"collection_name": "test",
"database_name": "change-stream-tests",
"collection2_name": "test2",
"database2_name": "change-stream-tests-2",
"tests": [
{
"description": "$changeStream must be the first stage in a change stream pipeline sent to the server",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {
"fullDocument": "default"
}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"_id": "42",
"documentKey": "42",
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
}
]
}
},
{
"description": "The server returns change stream responses in the specified server response format",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
}
],
"expectations": [],
"result": {
"success": [
{
"_id": "42",
"documentKey": "42",
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
}
]
}
},
{
"description": "Executing a watch helper on a Collection results in notifications for changes to the specified collection",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test2",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
},
{
"database": "change-stream-tests-2",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"y": 2
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {
"fullDocument": "default"
}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"z": {
"$numberInt": "3"
}
}
}
]
}
},
{
"description": "Change Stream should allow valid aggregate pipeline stages",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [
{
"$match": {
"fullDocument.z": 3
}
}
],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"y": 2
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {
"fullDocument": "default"
}
},
{
"$match": {
"fullDocument.z": {
"$numberInt": "3"
}
}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"z": {
"$numberInt": "3"
}
}
}
]
}
},
{
"description": "Executing a watch helper on a Database results in notifications for changes to all collections in the specified database.",
"minServerVersion": "3.8.0",
"target": "database",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test2",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
},
{
"database": "change-stream-tests-2",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"y": 2
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": {
"$numberInt": "1"
},
"cursor": {},
"pipeline": [
{
"$changeStream": {
"fullDocument": "default"
}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test2"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
},
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"z": {
"$numberInt": "3"
}
}
}
]
}
},
{
"description": "Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster.",
"minServerVersion": "3.8.0",
"target": "client",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test2",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
},
{
"database": "change-stream-tests-2",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"y": 2
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"z": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": {
"$numberInt": "1"
},
"cursor": {},
"pipeline": [
{
"$changeStream": {
"fullDocument": "default",
"allChangesForCluster": true
}
}
]
},
"command_name": "aggregate",
"database_name": "admin"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test2"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
},
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests-2",
"coll": "test"
},
"fullDocument": {
"y": {
"$numberInt": "2"
}
}
},
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"z": {
"$numberInt": "3"
}
}
}
]
}
},
{
"description": "Test insert, update, replace, and delete event types",
"minServerVersion": "3.6.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "updateOne",
"arguments": {
"filter": {
"x": 1
},
"update": {
"$set": {
"x": 2
}
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "replaceOne",
"arguments": {
"filter": {
"x": 2
},
"replacement": {
"x": 3
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "deleteOne",
"arguments": {
"filter": {
"x": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {
"fullDocument": "default"
}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
},
{
"operationType": "update",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"updateDescription": {
"updatedFields": {
"x": {
"$numberInt": "2"
}
}
}
},
{
"operationType": "replace",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "3"
}
}
},
{
"operationType": "delete",
"ns": {
"db": "change-stream-tests",
"coll": "test"
}
}
]
}
},
{
"description": "Test rename and invalidate event types",
"minServerVersion": "4.0.1",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "rename",
"arguments": {
"to": "test2"
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {
"fullDocument": "default"
}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "rename",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"to": {
"db": "change-stream-tests",
"coll": "test2"
}
},
{
"operationType": "invalidate"
}
]
}
},
{
"description": "Test drop and invalidate event types",
"minServerVersion": "4.0.1",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "drop"
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {
"fullDocument": "default"
}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "drop",
"ns": {
"db": "change-stream-tests",
"coll": "test"
}
},
{
"operationType": "invalidate"
}
]
}
},
{
"description": "Test consecutive resume",
"minServerVersion": "4.1.7",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {
"batchSize": 1
},
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 2
},
"data": {
"failCommands": [
"getMore"
],
"closeConnection": true
}
},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 2
}
}
},
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 3
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {
"batchSize": 1
},
"pipeline": [
{
"$changeStream": {
"fullDocument": "default"
}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": [
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "1"
}
}
},
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "2"
}
}
},
{
"operationType": "insert",
"ns": {
"db": "change-stream-tests",
"coll": "test"
},
"fullDocument": {
"x": {
"$numberInt": "3"
}
}
}
]
}
}
]
}
collection_name: &collection_name "test"
database_name: &database_name "change-stream-tests"
collection2_name: &collection2_name "test2"
database2_name: &database2_name "change-stream-tests-2"
tests:
-
description: "$changeStream must be the first stage in a change stream pipeline sent to the server"
minServerVersion: "3.6.0"
target: collection
topology:
- replicaset
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
x: 1
expectations:
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream:
fullDocument: default
command_name: aggregate
database_name: *database_name
result:
success:
-
_id: "42"
documentKey: "42"
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
x:
$numberInt: "1"
-
description: The server returns change stream responses in the specified server response format
minServerVersion: "3.6.0"
target: collection
topology:
- replicaset
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
x: 1
expectations: []
result:
success:
-
_id: "42"
documentKey: "42"
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
x:
$numberInt: "1"
-
description: Executing a watch helper on a Collection results in notifications for changes to the specified collection
minServerVersion: "3.6.0"
target: collection
topology:
- replicaset
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection2_name
name: insertOne
arguments:
document:
x: 1
-
database: *database2_name
collection: *collection_name
name: insertOne
arguments:
document:
y: 2
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
z: 3
expectations:
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream:
fullDocument: default
command_name: aggregate
database_name: *database_name
result:
success:
-
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
z:
$numberInt: "3"
-
description: Change Stream should allow valid aggregate pipeline stages
minServerVersion: "3.6.0"
target: collection
topology:
- replicaset
changeStreamPipeline:
-
$match:
"fullDocument.z": 3
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
y: 2
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
z: 3
expectations:
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream:
fullDocument: default
-
$match:
"fullDocument.z":
$numberInt: "3"
command_name: aggregate
database_name: *database_name
result:
success:
-
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
z:
$numberInt: "3"
-
description: Executing a watch helper on a Database results in notifications for changes to all collections in the specified database.
minServerVersion: "3.8.0"
target: database
topology:
- replicaset
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection2_name
name: insertOne
arguments:
document:
x: 1
-
database: *database2_name
collection: *collection_name
name: insertOne
arguments:
document:
y: 2
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
z: 3
expectations:
-
command_started_event:
command:
aggregate:
$numberInt: "1"
cursor: {}
pipeline:
-
$changeStream:
fullDocument: default
command_name: aggregate
database_name: *database_name
result:
success:
-
operationType: insert
ns:
db: *database_name
coll: *collection2_name
fullDocument:
x:
$numberInt: "1"
-
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
z:
$numberInt: "3"
-
description: Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster.
minServerVersion: "3.8.0"
target: client
topology:
- replicaset
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection2_name
name: insertOne
arguments:
document:
x: 1
-
database: *database2_name
collection: *collection_name
name: insertOne
arguments:
document:
y: 2
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
z: 3
expectations:
-
command_started_event:
command:
aggregate:
$numberInt: "1"
cursor: {}
pipeline:
-
$changeStream:
fullDocument: default
allChangesForCluster: true
command_name: aggregate
database_name: admin
result:
success:
-
operationType: insert
ns:
db: *database_name
coll: *collection2_name
fullDocument:
x:
$numberInt: "1"
-
operationType: insert
ns:
db: *database2_name
coll: *collection_name
fullDocument:
y:
$numberInt: "2"
-
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
z:
$numberInt: "3"
-
description: Test insert, update, replace, and delete event types
minServerVersion: "3.6.0"
target: collection
topology:
- replicaset
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
x: 1
-
database: *database_name
collection: *collection_name
name: updateOne
arguments:
filter:
x: 1
update:
$set:
x: 2
-
database: *database_name
collection: *collection_name
name: replaceOne
arguments:
filter:
x: 2
replacement:
x: 3
-
database: *database_name
collection: *collection_name
name: deleteOne
arguments:
filter:
x: 3
expectations:
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream:
fullDocument: default
command_name: aggregate
database_name: *database_name
result:
success:
-
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
x:
$numberInt: "1"
-
operationType: update
ns:
db: *database_name
coll: *collection_name
updateDescription:
updatedFields:
x:
$numberInt: "2"
-
operationType: replace
ns:
db: *database_name
coll: *collection_name
fullDocument:
x:
$numberInt: "3"
-
operationType: delete
ns:
db: *database_name
coll: *collection_name
-
description: Test rename and invalidate event types
minServerVersion: "4.0.1"
target: collection
topology:
- replicaset
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: rename
arguments:
to: *collection2_name
expectations:
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream:
fullDocument: default
command_name: aggregate
database_name: *database_name
result:
success:
-
operationType: rename
ns:
db: *database_name
coll: *collection_name
to:
db: *database_name
coll: *collection2_name
-
operationType: invalidate
-
description: Test drop and invalidate event types
minServerVersion: "4.0.1"
target: collection
topology:
- replicaset
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: drop
expectations:
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream:
fullDocument: default
command_name: aggregate
database_name: *database_name
result:
success:
-
operationType: drop
ns:
db: *database_name
coll: *collection_name
-
operationType: invalidate
# Test that resume logic works correctly even after consecutive retryable failures of a getMore command,
# with no intervening events. This is ensured by setting the batch size of the change stream to 1,
-
description: Test consecutive resume
minServerVersion: "4.1.7"
target: collection
topology:
- replicaset
changeStreamPipeline: []
changeStreamOptions: { batchSize: 1}
failPoint:
configureFailPoint: failCommand
mode: {times: 2}
data:
failCommands: ["getMore"]
closeConnection: true
operations:
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
x: 1
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
x: 2
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
x: 3
expectations:
-
command_started_event:
command:
aggregate: *collection_name
cursor: {batchSize: 1}
pipeline:
-
$changeStream:
fullDocument: default
command_name: aggregate
database_name: *database_name
result:
success:
-
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
x:
$numberInt: "1"
-
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
x:
$numberInt: "2"
-
operationType: insert
ns:
db: *database_name
coll: *collection_name
fullDocument:
x:
$numberInt: "3"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment