Commit 9739904e authored by Jeremy Mikola's avatar Jeremy Mikola

Merge pull request #544

parents c0d35516 eb2cd59f
arg_name: option
name: readConcern
type: :php:`MongoDB\\Driver\\ReadConcern <class.mongodb-driver-readconcern>`
description: |
:manual:`Read concern </reference/read-concern>` to use for the operation.
Defaults to the client's read concern.
This is not supported for server versions prior to 3.2 and will result in an
exception at execution time if used.
interface: phpmethod
operation: ~
optional: true
---
arg_name: option
name: readPreference
type: :php:`MongoDB\\Driver\\ReadPreference <class.mongodb-driver-readpreference>`
description: |
:manual:`Read preference </reference/read-preference>` to use for the
operation. Defaults to the client's read preference.
interface: phpmethod
operation: ~
optional: true
---
source:
file: apiargs-common-option.yaml
ref: typeMap
---
arg_name: option
name: writeConcern
type: :php:`MongoDB\\Driver\\WriteConcern <class.mongodb-driver-writeconcern>`
description: |
:manual:`Write concern </reference/write-concern>` to use for the operation.
Defaults to the client's write concern.
interface: phpmethod
operation: ~
optional: true
...
...@@ -10,16 +10,10 @@ source: ...@@ -10,16 +10,10 @@ source:
post: | post: |
This will be used for the returned command result document. This will be used for the returned command result document.
--- ---
arg_name: option source:
name: writeConcern file: apiargs-MongoDBClient-common-option.yaml
type: :php:`MongoDB\\Driver\\WriteConcern <class.mongodb-driver-writeconcern>` ref: writeConcern
description: | post: |
:manual:`Write concern </reference/write-concern>` to use for the operation.
Defaults to the client's write concern.
This is not supported for server versions prior to 3.4 and will result in an This is not supported for server versions prior to 3.4 and will result in an
exception at execution time if used. exception at execution time if used.
interface: phpmethod
operation: ~
optional: true
... ...
---
source:
file: apiargs-method-watch-option.yaml
ref: batchSize
---
source:
file: apiargs-common-option.yaml
ref: collation
---
source:
file: apiargs-method-watch-option.yaml
ref: fullDocument
---
source:
file: apiargs-method-watch-option.yaml
ref: maxAwaitTimeMS
---
source:
file: apiargs-MongoDBClient-common-option.yaml
ref: readConcern
---
source:
file: apiargs-MongoDBClient-common-option.yaml
ref: readPreference
post: |
This is used for both the initial change stream aggregation and for
server selection during an automatic resume.
---
source:
file: apiargs-method-watch-option.yaml
ref: resumeAfter
---
source:
file: apiargs-common-option.yaml
ref: session
---
source:
file: apiargs-method-watch-option.yaml
ref: startAtOperationTime
---
source:
file: apiargs-MongoDBClient-common-option.yaml
ref: typeMap
...
source:
file: apiargs-method-watch-param.yaml
ref: $pipeline
---
source:
file: apiargs-method-watch-param.yaml
ref: $options
...
...@@ -24,27 +24,14 @@ interface: phpmethod ...@@ -24,27 +24,14 @@ interface: phpmethod
operation: ~ operation: ~
optional: true optional: true
--- ---
arg_name: option source:
name: collation file: apiargs-common-option.yaml
type: array|object ref: collation
description: | post: |
:manual:`Collation </reference/collation>` allows users to specify
language-specific rules for string comparison, such as rules for lettercase
and accent marks. When specifying collation, the ``locale`` field is
mandatory; all other collation fields are optional. For descriptions of the
fields, see :manual:`Collation Document
</reference/collation/#collation-document>`.
If the collation is unspecified but the collection has a default collation, If the collation is unspecified but the collection has a default collation,
the operation uses the collation specified for the collection. If no the operation uses the collation specified for the collection. If no
collation is specified for the collection or for the operation, MongoDB uses collation is specified for the collection or for the operation, MongoDB uses
the simple binary comparison used in prior versions for string comparisons. the simple binary comparison used in prior versions for string comparisons.
This option is available in MongoDB 3.4+ and will result in an exception at
execution time if specified for an older server version.
interface: phpmethod
operation: ~
optional: true
--- ---
arg_name: option arg_name: option
name: readConcern name: readConcern
......
--- ---
arg_name: option source:
name: batchSize file: apiargs-method-watch-option.yaml
type: integer ref: batchSize
description: |
Specifies the maximum number of change events to return in each batch of the
response from the MongoDB cluster.
interface: phpmethod
operation: ~
optional: true
--- ---
source: source:
file: apiargs-MongoDBCollection-common-option.yaml file: apiargs-common-option.yaml
ref: collation ref: collation
--- ---
arg_name: option source:
name: fullDocument file: apiargs-method-watch-option.yaml
type: string ref: fullDocument
description: |
Allowed values are 'default' and 'updateLookup'. Defaults to 'default'.
When set to 'updateLookup', the change notification for partial updates will
include both a delta describing the changes to the document, as well as a
copy of the entire document that was changed from some time after the change
occurred. The following values are supported:
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_DEFAULT`` (*default*)
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP``
.. note::
This is an option of the ``$changeStream`` pipeline stage.
interface: phpmethod
operation: ~
optional: true
--- ---
arg_name: option source:
name: maxAwaitTimeMS file: apiargs-method-watch-option.yaml
type: integer ref: maxAwaitTimeMS
description: |
Positive integer denoting the time limit in milliseconds for the server to
block a getMore operation if no data is available.
interface: phpmethod
operation: ~
optional: true
--- ---
source: source:
file: apiargs-MongoDBCollection-common-option.yaml file: apiargs-MongoDBCollection-common-option.yaml
...@@ -54,23 +26,20 @@ post: | ...@@ -54,23 +26,20 @@ post: |
This is used for both the initial change stream aggregation and for This is used for both the initial change stream aggregation and for
server selection during an automatic resume. server selection during an automatic resume.
--- ---
arg_name: option source:
name: resumeAfter file: apiargs-method-watch-option.yaml
type: array|object ref: resumeAfter
description: |
Specifies the logical starting point for the new change stream.
.. note::
This is an option of the ``$changeStream`` pipeline stage.
interface: phpmethod
operation: ~
optional: true
--- ---
source: source:
file: apiargs-common-option.yaml file: apiargs-common-option.yaml
ref: session ref: session
--- ---
source:
file: apiargs-method-watch-option.yaml
ref: startAtOperationTime
post: |
.. versionadded:: 1.4
---
source: source:
file: apiargs-MongoDBCollection-common-option.yaml file: apiargs-MongoDBCollection-common-option.yaml
ref: typeMap ref: typeMap
......
arg_name: param source:
name: $pipeline file: apiargs-method-watch-param.yaml
type: array|object ref: $pipeline
description: |
The pipeline of stages to append to an initial ``$changeStream`` stage.
interface: phpmethod
operation: ~
optional: true
--- ---
source: source:
file: apiargs-common-param.yaml file: apiargs-method-watch-param.yaml
ref: $options ref: $options
... ...
arg_name: option
name: readConcern
type: :php:`MongoDB\\Driver\\ReadConcern <class.mongodb-driver-readconcern>`
description: |
:manual:`Read concern </reference/read-concern>` to use for the operation.
Defaults to the database's read concern.
This is not supported for server versions prior to 3.2 and will result in an
exception at execution time if used.
interface: phpmethod
operation: ~
optional: true
---
arg_name: option
name: readPreference
type: :php:`MongoDB\\Driver\\ReadPreference <class.mongodb-driver-readpreference>`
description: |
:manual:`Read preference </reference/read-preference>` to use for the
operation. Defaults to the database's read preference.
interface: phpmethod
operation: ~
optional: true
---
source: source:
file: apiargs-common-option.yaml file: apiargs-common-option.yaml
ref: typeMap ref: typeMap
......
...@@ -28,7 +28,7 @@ operation: ~ ...@@ -28,7 +28,7 @@ operation: ~
optional: true optional: true
--- ---
source: source:
file: apiargs-MongoDBCollection-common-option.yaml file: apiargs-common-option.yaml
ref: collation ref: collation
pre: | pre: |
Specifies the :manual:`collation Specifies the :manual:`collation
......
---
source:
file: apiargs-method-watch-option.yaml
ref: batchSize
---
source:
file: apiargs-common-option.yaml
ref: collation
---
source:
file: apiargs-method-watch-option.yaml
ref: fullDocument
---
source:
file: apiargs-method-watch-option.yaml
ref: maxAwaitTimeMS
---
source:
file: apiargs-MongoDBDatabase-common-option.yaml
ref: readConcern
---
source:
file: apiargs-MongoDBDatabase-common-option.yaml
ref: readPreference
post: |
This is used for both the initial change stream aggregation and for
server selection during an automatic resume.
---
source:
file: apiargs-method-watch-option.yaml
ref: resumeAfter
---
source:
file: apiargs-common-option.yaml
ref: session
---
source:
file: apiargs-method-watch-option.yaml
ref: startAtOperationTime
---
source:
file: apiargs-MongoDBDatabase-common-option.yaml
ref: typeMap
...
source:
file: apiargs-method-watch-param.yaml
ref: $pipeline
---
source:
file: apiargs-method-watch-param.yaml
ref: $options
...
arg_name: option arg_name: option
name: collation
type: array|object
description: |
:manual:`Collation </reference/collation>` allows users to specify
language-specific rules for string comparison, such as rules for lettercase
and accent marks. When specifying collation, the ``locale`` field is
mandatory; all other collation fields are optional. For descriptions of the
fields, see :manual:`Collation Document
</reference/collation/#collation-document>`.
This option is available in MongoDB 3.4+ and will result in an exception at
execution time if specified for an older server version.
interface: phpmethod
operation: ~
optional: true
---
arg_name: option
name: maxTimeMS name: maxTimeMS
type: integer type: integer
description: | description: |
......
---
arg_name: option
name: batchSize
type: integer
description: |
Specifies the maximum number of change events to return in each batch of the
response from the MongoDB cluster.
interface: phpmethod
operation: ~
optional: true
---
arg_name: option
name: fullDocument
type: string
description: |
Allowed values are 'default' and 'updateLookup'. Defaults to 'default'.
When set to 'updateLookup', the change notification for partial updates will
include both a delta describing the changes to the document, as well as a
copy of the entire document that was changed from some time after the change
occurred. The following values are supported:
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_DEFAULT`` (*default*)
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP``
.. note::
This is an option of the ``$changeStream`` pipeline stage.
interface: phpmethod
operation: ~
optional: true
---
arg_name: option
name: maxAwaitTimeMS
type: integer
description: |
Positive integer denoting the time limit in milliseconds for the server to
block a getMore operation if no data is available.
interface: phpmethod
operation: ~
optional: true
---
arg_name: option
name: resumeAfter
type: array|object
description: |
Specifies the logical starting point for the new change stream. The ``_id``
field in documents returned by the change stream may be used here.
Using this option in conjunction with ``startAtOperationTime`` will result in
a server error. The options are mutually exclusive.
.. note::
This is an option of the ``$changeStream`` pipeline stage.
interface: phpmethod
operation: ~
optional: true
---
arg_name: option
name: startAtOperationTime
type: :php:`MongoDB\\BSON\\TimestampInterface <class.mongodb-bson-timestampinterface>`
description: |
If specified, the change stream will only provide changes that occurred at or
after the specified timestamp. Command responses from a MongoDB 4.0+ server
include an ``operationTime`` that can be used here. By default, the
``operationTime`` returned by the initial ``aggregate`` command will be used
if available.
Using this option in conjunction with ``resumeAfter`` will result in a server
error. The options are mutually exclusive.
This is not supported for server versions prior to 4.0 and will result in an
exception at execution time if used.
.. note::
This is an option of the ``$changeStream`` pipeline stage.
interface: phpmethod
operation: ~
optional: true
...
arg_name: param
name: $pipeline
type: array|object
description: |
The pipeline of stages to append to an initial ``$changeStream`` stage.
interface: phpmethod
operation: ~
optional: true
---
source:
file: apiargs-common-param.yaml
ref: $options
...
...@@ -40,3 +40,4 @@ Methods ...@@ -40,3 +40,4 @@ Methods
/reference/method/MongoDBClient-selectCollection /reference/method/MongoDBClient-selectCollection
/reference/method/MongoDBClient-selectDatabase /reference/method/MongoDBClient-selectDatabase
/reference/method/MongoDBClient-startSession /reference/method/MongoDBClient-startSession
/reference/method/MongoDBClient-watch
...@@ -59,5 +59,6 @@ Methods ...@@ -59,5 +59,6 @@ Methods
/reference/method/MongoDBDatabase-modifyCollection /reference/method/MongoDBDatabase-modifyCollection
/reference/method/MongoDBDatabase-selectCollection /reference/method/MongoDBDatabase-selectCollection
/reference/method/MongoDBDatabase-selectGridFSBucket /reference/method/MongoDBDatabase-selectGridFSBucket
/reference/method/MongoDBDatabase-watch
/reference/method/MongoDBDatabase-withOptions /reference/method/MongoDBDatabase-withOptions
========================
MongoDB\\Client::watch()
========================
.. versionadded:: 1.4
.. default-domain:: mongodb
.. contents:: On this page
:local:
:backlinks: none
:depth: 1
:class: singlecol
Definition
----------
.. phpmethod:: MongoDB\\Client::watch()
Executes a :manual:`change stream </changeStreams>` operation on the client.
The change stream can be watched for cluster-level changes.
.. code-block:: php
function watch(array $pipeline = [], array $options = []): MongoDB\ChangeStream
This method has the following parameters:
.. include:: /includes/apiargs/MongoDBClient-method-watch-param.rst
The ``$options`` parameter supports the following options:
.. include:: /includes/apiargs/MongoDBClient-method-watch-option.rst
Return Values
-------------
A :phpclass:`MongoDB\\ChangeStream` object, which allows for iteration of
events in the change stream via the :php:`Iterator <class.iterator>` interface.
Errors/Exceptions
-----------------
.. include:: /includes/extracts/error-unexpectedvalueexception.rst
.. include:: /includes/extracts/error-unsupportedexception.rst
.. include:: /includes/extracts/error-invalidargumentexception.rst
.. include:: /includes/extracts/error-driver-runtimeexception.rst
Examples
--------
This example reports events while iterating a change stream.
.. code-block:: php
<?php
$uri = 'mongodb://rs1.example.com,rs2.example.com/?replicaSet=myReplicaSet';
$client = new MongoDB\Client($uri);
$changeStream = $client->watch();
for ($changeStream->rewind(); true; $changeStream->next()) {
if ( ! $changeStream->valid()) {
continue;
}
$event = $changeStream->current();
if ($event['operationType'] === 'invalidate') {
break;
}
$ns = sprintf('%s.%s', $event['ns']['db'], $event['ns']['coll']);
$id = json_encode($event['documentKey']['_id']);
switch ($event['operationType']) {
case 'delete':
printf("Deleted document in %s with _id: %s\n\n", $ns, $id);
break;
case 'insert':
printf("Inserted new document in %s\n", $ns);
echo json_encode($event['fullDocument']), "\n\n";
break;
case 'replace':
printf("Replaced new document in %s with _id: %s\n", $ns, $id);
echo json_encode($event['fullDocument']), "\n\n";
break;
case 'update':
printf("Updated document in %s with _id: %s\n", $ns, $id);
echo json_encode($event['updateDescription']), "\n\n";
break;
}
}
Assuming that a document was inserted, updated, and deleted while the above
script was iterating the change stream, the output would then resemble:
.. code-block:: none
Inserted new document in app.user
{"_id":{"$oid":"5b329b6674083047cc05e607"},"username":"bob"}
Inserted new document in app.products
{"_id":{"$oid":"5b329b6a74083047cc05e608"},"name":"Widget","quantity":5}
Inserted new document in logs.messages
{"_id":{"$oid":"5b329b7374083047cc05e609"},"msg":"bob purchased a widget"}
See Also
--------
- :manual:`Aggregation Pipeline </core/aggregation-pipeline>` documentation in
the MongoDB Manual
- :manual:`Change Streams </changeStreams>` documentation in the MongoDB manual
- :manual:`Change Events </reference/change-events/>` documentation in the
MongoDB manual
...@@ -18,7 +18,7 @@ Definition ...@@ -18,7 +18,7 @@ Definition
.. phpmethod:: MongoDB\\Collection::watch() .. phpmethod:: MongoDB\\Collection::watch()
Executes a :manual:`change stream </changeStreams>` operation on the Executes a :manual:`change stream </changeStreams>` operation on the
collection. collection. The change stream can be watched for collection-level changes.
.. code-block:: php .. code-block:: php
...@@ -68,6 +68,10 @@ This example reports events while iterating a change stream. ...@@ -68,6 +68,10 @@ This example reports events while iterating a change stream.
$event = $changeStream->current(); $event = $changeStream->current();
if ($event['operationType'] === 'invalidate') {
break;
}
$ns = sprintf('%s.%s', $event['ns']['db'], $event['ns']['coll']); $ns = sprintf('%s.%s', $event['ns']['db'], $event['ns']['coll']);
$id = json_encode($event['documentKey']['_id']); $id = json_encode($event['documentKey']['_id']);
...@@ -98,13 +102,14 @@ script was iterating the change stream, the output would then resemble: ...@@ -98,13 +102,14 @@ script was iterating the change stream, the output would then resemble:
.. code-block:: none .. code-block:: none
Inserted new document in test.inventory Inserted new document in test.user
{"_id":{"$oid":"5a81fc0d6118fd1af1790d32"},"name":"Widget","quantity":5} {"_id":{"$oid":"5b329c4874083047cc05e60a"},"username":"bob"}
Updated document in test.inventory with _id: {"$oid":"5a81fc0d6118fd1af1790d32"} Inserted new document in test.products
{"updatedFields":{"quantity":4},"removedFields":[]} {"_id":{"$oid":"5b329c4d74083047cc05e60b"},"name":"Widget","quantity":5}
Deleted document in test.inventory with _id: {"$oid":"5a81fc0d6118fd1af1790d32"} Updated document in test.user with _id: {"$oid":"5b329a4f74083047cc05e603"}
{"updatedFields":{"username":"robert"},"removedFields":[]}
See Also See Also
-------- --------
......
==========================
MongoDB\\Database::watch()
==========================
.. versionadded:: 1.4
.. default-domain:: mongodb
.. contents:: On this page
:local:
:backlinks: none
:depth: 1
:class: singlecol
Definition
----------
.. phpmethod:: MongoDB\\Database::watch()
Executes a :manual:`change stream </changeStreams>` operation on the
database. The change stream can be watched for database-level changes.
.. code-block:: php
function watch(array $pipeline = [], array $options = []): MongoDB\ChangeStream
This method has the following parameters:
.. include:: /includes/apiargs/MongoDBDatabase-method-watch-param.rst
The ``$options`` parameter supports the following options:
.. include:: /includes/apiargs/MongoDBDatabase-method-watch-option.rst
Return Values
-------------
A :phpclass:`MongoDB\\ChangeStream` object, which allows for iteration of
events in the change stream via the :php:`Iterator <class.iterator>` interface.
Errors/Exceptions
-----------------
.. include:: /includes/extracts/error-unexpectedvalueexception.rst
.. include:: /includes/extracts/error-unsupportedexception.rst
.. include:: /includes/extracts/error-invalidargumentexception.rst
.. include:: /includes/extracts/error-driver-runtimeexception.rst
Examples
--------
This example reports events while iterating a change stream.
.. code-block:: php
<?php
$uri = 'mongodb://rs1.example.com,rs2.example.com/?replicaSet=myReplicaSet';
$database = (new MongoDB\Client($uri))->test;
$changeStream = $database->watch();
for ($changeStream->rewind(); true; $changeStream->next()) {
if ( ! $changeStream->valid()) {
continue;
}
$event = $changeStream->current();
if ($event['operationType'] === 'invalidate') {
break;
}
$ns = sprintf('%s.%s', $event['ns']['db'], $event['ns']['coll']);
$id = json_encode($event['documentKey']['_id']);
switch ($event['operationType']) {
case 'delete':
printf("Deleted document in %s with _id: %s\n\n", $ns, $id);
break;
case 'insert':
printf("Inserted new document in %s\n", $ns);
echo json_encode($event['fullDocument']), "\n\n";
break;
case 'replace':
printf("Replaced new document in %s with _id: %s\n", $ns, $id);
echo json_encode($event['fullDocument']), "\n\n";
break;
case 'update':
printf("Updated document in %s with _id: %s\n", $ns, $id);
echo json_encode($event['updateDescription']), "\n\n";
break;
}
}
Assuming that a document was inserted, updated, and deleted while the above
script was iterating the change stream, the output would then resemble:
.. code-block:: none
Inserted new document in test.inventory
{"_id":{"$oid":"5a81fc0d6118fd1af1790d32"},"name":"Widget","quantity":5}
Updated document in test.inventory with _id: {"$oid":"5a81fc0d6118fd1af1790d32"}
{"updatedFields":{"quantity":4},"removedFields":[]}
Deleted document in test.inventory with _id: {"$oid":"5a81fc0d6118fd1af1790d32"}
See Also
--------
- :manual:`Aggregation Pipeline </core/aggregation-pipeline>` documentation in
the MongoDB Manual
- :manual:`Change Streams </changeStreams>` documentation in the MongoDB manual
- :manual:`Change Events </reference/change-events/>` documentation in the
MongoDB manual
...@@ -19,8 +19,9 @@ namespace MongoDB; ...@@ -19,8 +19,9 @@ namespace MongoDB;
use MongoDB\BSON\Serializable; use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor; use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\ConnectionTimeoutException; use MongoDB\Driver\Exception\ConnectionException;
use MongoDB\Driver\Exception\RuntimeException; use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Exception\ServerException;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException; use MongoDB\Exception\ResumeTokenException;
use IteratorIterator; use IteratorIterator;
...@@ -35,14 +36,22 @@ use Iterator; ...@@ -35,14 +36,22 @@ use Iterator;
*/ */
class ChangeStream implements Iterator class ChangeStream implements Iterator
{ {
/**
* @deprecated 1.4
* @todo Remove this in 2.0 (see: PHPLIB-360)
*/
const CURSOR_NOT_FOUND = 43;
private static $errorCodeCappedPositionLost = 136;
private static $errorCodeInterrupted = 11601;
private static $errorCodeCursorKilled = 237;
private $resumeToken; private $resumeToken;
private $resumeCallable; private $resumeCallable;
private $csIt; private $csIt;
private $key = 0; private $key = 0;
private $hasAdvanced = false; private $hasAdvanced = false;
const CURSOR_NOT_FOUND = 43;
/** /**
* Constructor. * Constructor.
* *
...@@ -91,7 +100,6 @@ class ChangeStream implements Iterator ...@@ -91,7 +100,6 @@ class ChangeStream implements Iterator
*/ */
public function next() public function next()
{ {
$resumable = false;
try { try {
$this->csIt->next(); $this->csIt->next();
if ($this->valid()) { if ($this->valid()) {
...@@ -111,20 +119,11 @@ class ChangeStream implements Iterator ...@@ -111,20 +119,11 @@ class ChangeStream implements Iterator
$this->resumeCallable = null; $this->resumeCallable = null;
} }
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) { if ($this->isResumableError($e)) {
$resumable = true;
}
if ($e->getCode() === self::CURSOR_NOT_FOUND) {
$resumable = true;
}
if ($e instanceof ConnectionTimeoutException) {
$resumable = true;
}
}
if ($resumable) {
$this->resume(); $this->resume();
} }
} }
}
/** /**
* @see http://php.net/iterator.rewind * @see http://php.net/iterator.rewind
...@@ -132,7 +131,6 @@ class ChangeStream implements Iterator ...@@ -132,7 +131,6 @@ class ChangeStream implements Iterator
*/ */
public function rewind() public function rewind()
{ {
$resumable = false;
try { try {
$this->csIt->rewind(); $this->csIt->rewind();
if ($this->valid()) { if ($this->valid()) {
...@@ -144,20 +142,11 @@ class ChangeStream implements Iterator ...@@ -144,20 +142,11 @@ class ChangeStream implements Iterator
$this->resumeCallable = null; $this->resumeCallable = null;
} }
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) { if ($this->isResumableError($e)) {
$resumable = true;
}
if ($e->getCode() === self::CURSOR_NOT_FOUND) {
$resumable = true;
}
if ($e instanceof ConnectionTimeoutException) {
$resumable = true;
}
}
if ($resumable) {
$this->resume(); $this->resume();
} }
} }
}
/** /**
* @see http://php.net/iterator.valid * @see http://php.net/iterator.valid
...@@ -201,6 +190,30 @@ class ChangeStream implements Iterator ...@@ -201,6 +190,30 @@ class ChangeStream implements Iterator
return $resumeToken; return $resumeToken;
} }
/**
* Determines if an exception is a resumable error.
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error
* @param RuntimeException $exception
* @return boolean
*/
private function isResumableError(RuntimeException $exception)
{
if ($exception instanceof ConnectionException) {
return true;
}
if ( ! $exception instanceof ServerException) {
return false;
}
if (in_array($exception->getCode(), [self::$errorCodeCappedPositionLost, self::$errorCodeCursorKilled, self::$errorCodeInterrupted])) {
return false;
}
return true;
}
/** /**
* Creates a new changeStream after a resumable server error. * Creates a new changeStream after a resumable server error.
* *
......
...@@ -29,6 +29,7 @@ use MongoDB\Exception\UnsupportedException; ...@@ -29,6 +29,7 @@ use MongoDB\Exception\UnsupportedException;
use MongoDB\Model\DatabaseInfoIterator; use MongoDB\Model\DatabaseInfoIterator;
use MongoDB\Operation\DropDatabase; use MongoDB\Operation\DropDatabase;
use MongoDB\Operation\ListDatabases; use MongoDB\Operation\ListDatabases;
use MongoDB\Operation\Watch;
class Client class Client
{ {
...@@ -37,9 +38,12 @@ class Client ...@@ -37,9 +38,12 @@ class Client
'document' => 'MongoDB\Model\BSONDocument', 'document' => 'MongoDB\Model\BSONDocument',
'root' => 'MongoDB\Model\BSONDocument', 'root' => 'MongoDB\Model\BSONDocument',
]; ];
private static $wireVersionForReadConcern = 4;
private static $wireVersionForWritableCommandWriteConcern = 5; private static $wireVersionForWritableCommandWriteConcern = 5;
private $manager; private $manager;
private $readConcern;
private $readPreference;
private $uri; private $uri;
private $typeMap; private $typeMap;
private $writeConcern; private $writeConcern;
...@@ -81,6 +85,8 @@ class Client ...@@ -81,6 +85,8 @@ class Client
unset($driverOptions['typeMap']); unset($driverOptions['typeMap']);
$this->manager = new Manager($uri, $uriOptions, $driverOptions); $this->manager = new Manager($uri, $uriOptions, $driverOptions);
$this->readConcern = $this->manager->getReadConcern();
$this->readPreference = $this->manager->getReadPreference();
$this->writeConcern = $this->manager->getWriteConcern(); $this->writeConcern = $this->manager->getWriteConcern();
} }
...@@ -173,7 +179,7 @@ class Client ...@@ -173,7 +179,7 @@ class Client
*/ */
public function getReadConcern() public function getReadConcern()
{ {
return $this->manager->getReadConcern(); return $this->readConcern;
} }
/** /**
...@@ -183,7 +189,7 @@ class Client ...@@ -183,7 +189,7 @@ class Client
*/ */
public function getReadPreference() public function getReadPreference()
{ {
return $this->manager->getReadPreference(); return $this->readPreference;
} }
/** /**
...@@ -268,4 +274,34 @@ class Client ...@@ -268,4 +274,34 @@ class Client
{ {
return $this->manager->startSession($options); return $this->manager->startSession($options);
} }
/**
* Create a change stream for watching changes to the cluster.
*
* @see Watch::__construct() for supported options
* @param array $pipeline List of pipeline operations
* @param array $options Command options
* @return ChangeStream
* @throws InvalidArgumentException for parameter/option parsing errors
*/
public function watch(array $pipeline = [], array $options = [])
{
if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference;
}
$server = $this->manager->selectServer($options['readPreference']);
if ( ! isset($options['readConcern']) && \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern)) {
$options['readConcern'] = $this->readConcern;
}
if ( ! isset($options['typeMap'])) {
$options['typeMap'] = $this->typeMap;
}
$operation = new Watch($this->manager, null, null, $pipeline, $options);
return $operation->execute($server);
}
} }
...@@ -34,6 +34,7 @@ use MongoDB\Operation\DropCollection; ...@@ -34,6 +34,7 @@ use MongoDB\Operation\DropCollection;
use MongoDB\Operation\DropDatabase; use MongoDB\Operation\DropDatabase;
use MongoDB\Operation\ListCollections; use MongoDB\Operation\ListCollections;
use MongoDB\Operation\ModifyCollection; use MongoDB\Operation\ModifyCollection;
use MongoDB\Operation\Watch;
class Database class Database
{ {
...@@ -42,6 +43,7 @@ class Database ...@@ -42,6 +43,7 @@ class Database
'document' => 'MongoDB\Model\BSONDocument', 'document' => 'MongoDB\Model\BSONDocument',
'root' => 'MongoDB\Model\BSONDocument', 'root' => 'MongoDB\Model\BSONDocument',
]; ];
private static $wireVersionForReadConcern = 4;
private static $wireVersionForWritableCommandWriteConcern = 5; private static $wireVersionForWritableCommandWriteConcern = 5;
private $databaseName; private $databaseName;
...@@ -409,6 +411,36 @@ class Database ...@@ -409,6 +411,36 @@ class Database
return new Bucket($this->manager, $this->databaseName, $options); return new Bucket($this->manager, $this->databaseName, $options);
} }
/**
* Create a change stream for watching changes to the database.
*
* @see Watch::__construct() for supported options
* @param array $pipeline List of pipeline operations
* @param array $options Command options
* @return ChangeStream
* @throws InvalidArgumentException for parameter/option parsing errors
*/
public function watch(array $pipeline = [], array $options = [])
{
if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference;
}
$server = $this->manager->selectServer($options['readPreference']);
if ( ! isset($options['readConcern']) && \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern)) {
$options['readConcern'] = $this->readConcern;
}
if ( ! isset($options['typeMap'])) {
$options['typeMap'] = $this->typeMap;
}
$operation = new Watch($this->manager, $this->databaseName, null, $pipeline, $options);
return $operation->execute($server);
}
/** /**
* Get a clone of this database with different options. * Get a clone of this database with different options.
* *
......
...@@ -116,8 +116,11 @@ class Aggregate implements Executable ...@@ -116,8 +116,11 @@ class Aggregate implements Executable
* This is not supported for server versions < 3.4 and will result in an * This is not supported for server versions < 3.4 and will result in an
* exception at execution time if used. * exception at execution time if used.
* *
* Note: Collection-agnostic commands (e.g. $currentOp) may be executed by
* specifying null for the collection name.
*
* @param string $databaseName Database name * @param string $databaseName Database name
* @param string $collectionName Collection name * @param string|null $collectionName Collection name
* @param array $pipeline List of pipeline operations * @param array $pipeline List of pipeline operations
* @param array $options Command options * @param array $options Command options
* @throws InvalidArgumentException for parameter/option parsing errors * @throws InvalidArgumentException for parameter/option parsing errors
...@@ -220,7 +223,7 @@ class Aggregate implements Executable ...@@ -220,7 +223,7 @@ class Aggregate implements Executable
} }
$this->databaseName = (string) $databaseName; $this->databaseName = (string) $databaseName;
$this->collectionName = (string) $collectionName; $this->collectionName = isset($collectionName) ? (string) $collectionName : null;
$this->pipeline = $pipeline; $this->pipeline = $pipeline;
$this->options = $options; $this->options = $options;
} }
...@@ -289,7 +292,7 @@ class Aggregate implements Executable ...@@ -289,7 +292,7 @@ class Aggregate implements Executable
private function createCommand(Server $server) private function createCommand(Server $server)
{ {
$cmd = [ $cmd = [
'aggregate' => $this->collectionName, 'aggregate' => isset($this->collectionName) ? $this->collectionName : 1,
'pipeline' => $this->pipeline, 'pipeline' => $this->pipeline,
]; ];
$cmdOptions = []; $cmdOptions = [];
......
...@@ -18,13 +18,19 @@ ...@@ -18,13 +18,19 @@
namespace MongoDB\Operation; namespace MongoDB\Operation;
use MongoDB\ChangeStream; use MongoDB\ChangeStream;
use MongoDB\BSON\TimestampInterface;
use MongoDB\Driver\Command; use MongoDB\Driver\Command;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Manager; use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadConcern; use MongoDB\Driver\ReadConcern;
use MongoDB\Driver\ReadPreference; use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
use MongoDB\Driver\Session; use MongoDB\Driver\Session;
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException; use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\UnexpectedValueException; use MongoDB\Exception\UnexpectedValueException;
use MongoDB\Exception\UnsupportedException; use MongoDB\Exception\UnsupportedException;
...@@ -32,20 +38,27 @@ use MongoDB\Exception\UnsupportedException; ...@@ -32,20 +38,27 @@ use MongoDB\Exception\UnsupportedException;
/** /**
* Operation for creating a change stream with the aggregate command. * Operation for creating a change stream with the aggregate command.
* *
* Note: the implementation of CommandSubscriber is an internal implementation
* detail and should not be considered part of the public API.
*
* @api * @api
* @see \MongoDB\Collection::watch() * @see \MongoDB\Collection::watch()
* @see https://docs.mongodb.com/manual/changeStreams/ * @see https://docs.mongodb.com/manual/changeStreams/
*/ */
class Watch implements Executable class Watch implements Executable, /* @internal */ CommandSubscriber
{ {
private static $wireVersionForOperationTime = 7;
const FULL_DOCUMENT_DEFAULT = 'default'; const FULL_DOCUMENT_DEFAULT = 'default';
const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup'; const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
private $aggregate; private $aggregate;
private $databaseName; private $aggregateOptions;
private $changeStreamOptions;
private $collectionName; private $collectionName;
private $databaseName;
private $operationTime;
private $pipeline; private $pipeline;
private $options;
private $resumeCallable; private $resumeCallable;
/** /**
...@@ -79,22 +92,44 @@ class Watch implements Executable ...@@ -79,22 +92,44 @@ class Watch implements Executable
* * resumeAfter (document): Specifies the logical starting point for the * * resumeAfter (document): Specifies the logical starting point for the
* new change stream. * new change stream.
* *
* Using this option in conjunction with "startAtOperationTime" will
* result in a server error. The options are mutually exclusive.
*
* * session (MongoDB\Driver\Session): Client session. * * session (MongoDB\Driver\Session): Client session.
* *
* Sessions are not supported for server versions < 3.6. * Sessions are not supported for server versions < 3.6.
* *
* * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
* the change stream will only provide changes that occurred at or after
* the specified timestamp. Any command run against the server will
* return an operation time that can be used here. Alternatively, an
* operation time may be obtained from MongoDB\Driver\Server::getInfo().
*
* Using this option in conjunction with "resumeAfter" will result in a
* server error. The options are mutually exclusive.
*
* This option is not supported for server versions < 4.0.
*
* * typeMap (array): Type map for BSON deserialization. This will be * * typeMap (array): Type map for BSON deserialization. This will be
* applied to the returned Cursor (it is not sent to the server). * applied to the returned Cursor (it is not sent to the server).
* *
* @param string $databaseName Database name * Note: A database-level change stream may be created by specifying null
* @param string $collectionName Collection name * for the collection name. A cluster-level change stream may be created by
* specifying null for both the database and collection name.
*
* @param Manager $manager Manager instance from the driver
* @param string|null $databaseName Database name
* @param string|null $collectionName Collection name
* @param array $pipeline List of pipeline operations * @param array $pipeline List of pipeline operations
* @param array $options Command options * @param array $options Command options
* @param Manager $manager Manager instance from the driver
* @throws InvalidArgumentException for parameter/option parsing errors * @throws InvalidArgumentException for parameter/option parsing errors
*/ */
public function __construct(Manager $manager, $databaseName, $collectionName, array $pipeline, array $options = []) public function __construct(Manager $manager, $databaseName, $collectionName, array $pipeline, array $options = [])
{ {
if (isset($collectionName) && ! isset($databaseName)) {
throw new InvalidArgumentException('$collectionName should also be null if $databaseName is null');
}
$options += [ $options += [
'fullDocument' => self::FULL_DOCUMENT_DEFAULT, 'fullDocument' => self::FULL_DOCUMENT_DEFAULT,
'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY), 'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY),
...@@ -104,27 +139,67 @@ class Watch implements Executable ...@@ -104,27 +139,67 @@ class Watch implements Executable
throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string'); throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string');
} }
if (isset($options['resumeAfter'])) { if (isset($options['resumeAfter']) && ! is_array($options['resumeAfter']) && ! is_object($options['resumeAfter'])) {
if ( ! is_array($options['resumeAfter']) && ! is_object($options['resumeAfter'])) {
throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object'); throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object');
} }
if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) {
throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class);
} }
/* In the absence of an explicit session, create one to ensure that the
* initial aggregation and any resume attempts can use the same session
* ("implicit from the user's perspective" per PHPLIB-342). */
if ( ! isset($options['session'])) { if ( ! isset($options['session'])) {
try { try {
$options['session'] = $manager->startSession(); $options['session'] = $manager->startSession();
} catch (DriverRuntimeException $e) {} } catch (RuntimeException $e) {
/* We can ignore the exception, as libmongoc likely cannot
* create its own session and there is no risk of a mismatch. */
}
}
$this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAtOperationTime' => 1]);
// Null database name implies a cluster-wide change stream
if ($databaseName === null) {
$databaseName = 'admin';
$this->changeStreamOptions['allChangesForCluster'] = true;
} }
$this->databaseName = (string) $databaseName; $this->databaseName = (string) $databaseName;
$this->collectionName = (string) $collectionName; $this->collectionName = isset($collectionName) ? (string) $collectionName : null;
$this->pipeline = $pipeline; $this->pipeline = $pipeline;
$this->options = $options;
$this->aggregate = $this->createAggregate(); $this->aggregate = $this->createAggregate();
$this->resumeCallable = $this->createResumeCallable($manager); $this->resumeCallable = $this->createResumeCallable($manager);
} }
/** @internal */
final public function commandFailed(CommandFailedEvent $event)
{
}
/** @internal */
final public function commandStarted(CommandStartedEvent $event)
{
}
/** @internal */
final public function commandSucceeded(CommandSucceededEvent $event)
{
if ($event->getCommandName() !== 'aggregate') {
return;
}
$reply = $event->getReply();
if (isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
$this->operationTime = $reply->operationTime;
}
}
/** /**
* Execute the operation. * Execute the operation.
* *
...@@ -132,51 +207,78 @@ class Watch implements Executable ...@@ -132,51 +207,78 @@ class Watch implements Executable
* @param Server $server * @param Server $server
* @return ChangeStream * @return ChangeStream
* @throws UnsupportedException if collation or read concern is used and unsupported * @throws UnsupportedException if collation or read concern is used and unsupported
* @throws DriverRuntimeException for other driver errors (e.g. connection errors) * @throws RuntimeException for other driver errors (e.g. connection errors)
*/ */
public function execute(Server $server) public function execute(Server $server)
{ {
$cursor = $this->aggregate->execute($server); return new ChangeStream($this->executeAggregate($server), $this->resumeCallable);
return new ChangeStream($cursor, $this->resumeCallable);
} }
/** /**
* Create the aggregate command for creating a change stream. * Create the aggregate command for creating a change stream.
* *
* This method is also used to recreate the aggregate command if a new * This method is also used to recreate the aggregate command when resuming.
* resume token is provided while resuming.
* *
* @return Aggregate * @return Aggregate
*/ */
private function createAggregate() private function createAggregate()
{ {
$changeStreamOptions = array_intersect_key($this->options, ['fullDocument' => 1, 'resumeAfter' => 1]);
$changeStream = ['$changeStream' => (object) $changeStreamOptions];
$pipeline = $this->pipeline; $pipeline = $this->pipeline;
array_unshift($pipeline, $changeStream); array_unshift($pipeline, ['$changeStream' => (object) $this->changeStreamOptions]);
$aggregateOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]); return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions);
return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $aggregateOptions);
} }
private function createResumeCallable(Manager $manager) private function createResumeCallable(Manager $manager)
{ {
return function($resumeToken = null) use ($manager) { return function($resumeToken = null) use ($manager) {
/* If a resume token was provided, recreate the Aggregate operation /* If a resume token was provided, update the "resumeAfter" option
* using the new resume token. */ * and ensure that "startAtOperationTime" is no longer set. */
if ($resumeToken !== null) { if ($resumeToken !== null) {
$this->options['resumeAfter'] = $resumeToken; $this->changeStreamOptions['resumeAfter'] = $resumeToken;
$this->aggregate = $this->createAggregate(); unset($this->changeStreamOptions['startAtOperationTime']);
}
/* If we captured an operation time from the first aggregate command
* and there is no "resumeAfter" option, set "startAtOperationTime"
* so that we can resume from the original aggregate's time. */
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter'])) {
$this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
} }
$this->aggregate = $this->createAggregate();
/* Select a new server using the read preference, execute this /* Select a new server using the read preference, execute this
* operation on it, and return the new ChangeStream. */ * operation on it, and return the new ChangeStream. */
$server = $manager->selectServer($this->options['readPreference']); $server = $manager->selectServer($this->aggregateOptions['readPreference']);
return $this->execute($server); return $this->execute($server);
}; };
} }
/**
* Execute the aggregate command and optionally capture its operation time.
*
* @param Server $server
* @return Cursor
*/
private function executeAggregate(Server $server)
{
/* If we've already captured an operation time or the server does not
* support returning an operation time (e.g. MongoDB 3.6), execute the
* aggregation directly and return its cursor. */
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForOperationTime)) {
return $this->aggregate->execute($server);
}
/* Otherwise, execute the aggregation using command monitoring so that
* we can capture its operation time with commandSucceeded(). */
\MongoDB\Driver\Monitoring\addSubscriber($this);
try {
return $this->aggregate->execute($server);
} finally {
\MongoDB\Driver\Monitoring\removeSubscriber($this);
}
}
} }
...@@ -123,7 +123,8 @@ class CollectionFunctionalTest extends FunctionalTestCase ...@@ -123,7 +123,8 @@ class CollectionFunctionalTest extends FunctionalTestCase
] ]
); );
}, },
function(stdClass $command) { function(array $event) {
$command = $event['started']->getCommand();
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $command);
$this->assertObjectHasAttribute('maxTimeMS', $command); $this->assertObjectHasAttribute('maxTimeMS', $command);
$this->assertObjectHasAttribute('writeConcern', $command); $this->assertObjectHasAttribute('writeConcern', $command);
......
...@@ -38,14 +38,16 @@ class CommandObserver implements CommandSubscriber ...@@ -38,14 +38,16 @@ class CommandObserver implements CommandSubscriber
public function commandStarted(CommandStartedEvent $event) public function commandStarted(CommandStartedEvent $event)
{ {
$this->commands[] = $event->getCommand(); $this->commands[$event->getRequestId()]['started'] = $event;
} }
public function commandSucceeded(CommandSucceededEvent $event) public function commandSucceeded(CommandSucceededEvent $event)
{ {
$this->commands[$event->getRequestId()]['succeeded'] = $event;
} }
public function commandFailed(CommandFailedEvent $event) public function commandFailed(CommandFailedEvent $event)
{ {
$this->commands[$event->getRequestId()]['failed'] = $event;
} }
} }
...@@ -979,7 +979,7 @@ class DocumentationExamplesTest extends FunctionalTestCase ...@@ -979,7 +979,7 @@ class DocumentationExamplesTest extends FunctionalTestCase
'documentKey' => ['_id' => 1], 'documentKey' => ['_id' => 1],
]; ];
$this->assertSameDocument($expectedChange, $lastChange); $this->assertMatchesDocument($expectedChange, $lastChange);
// Start Changestream Example 3 // Start Changestream Example 3
$resumeToken = ($lastChange !== null) ? $lastChange->_id : null; $resumeToken = ($lastChange !== null) ? $lastChange->_id : null;
...@@ -1002,7 +1002,7 @@ class DocumentationExamplesTest extends FunctionalTestCase ...@@ -1002,7 +1002,7 @@ class DocumentationExamplesTest extends FunctionalTestCase
'documentKey' => ['_id' => 2], 'documentKey' => ['_id' => 2],
]; ];
$this->assertSameDocument($expectedChange, $nextChange); $this->assertMatchesDocument($expectedChange, $nextChange);
// Start Changestream Example 4 // Start Changestream Example 4
$pipeline = [['$match' => ['$or' => [['fullDocument.username' => 'alice'], ['operationType' => 'delete']]]]]; $pipeline = [['$match' => ['$or' => [['fullDocument.username' => 'alice'], ['operationType' => 'delete']]]]];
......
...@@ -221,8 +221,8 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase ...@@ -221,8 +221,8 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase
$stream->seek($offset); $stream->seek($offset);
$this->assertSame($expectedBytes, $stream->readBytes($length)); $this->assertSame($expectedBytes, $stream->readBytes($length));
}, },
function(stdClass $command) use (&$commands) { function(array $event) use (&$commands) {
$commands[] = key((array) $command); $commands[] = $event['started']->getCommandName();
} }
); );
...@@ -257,8 +257,8 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase ...@@ -257,8 +257,8 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase
$stream->seek($offset); $stream->seek($offset);
$this->assertSame($expectedBytes, $stream->readBytes($length)); $this->assertSame($expectedBytes, $stream->readBytes($length));
}, },
function(stdClass $command) use (&$commands) { function(array $event) use (&$commands) {
$commands[] = key((array) $command); $commands[] = $event['started']->getCommandName();
} }
); );
...@@ -291,8 +291,8 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase ...@@ -291,8 +291,8 @@ class ReadableStreamFunctionalTest extends FunctionalTestCase
$stream->seek($offset); $stream->seek($offset);
$this->assertSame($expectedBytes, $stream->readBytes($length)); $this->assertSame($expectedBytes, $stream->readBytes($length));
}, },
function(stdClass $command) use (&$commands) { function(array $event) use (&$commands) {
$commands[] = key((array) $command); $commands[] = $event['started']->getCommandName();
} }
); );
......
...@@ -12,6 +12,28 @@ use stdClass; ...@@ -12,6 +12,28 @@ use stdClass;
class AggregateFunctionalTest extends FunctionalTestCase class AggregateFunctionalTest extends FunctionalTestCase
{ {
public function testCurrentOpCommand()
{
if (version_compare($this->getServerVersion(), '3.6.0', '<')) {
$this->markTestSkipped('$currentOp is not supported');
}
(new CommandObserver)->observe(
function() {
$operation = new Aggregate(
'admin',
null,
[['$currentOp' => (object) []]]
);
$operation->execute($this->getPrimaryServer());
},
function(array $event) {
$this->assertSame(1, $event['started']->getCommand()->aggregate);
}
);
}
public function testDefaultReadConcernIsOmitted() public function testDefaultReadConcernIsOmitted()
{ {
(new CommandObserver)->observe( (new CommandObserver)->observe(
...@@ -25,8 +47,8 @@ class AggregateFunctionalTest extends FunctionalTestCase ...@@ -25,8 +47,8 @@ class AggregateFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('readConcern', $command); $this->assertObjectNotHasAttribute('readConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -44,8 +66,8 @@ class AggregateFunctionalTest extends FunctionalTestCase ...@@ -44,8 +66,8 @@ class AggregateFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('writeConcern', $command); $this->assertObjectNotHasAttribute('writeConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -90,8 +112,8 @@ class AggregateFunctionalTest extends FunctionalTestCase ...@@ -90,8 +112,8 @@ class AggregateFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
...@@ -163,8 +185,8 @@ class AggregateFunctionalTest extends FunctionalTestCase ...@@ -163,8 +185,8 @@ class AggregateFunctionalTest extends FunctionalTestCase
$this->assertCount(1, $results); $this->assertCount(1, $results);
$this->assertObjectHasAttribute('stages', current($results)); $this->assertObjectHasAttribute('stages', current($results));
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('writeConcern', $command); $this->assertObjectNotHasAttribute('writeConcern', $event['started']->getCommand());
} }
); );
......
...@@ -299,8 +299,8 @@ class BulkWriteFunctionalTest extends FunctionalTestCase ...@@ -299,8 +299,8 @@ class BulkWriteFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -23,8 +23,8 @@ class CountFunctionalTest extends FunctionalTestCase ...@@ -23,8 +23,8 @@ class CountFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('readConcern', $command); $this->assertObjectNotHasAttribute('readConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -87,8 +87,8 @@ class CountFunctionalTest extends FunctionalTestCase ...@@ -87,8 +87,8 @@ class CountFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -20,8 +20,8 @@ class CreateCollectionFunctionalTest extends FunctionalTestCase ...@@ -20,8 +20,8 @@ class CreateCollectionFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('writeConcern', $command); $this->assertObjectNotHasAttribute('writeConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -42,8 +42,8 @@ class CreateCollectionFunctionalTest extends FunctionalTestCase ...@@ -42,8 +42,8 @@ class CreateCollectionFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -140,8 +140,8 @@ class CreateIndexesFunctionalTest extends FunctionalTestCase ...@@ -140,8 +140,8 @@ class CreateIndexesFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('writeConcern', $command); $this->assertObjectNotHasAttribute('writeConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -163,8 +163,8 @@ class CreateIndexesFunctionalTest extends FunctionalTestCase ...@@ -163,8 +163,8 @@ class CreateIndexesFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -24,8 +24,8 @@ class DatabaseCommandFunctionalTest extends FunctionalTestCase ...@@ -24,8 +24,8 @@ class DatabaseCommandFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -79,8 +79,8 @@ class DeleteFunctionalTest extends FunctionalTestCase ...@@ -79,8 +79,8 @@ class DeleteFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -22,8 +22,8 @@ class DistinctFunctionalTest extends FunctionalTestCase ...@@ -22,8 +22,8 @@ class DistinctFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('readConcern', $command); $this->assertObjectNotHasAttribute('readConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -46,8 +46,8 @@ class DistinctFunctionalTest extends FunctionalTestCase ...@@ -46,8 +46,8 @@ class DistinctFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -22,8 +22,8 @@ class DropCollectionFunctionalTest extends FunctionalTestCase ...@@ -22,8 +22,8 @@ class DropCollectionFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('writeConcern', $command); $this->assertObjectNotHasAttribute('writeConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -69,8 +69,8 @@ class DropCollectionFunctionalTest extends FunctionalTestCase ...@@ -69,8 +69,8 @@ class DropCollectionFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -22,8 +22,8 @@ class DropDatabaseFunctionalTest extends FunctionalTestCase ...@@ -22,8 +22,8 @@ class DropDatabaseFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('writeConcern', $command); $this->assertObjectNotHasAttribute('writeConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -73,8 +73,8 @@ class DropDatabaseFunctionalTest extends FunctionalTestCase ...@@ -73,8 +73,8 @@ class DropDatabaseFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -28,8 +28,8 @@ class DropIndexesFunctionalTest extends FunctionalTestCase ...@@ -28,8 +28,8 @@ class DropIndexesFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('writeConcern', $command); $this->assertObjectNotHasAttribute('writeConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -137,8 +137,8 @@ class DropIndexesFunctionalTest extends FunctionalTestCase ...@@ -137,8 +137,8 @@ class DropIndexesFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -182,7 +182,8 @@ class ExplainFunctionalTest extends FunctionalTestCase ...@@ -182,7 +182,8 @@ class ExplainFunctionalTest extends FunctionalTestCase
$explainOperation = new Explain($this->getDatabaseName(), $operation, ['typeMap' => ['root' => 'array', 'document' => 'array']]); $explainOperation = new Explain($this->getDatabaseName(), $operation, ['typeMap' => ['root' => 'array', 'document' => 'array']]);
$explainOperation->execute($this->getPrimaryServer()); $explainOperation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$command = $event['started']->getCommand();
$this->assertObjectNotHasAttribute('maxAwaitTimeMS', $command->explain); $this->assertObjectNotHasAttribute('maxAwaitTimeMS', $command->explain);
$this->assertObjectHasAttribute('tailable', $command->explain); $this->assertObjectHasAttribute('tailable', $command->explain);
$this->assertObjectHasAttribute('awaitData', $command->explain); $this->assertObjectHasAttribute('awaitData', $command->explain);
...@@ -206,7 +207,8 @@ class ExplainFunctionalTest extends FunctionalTestCase ...@@ -206,7 +207,8 @@ class ExplainFunctionalTest extends FunctionalTestCase
$explainOperation = new Explain($this->getDatabaseName(), $operation, ['typeMap' => ['root' => 'array', 'document' => 'array']]); $explainOperation = new Explain($this->getDatabaseName(), $operation, ['typeMap' => ['root' => 'array', 'document' => 'array']]);
$explainOperation->execute($this->getPrimaryServer()); $explainOperation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$command = $event['started']->getCommand();
$this->assertObjectHasAttribute('sort', $command->explain); $this->assertObjectHasAttribute('sort', $command->explain);
$this->assertObjectNotHasAttribute('modifiers', $command->explain); $this->assertObjectNotHasAttribute('modifiers', $command->explain);
} }
......
...@@ -30,8 +30,8 @@ class FindAndModifyFunctionalTest extends FunctionalTestCase ...@@ -30,8 +30,8 @@ class FindAndModifyFunctionalTest extends FunctionalTestCase
$operation->execute($server); $operation->execute($server);
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('readConcern', $command); $this->assertObjectNotHasAttribute('readConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -48,8 +48,8 @@ class FindAndModifyFunctionalTest extends FunctionalTestCase ...@@ -48,8 +48,8 @@ class FindAndModifyFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('writeConcern', $command); $this->assertObjectNotHasAttribute('writeConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -70,8 +70,8 @@ class FindAndModifyFunctionalTest extends FunctionalTestCase ...@@ -70,8 +70,8 @@ class FindAndModifyFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -25,8 +25,8 @@ class FindFunctionalTest extends FunctionalTestCase ...@@ -25,8 +25,8 @@ class FindFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('readConcern', $command); $this->assertObjectNotHasAttribute('readConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -99,8 +99,8 @@ class FindFunctionalTest extends FunctionalTestCase ...@@ -99,8 +99,8 @@ class FindFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -70,8 +70,8 @@ class InsertManyFunctionalTest extends FunctionalTestCase ...@@ -70,8 +70,8 @@ class InsertManyFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -85,8 +85,8 @@ class InsertOneFunctionalTest extends FunctionalTestCase ...@@ -85,8 +85,8 @@ class InsertOneFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -86,8 +86,8 @@ class ListCollectionsFunctionalTest extends FunctionalTestCase ...@@ -86,8 +86,8 @@ class ListCollectionsFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -66,8 +66,8 @@ class ListDatabasesFunctionalTest extends FunctionalTestCase ...@@ -66,8 +66,8 @@ class ListDatabasesFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -59,8 +59,8 @@ class ListIndexesFunctionalTest extends FunctionalTestCase ...@@ -59,8 +59,8 @@ class ListIndexesFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -31,8 +31,8 @@ class MapReduceFunctionalTest extends FunctionalTestCase ...@@ -31,8 +31,8 @@ class MapReduceFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('readConcern', $command); $this->assertObjectNotHasAttribute('readConcern', $event['started']->getCommand());
} }
); );
} }
...@@ -55,8 +55,8 @@ class MapReduceFunctionalTest extends FunctionalTestCase ...@@ -55,8 +55,8 @@ class MapReduceFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectNotHasAttribute('writeConcern', $command); $this->assertObjectNotHasAttribute('writeConcern', $event['started']->getCommand());
} }
); );
...@@ -150,8 +150,8 @@ class MapReduceFunctionalTest extends FunctionalTestCase ...@@ -150,8 +150,8 @@ class MapReduceFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -40,8 +40,8 @@ class UpdateFunctionalTest extends FunctionalTestCase ...@@ -40,8 +40,8 @@ class UpdateFunctionalTest extends FunctionalTestCase
$operation->execute($this->getPrimaryServer()); $operation->execute($this->getPrimaryServer());
}, },
function(stdClass $command) { function(array $event) {
$this->assertObjectHasAttribute('lsid', $command); $this->assertObjectHasAttribute('lsid', $event['started']->getCommand());
} }
); );
} }
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
namespace MongoDB\Tests\Operation; namespace MongoDB\Tests\Operation;
use MongoDB\ChangeStream; use MongoDB\ChangeStream;
use MongoDB\BSON\TimestampInterface;
use MongoDB\Driver\Manager; use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference; use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
...@@ -57,7 +58,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -57,7 +58,7 @@ class WatchFunctionalTest extends FunctionalTestCase
'documentKey' => ['_id' => 2], 'documentKey' => ['_id' => 2],
]; ];
$this->assertSameDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
$this->killChangeStreamCursor($changeStream); $this->killChangeStreamCursor($changeStream);
...@@ -74,7 +75,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -74,7 +75,7 @@ class WatchFunctionalTest extends FunctionalTestCase
'documentKey' => ['_id' => 3] 'documentKey' => ['_id' => 3]
]; ];
$this->assertSameDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
} }
public function testNextResumesAfterConnectionException() public function testNextResumesAfterConnectionException()
...@@ -98,8 +99,8 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -98,8 +99,8 @@ class WatchFunctionalTest extends FunctionalTestCase
function() use ($changeStream) { function() use ($changeStream) {
$changeStream->next(); $changeStream->next();
}, },
function(stdClass $command) use (&$commands) { function(array $event) use (&$commands) {
$commands[] = key((array) $command); $commands[] = $event['started']->getCommandName();
} }
); );
$this->fail('ConnectionTimeoutException was not thrown'); $this->fail('ConnectionTimeoutException was not thrown');
...@@ -130,6 +131,100 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -130,6 +131,100 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertSame($expectedCommands, $commands); $this->assertSame($expectedCommands, $commands);
} }
public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$operationTime = null;
$events = [];
(new CommandObserver)->observe(
function() use ($operation, &$changeStream) {
$changeStream = $operation->execute($this->getPrimaryServer());
},
function (array $event) use (&$events) {
$events[] = $event;
}
);
$this->assertCount(1, $events);
$this->assertSame('aggregate', $events[0]['started']->getCommandName());
$operationTime = $events[0]['succeeded']->getReply()->operationTime;
$this->assertInstanceOf(TimestampInterface::class, $operationTime);
$this->assertNull($changeStream->current());
$this->killChangeStreamCursor($changeStream);
$events = [];
(new CommandObserver)->observe(
function() use ($changeStream) {
$changeStream->rewind();
},
function (array $event) use (&$events) {
$events[] = $event;
}
);
$this->assertCount(4, $events);
$this->assertSame('getMore', $events[0]['started']->getCommandName());
$this->arrayHasKey('failed', $events[0]);
$this->assertSame('aggregate', $events[1]['started']->getCommandName());
$this->assertStartAtOperationTime($operationTime, $events[1]['started']->getCommand());
$this->arrayHasKey('succeeded', $events[1]);
// Original cursor is freed immediately after the change stream resumes
$this->assertSame('killCursors', $events[2]['started']->getCommandName());
$this->arrayHasKey('succeeded', $events[2]);
$this->assertSame('getMore', $events[3]['started']->getCommandName());
$this->arrayHasKey('succeeded', $events[3]);
$this->assertNull($changeStream->current());
$this->killChangeStreamCursor($changeStream);
$events = [];
(new CommandObserver)->observe(
function() use ($changeStream) {
$changeStream->next();
},
function (array $event) use (&$events) {
$events[] = $event;
}
);
$this->assertCount(4, $events);
$this->assertSame('getMore', $events[0]['started']->getCommandName());
$this->arrayHasKey('failed', $events[0]);
$this->assertSame('aggregate', $events[1]['started']->getCommandName());
$this->assertStartAtOperationTime($operationTime, $events[1]['started']->getCommand());
$this->arrayHasKey('succeeded', $events[1]);
// Original cursor is freed immediately after the change stream resumes
$this->assertSame('killCursors', $events[2]['started']->getCommandName());
$this->arrayHasKey('succeeded', $events[2]);
$this->assertSame('getMore', $events[3]['started']->getCommandName());
$this->arrayHasKey('succeeded', $events[3]);
$this->assertNull($changeStream->current());
}
private function assertStartAtOperationTime(TimestampInterface $expectedOperationTime, stdClass $command)
{
$this->assertObjectHasAttribute('pipeline', $command);
$this->assertInternalType('array', $command->pipeline);
$this->assertArrayHasKey(0, $command->pipeline);
$this->assertObjectHasAttribute('$changeStream', $command->pipeline[0]);
$this->assertObjectHasAttribute('startAtOperationTime', $command->pipeline[0]->{'$changeStream'});
$this->assertEquals($expectedOperationTime, $command->pipeline[0]->{'$changeStream'}->startAtOperationTime);
}
public function testRewindResumesAfterConnectionException() public function testRewindResumesAfterConnectionException()
{ {
/* In order to trigger a dropped connection, we'll use a new client with /* In order to trigger a dropped connection, we'll use a new client with
...@@ -148,8 +243,8 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -148,8 +243,8 @@ class WatchFunctionalTest extends FunctionalTestCase
function() use ($changeStream) { function() use ($changeStream) {
$changeStream->rewind(); $changeStream->rewind();
}, },
function(stdClass $command) use (&$commands) { function(array $event) use (&$commands) {
$commands[] = key((array) $command); $commands[] = $event['started']->getCommandName();
} }
); );
$this->fail('ConnectionTimeoutException was not thrown'); $this->fail('ConnectionTimeoutException was not thrown');
...@@ -203,7 +298,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -203,7 +298,7 @@ class WatchFunctionalTest extends FunctionalTestCase
'documentKey' => ['_id' => 2], 'documentKey' => ['_id' => 2],
]; ];
$this->assertSameDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
$this->killChangeStreamCursor($changeStream); $this->killChangeStreamCursor($changeStream);
...@@ -224,7 +319,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -224,7 +319,7 @@ class WatchFunctionalTest extends FunctionalTestCase
'documentKey' => ['_id' => 3], 'documentKey' => ['_id' => 3],
]; ];
$this->assertSameDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
} }
public function testKey() public function testKey()
...@@ -452,7 +547,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -452,7 +547,7 @@ class WatchFunctionalTest extends FunctionalTestCase
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], 'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 1], 'documentKey' => ['_id' => 1],
]; ];
$this->assertSameDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
$this->killChangeStreamCursor($changeStream); $this->killChangeStreamCursor($changeStream);
...@@ -466,7 +561,7 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -466,7 +561,7 @@ class WatchFunctionalTest extends FunctionalTestCase
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], 'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2], 'documentKey' => ['_id' => 2],
]; ];
$this->assertSameDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
} }
/** /**
...@@ -484,16 +579,8 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -484,16 +579,8 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->next(); $changeStream->next();
$this->assertTrue($changeStream->valid()); $this->assertTrue($changeStream->valid());
$changeDocument = $changeStream->current();
// Unset the resume token and namespace, which are intentionally omitted
if (is_array($changeDocument)) {
unset($changeDocument['_id'], $changeDocument['ns']);
} else {
unset($changeDocument->_id, $changeDocument->ns);
}
$this->assertEquals($expectedChangeDocument, $changeDocument); $this->assertMatchesDocument($expectedChangeDocument, $changeStream->current());
} }
public function provideTypeMapOptionsAndExpectedChangeDocument() public function provideTypeMapOptionsAndExpectedChangeDocument()
...@@ -600,9 +687,10 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -600,9 +687,10 @@ class WatchFunctionalTest extends FunctionalTestCase
function() use ($operation, &$changeStream) { function() use ($operation, &$changeStream) {
$changeStream = $operation->execute($this->getPrimaryServer()); $changeStream = $operation->execute($this->getPrimaryServer());
}, },
function($changeStream) use (&$originalSession) { function(array $event) use (&$originalSession) {
if (isset($changeStream->aggregate)) { $command = $event['started']->getCommand();
$originalSession = bin2hex((string) $changeStream->lsid->id); if (isset($command->aggregate)) {
$originalSession = bin2hex((string) $command->lsid->id);
} }
} }
); );
...@@ -614,9 +702,9 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -614,9 +702,9 @@ class WatchFunctionalTest extends FunctionalTestCase
function() use (&$changeStream) { function() use (&$changeStream) {
$changeStream->next(); $changeStream->next();
}, },
function ($changeStream) use (&$sessionAfterResume, &$commands) { function (array $event) use (&$sessionAfterResume, &$commands) {
$commands[] = key((array) $changeStream); $commands[] = $event['started']->getCommandName();
$sessionAfterResume[] = bin2hex((string) $changeStream->lsid->id); $sessionAfterResume[] = bin2hex((string) $event['started']->getCommand()->lsid->id);
} }
); );
......
...@@ -4,6 +4,7 @@ namespace MongoDB\Tests\Operation; ...@@ -4,6 +4,7 @@ namespace MongoDB\Tests\Operation;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Operation\Watch; use MongoDB\Operation\Watch;
use stdClass;
/** /**
* Although these are unit tests, we extend FunctionalTestCase because Watch is * Although these are unit tests, we extend FunctionalTestCase because Watch is
...@@ -11,6 +12,14 @@ use MongoDB\Operation\Watch; ...@@ -11,6 +12,14 @@ use MongoDB\Operation\Watch;
*/ */
class WatchTest extends FunctionalTestCase class WatchTest extends FunctionalTestCase
{ {
public function testConstructorCollectionNameShouldBeNullIfDatabaseNameIsNull()
{
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('$collectionName should also be null if $databaseName is null');
new Watch($this->manager, null, 'foo', []);
}
public function testConstructorPipelineArgumentMustBeAList() public function testConstructorPipelineArgumentMustBeAList()
{ {
$this->expectException(InvalidArgumentException::class); $this->expectException(InvalidArgumentException::class);
...@@ -67,10 +76,19 @@ class WatchTest extends FunctionalTestCase ...@@ -67,10 +76,19 @@ class WatchTest extends FunctionalTestCase
$options[][] = ['session' => $value]; $options[][] = ['session' => $value];
} }
foreach ($this->getInvalidTimestampValues() as $value) {
$options[][] = ['startAtOperationTime' => $value];
}
foreach ($this->getInvalidArrayValues() as $value) { foreach ($this->getInvalidArrayValues() as $value) {
$options[][] = ['typeMap' => $value]; $options[][] = ['typeMap' => $value];
} }
return $options; return $options;
} }
private function getInvalidTimestampValues()
{
return [123, 3.14, 'foo', true, [], new stdClass];
}
} }
...@@ -69,6 +69,49 @@ abstract class TestCase extends BaseTestCase ...@@ -69,6 +69,49 @@ abstract class TestCase extends BaseTestCase
$this->assertCount(1, $errors); $this->assertCount(1, $errors);
} }
/**
* Asserts that a document has expected values for some fields.
*
* Only fields in the expected document will be checked. The actual document
* may contain additional fields.
*
* @param array|object $expectedDocument
* @param array|object $actualDocument
*/
protected function assertMatchesDocument($expectedDocument, $actualDocument)
{
$normalizedExpectedDocument = $this->normalizeBSON($expectedDocument);
$normalizedActualDocument = $this->normalizeBSON($actualDocument);
$extraKeys = [];
/* Avoid unsetting fields while we're iterating on the ArrayObject to
* work around https://bugs.php.net/bug.php?id=70246 */
foreach ($normalizedActualDocument as $key => $value) {
if ( ! $normalizedExpectedDocument->offsetExists($key)) {
$extraKeys[] = $key;
}
}
foreach ($extraKeys as $key) {
$normalizedActualDocument->offsetUnset($key);
}
$this->assertEquals(
\MongoDB\BSON\toJSON(\MongoDB\BSON\fromPHP($normalizedExpectedDocument)),
\MongoDB\BSON\toJSON(\MongoDB\BSON\fromPHP($normalizedActualDocument))
);
}
/**
* Asserts that a document has expected values for all fields.
*
* The actual document will be compared directly with the expected document
* and may not contain extra fields.
*
* @param array|object $expectedDocument
* @param array|object $actualDocument
*/
protected function assertSameDocument($expectedDocument, $actualDocument) protected function assertSameDocument($expectedDocument, $actualDocument)
{ {
$this->assertEquals( $this->assertEquals(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment