Commit d49ab3ab authored by Jeremy Mikola's avatar Jeremy Mikola

PHPLIB-407: Support startAfter change stream option

Note: this does not address using the startAfter option for resuming. That will be implemented in PHPLIB-411 with the introduction of a resume token accessor method and support for postBatchResumeToken.
parent 4fc1f90b
...@@ -34,6 +34,12 @@ source: ...@@ -34,6 +34,12 @@ source:
file: apiargs-common-option.yaml file: apiargs-common-option.yaml
ref: session ref: session
--- ---
source:
file: apiargs-method-watch-option.yaml
ref: startAfter
post: |
.. versionadded: 1.5
---
source: source:
file: apiargs-method-watch-option.yaml file: apiargs-method-watch-option.yaml
ref: startAtOperationTime ref: startAtOperationTime
......
...@@ -34,6 +34,12 @@ source: ...@@ -34,6 +34,12 @@ source:
file: apiargs-common-option.yaml file: apiargs-common-option.yaml
ref: session ref: session
--- ---
source:
file: apiargs-method-watch-option.yaml
ref: startAfter
post: |
.. versionadded: 1.5
---
source: source:
file: apiargs-method-watch-option.yaml file: apiargs-method-watch-option.yaml
ref: startAtOperationTime ref: startAtOperationTime
......
...@@ -34,6 +34,12 @@ source: ...@@ -34,6 +34,12 @@ source:
file: apiargs-common-option.yaml file: apiargs-common-option.yaml
ref: session ref: session
--- ---
source:
file: apiargs-method-watch-option.yaml
ref: startAfter
post: |
.. versionadded: 1.5
---
source: source:
file: apiargs-method-watch-option.yaml file: apiargs-method-watch-option.yaml
ref: startAtOperationTime ref: startAtOperationTime
......
...@@ -46,8 +46,32 @@ description: | ...@@ -46,8 +46,32 @@ description: |
Specifies the logical starting point for the new change stream. The ``_id`` Specifies the logical starting point for the new change stream. The ``_id``
field in documents returned by the change stream may be used here. field in documents returned by the change stream may be used here.
Using this option in conjunction with ``startAtOperationTime`` will result in Using this option in conjunction with ``startAfter`` and/or
a server error. The options are mutually exclusive. ``startAtOperationTime`` will result in a server error. The options are
mutually exclusive.
.. note::
This is an option of the ``$changeStream`` pipeline stage.
interface: phpmethod
operation: ~
optional: true
---
arg_name: option
name: startAfter
type: array|object
description: |
Specifies the logical starting point for the new change stream. The ``_id``
field in documents returned by the change stream may be used here. Unlike
``resumeAfter``, this option can be used with a resume token from an
"invalidate" event.
Using this option in conjunction with ``resumeAfter`` and/or
``startAtOperationTime`` will result in a server error. The options are
mutually exclusive.
This is not supported for server versions prior to 4.2 and will result in an
exception at execution time if used.
.. note:: .. note::
...@@ -66,8 +90,8 @@ description: | ...@@ -66,8 +90,8 @@ description: |
``operationTime`` returned by the initial ``aggregate`` command will be used ``operationTime`` returned by the initial ``aggregate`` command will be used
if available. if available.
Using this option in conjunction with ``resumeAfter`` will result in a server Using this option in conjunction with ``resumeAfter`` and/or ``startAfter``
error. The options are mutually exclusive. will result in a server error. The options are mutually exclusive.
This is not supported for server versions prior to 4.0 and will result in an This is not supported for server versions prior to 4.0 and will result in an
exception at execution time if used. exception at execution time if used.
......
...@@ -92,21 +92,31 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -92,21 +92,31 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
* * resumeAfter (document): Specifies the logical starting point for the * * resumeAfter (document): Specifies the logical starting point for the
* new change stream. * new change stream.
* *
* Using this option in conjunction with "startAtOperationTime" will * Using this option in conjunction with "startAfter" and/or
* result in a server error. The options are mutually exclusive. * "startAtOperationTime" will result in a server error. The options are
* mutually exclusive.
* *
* * session (MongoDB\Driver\Session): Client session. * * session (MongoDB\Driver\Session): Client session.
* *
* Sessions are not supported for server versions < 3.6. * Sessions are not supported for server versions < 3.6.
* *
* * startAfter (document): Specifies the logical starting point for the
* new change stream. Unlike "resumeAfter", this option can be used with
* a resume token from an "invalidate" event.
*
* Using this option in conjunction with "resumeAfter" and/or
* "startAtOperationTime" will result in a server error. The options are
* mutually exclusive.
*
* * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified, * * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
* the change stream will only provide changes that occurred at or after * the change stream will only provide changes that occurred at or after
* the specified timestamp. Any command run against the server will * the specified timestamp. Any command run against the server will
* return an operation time that can be used here. Alternatively, an * return an operation time that can be used here. Alternatively, an
* operation time may be obtained from MongoDB\Driver\Server::getInfo(). * operation time may be obtained from MongoDB\Driver\Server::getInfo().
* *
* Using this option in conjunction with "resumeAfter" will result in a * Using this option in conjunction with "resumeAfter" and/or
* server error. The options are mutually exclusive. * "startAfter" will result in a server error. The options are mutually
* exclusive.
* *
* This option is not supported for server versions < 4.0. * This option is not supported for server versions < 4.0.
* *
...@@ -143,6 +153,10 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -143,6 +153,10 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object'); throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object');
} }
if (isset($options['startAfter']) && ! is_array($options['startAfter']) && ! is_object($options['startAfter'])) {
throw InvalidArgumentException::invalidType('"startAfter" option', $options['startAfter'], 'array or object');
}
if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) { if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) {
throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class); throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class);
} }
...@@ -162,7 +176,7 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -162,7 +176,7 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
} }
$this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]); $this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAtOperationTime' => 1]); $this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);
// Null database name implies a cluster-wide change stream // Null database name implies a cluster-wide change stream
if ($databaseName === null) { if ($databaseName === null) {
......
...@@ -759,6 +759,78 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -759,6 +759,78 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertMatchesDocument($expectedResult, $changeStream->current()); $this->assertMatchesDocument($expectedResult, $changeStream->current());
} }
public function testResumeAfterOption()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->current()->_id;
$options = $this->defaultOptions + ['resumeAfter' => $resumeToken];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());
}
public function testStartAfterOption()
{
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
$this->markTestSkipped('startAfter is not supported');
}
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next();
$this->assertTrue($changeStream->valid());
$resumeToken = $changeStream->current()->_id;
$options = $this->defaultOptions + ['startAfter' => $resumeToken];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];
$this->assertMatchesDocument($expectedResult, $changeStream->current());
}
/** /**
* @dataProvider provideTypeMapOptionsAndExpectedChangeDocument * @dataProvider provideTypeMapOptionsAndExpectedChangeDocument
*/ */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment