PHPLIB-438: Unify handling for write stages in aggregation pipelines

parent 382cb96d
...@@ -39,8 +39,6 @@ source: ...@@ -39,8 +39,6 @@ source:
source: source:
file: apiargs-MongoDBCollection-common-option.yaml file: apiargs-MongoDBCollection-common-option.yaml
ref: readPreference ref: readPreference
post: |
This option will be ignored when using the :ref:`$out <agg-out>` stage.
--- ---
source: source:
file: apiargs-common-option.yaml file: apiargs-common-option.yaml
...@@ -72,7 +70,8 @@ source: ...@@ -72,7 +70,8 @@ source:
file: apiargs-MongoDBCollection-common-option.yaml file: apiargs-MongoDBCollection-common-option.yaml
ref: writeConcern ref: writeConcern
post: | post: |
This only applies when the :ref:`$out <agg-out>` stage is specified. This only applies when a :ref:`$out <agg-out>` or :ref:`$merge <agg-merge>`
stage is specified.
This is not supported for server versions prior to 3.4 and will result in an This is not supported for server versions prior to 3.4 and will result in an
exception at execution time if used. exception at execution time if used.
......
...@@ -33,8 +33,6 @@ source: ...@@ -33,8 +33,6 @@ source:
source: source:
file: apiargs-MongoDBDatabase-common-option.yaml file: apiargs-MongoDBDatabase-common-option.yaml
ref: readPreference ref: readPreference
post: |
This option will be ignored when using the :ref:`$out <agg-out>` stage.
--- ---
source: source:
file: apiargs-common-option.yaml file: apiargs-common-option.yaml
...@@ -48,7 +46,8 @@ source: ...@@ -48,7 +46,8 @@ source:
file: apiargs-MongoDBDatabase-common-option.yaml file: apiargs-MongoDBDatabase-common-option.yaml
ref: writeConcern ref: writeConcern
post: | post: |
This only applies when the :ref:`$out <agg-out>` stage is specified. This only applies when a :ref:`$out <agg-out>` or :ref:`$merge <agg-merge>`
stage is specified.
This is not supported for server versions prior to 3.4 and will result in an This is not supported for server versions prior to 3.4 and will result in an
exception at execution time if used. exception at execution time if used.
......
...@@ -24,7 +24,8 @@ source: ...@@ -24,7 +24,8 @@ source:
file: apiargs-MongoDBCollection-common-option.yaml file: apiargs-MongoDBCollection-common-option.yaml
ref: bypassDocumentValidation ref: bypassDocumentValidation
post: | post: |
This only applies when using the :ref:`$out <agg-out>` stage. This only applies when using the :ref:`$out <agg-out>` and
:ref:`$out <agg-merge>` stages.
Document validation requires MongoDB 3.2 or later: if you are using an earlier Document validation requires MongoDB 3.2 or later: if you are using an earlier
version of MongoDB, this option will be ignored. version of MongoDB, this option will be ignored.
......
...@@ -69,7 +69,7 @@ class Collection ...@@ -69,7 +69,7 @@ class Collection
private static $wireVersionForFindAndModifyWriteConcern = 4; private static $wireVersionForFindAndModifyWriteConcern = 4;
private static $wireVersionForReadConcern = 4; private static $wireVersionForReadConcern = 4;
private static $wireVersionForWritableCommandWriteConcern = 5; private static $wireVersionForWritableCommandWriteConcern = 5;
private static $wireVersionForReadConcernWithOutStage = 8; private static $wireVersionForReadConcernWithWriteStage = 8;
private $collectionName; private $collectionName;
private $databaseName; private $databaseName;
...@@ -190,13 +190,13 @@ class Collection ...@@ -190,13 +190,13 @@ class Collection
*/ */
public function aggregate(array $pipeline, array $options = []) public function aggregate(array $pipeline, array $options = [])
{ {
$hasOutStage = \MongoDB\is_last_pipeline_operator_out($pipeline); $hasWriteStage = \MongoDB\is_last_pipeline_operator_write($pipeline);
if ( ! isset($options['readPreference'])) { if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
if ($hasOutStage) { if ($hasWriteStage) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY); $options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
} }
...@@ -210,7 +210,7 @@ class Collection ...@@ -210,7 +210,7 @@ class Collection
if ( ! isset($options['readConcern']) && if ( ! isset($options['readConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern) && \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern) &&
! \MongoDB\is_in_transaction($options) && ! \MongoDB\is_in_transaction($options) &&
( ! $hasOutStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithOutStage)) ( ! $hasWriteStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithWriteStage))
) { ) {
$options['readConcern'] = $this->readConcern; $options['readConcern'] = $this->readConcern;
} }
...@@ -219,7 +219,7 @@ class Collection ...@@ -219,7 +219,7 @@ class Collection
$options['typeMap'] = $this->typeMap; $options['typeMap'] = $this->typeMap;
} }
if ($hasOutStage && if ($hasWriteStage &&
! isset($options['writeConcern']) && ! isset($options['writeConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && \MongoDB\server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) &&
! \MongoDB\is_in_transaction($options)) { ! \MongoDB\is_in_transaction($options)) {
......
...@@ -48,7 +48,7 @@ class Database ...@@ -48,7 +48,7 @@ class Database
]; ];
private static $wireVersionForReadConcern = 4; private static $wireVersionForReadConcern = 4;
private static $wireVersionForWritableCommandWriteConcern = 5; private static $wireVersionForWritableCommandWriteConcern = 5;
private static $wireVersionForReadConcernWithOutStage = 8; private static $wireVersionForReadConcernWithWriteStage = 8;
private $databaseName; private $databaseName;
private $manager; private $manager;
...@@ -175,13 +175,13 @@ class Database ...@@ -175,13 +175,13 @@ class Database
*/ */
public function aggregate(array $pipeline, array $options = []) public function aggregate(array $pipeline, array $options = [])
{ {
$hasOutStage = \MongoDB\is_last_pipeline_operator_out($pipeline); $hasWriteStage = \MongoDB\is_last_pipeline_operator_write($pipeline);
if ( ! isset($options['readPreference'])) { if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
if ($hasOutStage) { if ($hasWriteStage) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY); $options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
} }
...@@ -195,7 +195,7 @@ class Database ...@@ -195,7 +195,7 @@ class Database
if ( ! isset($options['readConcern']) && if ( ! isset($options['readConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern) && \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern) &&
! \MongoDB\is_in_transaction($options) && ! \MongoDB\is_in_transaction($options) &&
( ! $hasOutStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithOutStage)) ( ! $hasWriteStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithWriteStage))
) { ) {
$options['readConcern'] = $this->readConcern; $options['readConcern'] = $this->readConcern;
} }
...@@ -204,7 +204,7 @@ class Database ...@@ -204,7 +204,7 @@ class Database
$options['typeMap'] = $this->typeMap; $options['typeMap'] = $this->typeMap;
} }
if ($hasOutStage && if ($hasWriteStage &&
! isset($options['writeConcern']) && ! isset($options['writeConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && \MongoDB\server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) &&
! \MongoDB\is_in_transaction($options)) { ! \MongoDB\is_in_transaction($options)) {
......
...@@ -264,12 +264,12 @@ class Aggregate implements Executable ...@@ -264,12 +264,12 @@ class Aggregate implements Executable
$hasExplain = ! empty($this->options['explain']); $hasExplain = ! empty($this->options['explain']);
$hasOutStage = \MongoDB\is_last_pipeline_operator_out($this->pipeline); $hasWriteStage = \MongoDB\is_last_pipeline_operator_write($this->pipeline);
$command = $this->createCommand($server, $hasOutStage); $command = $this->createCommand($server, $hasWriteStage);
$options = $this->createOptions($hasOutStage, $hasExplain); $options = $this->createOptions($hasWriteStage, $hasExplain);
$cursor = ($hasOutStage && ! $hasExplain) $cursor = ($hasWriteStage && ! $hasExplain)
? $server->executeReadWriteCommand($this->databaseName, $command, $options) ? $server->executeReadWriteCommand($this->databaseName, $command, $options)
: $server->executeReadCommand($this->databaseName, $command, $options); : $server->executeReadCommand($this->databaseName, $command, $options);
...@@ -353,11 +353,11 @@ class Aggregate implements Executable ...@@ -353,11 +353,11 @@ class Aggregate implements Executable
* *
* @see http://php.net/manual/en/mongodb-driver-server.executereadcommand.php * @see http://php.net/manual/en/mongodb-driver-server.executereadcommand.php
* @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php * @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
* @param boolean $hasOutStage * @param boolean $hasWriteStage
* @param boolean $hasExplain * @param boolean $hasExplain
* @return array * @return array
*/ */
private function createOptions($hasOutStage, $hasExplain) private function createOptions($hasWriteStage, $hasExplain)
{ {
$options = []; $options = [];
...@@ -365,7 +365,7 @@ class Aggregate implements Executable ...@@ -365,7 +365,7 @@ class Aggregate implements Executable
$options['readConcern'] = $this->options['readConcern']; $options['readConcern'] = $this->options['readConcern'];
} }
if ( ! $hasOutStage && isset($this->options['readPreference'])) { if (!$hasWriteStage && isset($this->options['readPreference'])) {
$options['readPreference'] = $this->options['readPreference']; $options['readPreference'] = $this->options['readPreference'];
} }
...@@ -373,7 +373,7 @@ class Aggregate implements Executable ...@@ -373,7 +373,7 @@ class Aggregate implements Executable
$options['session'] = $this->options['session']; $options['session'] = $this->options['session'];
} }
if ($hasOutStage && ! $hasExplain && isset($this->options['writeConcern'])) { if ($hasWriteStage && ! $hasExplain && isset($this->options['writeConcern'])) {
$options['writeConcern'] = $this->options['writeConcern']; $options['writeConcern'] = $this->options['writeConcern'];
} }
......
...@@ -127,7 +127,7 @@ function is_in_transaction(array $options) ...@@ -127,7 +127,7 @@ function is_in_transaction(array $options)
} }
/** /**
* Return whether the aggregation pipeline ends with an $out operator. * Return whether the aggregation pipeline ends with an $out or $merge operator.
* *
* This is used for determining whether the aggregation pipeline must be * This is used for determining whether the aggregation pipeline must be
* executed against a primary server. * executed against a primary server.
...@@ -136,7 +136,7 @@ function is_in_transaction(array $options) ...@@ -136,7 +136,7 @@ function is_in_transaction(array $options)
* @param array $pipeline List of pipeline operations * @param array $pipeline List of pipeline operations
* @return boolean * @return boolean
*/ */
function is_last_pipeline_operator_out(array $pipeline) function is_last_pipeline_operator_write(array $pipeline)
{ {
$lastOp = end($pipeline); $lastOp = end($pipeline);
...@@ -146,7 +146,7 @@ function is_last_pipeline_operator_out(array $pipeline) ...@@ -146,7 +146,7 @@ function is_last_pipeline_operator_out(array $pipeline)
$lastOp = (array) $lastOp; $lastOp = (array) $lastOp;
return key($lastOp) === '$out'; return in_array(key($lastOp), ['$out', '$merge'], true);
} }
/** /**
......
...@@ -292,7 +292,7 @@ class CrudSpecFunctionalTest extends FunctionalTestCase ...@@ -292,7 +292,7 @@ class CrudSpecFunctionalTest extends FunctionalTestCase
* the result here; however, assertEquivalentCollections() will * the result here; however, assertEquivalentCollections() will
* assert the output collection's contents later. * assert the output collection's contents later.
*/ */
if ( ! \MongoDB\is_last_pipeline_operator_out($operation['arguments']['pipeline'])) { if ( ! \MongoDB\is_last_pipeline_operator_write($operation['arguments']['pipeline'])) {
$this->assertSameDocuments($expectedResult, $actualResult); $this->assertSameDocuments($expectedResult, $actualResult);
} }
break; break;
......
...@@ -63,6 +63,15 @@ final class Context ...@@ -63,6 +63,15 @@ final class Context
$o->outcomeCollectionName = $test->outcome->collection->name; $o->outcomeCollectionName = $test->outcome->collection->name;
} }
$o->defaultWriteOptions = [
'writeConcern' => new WriteConcern(WriteConcern::MAJORITY),
];
$o->outcomeFindOptions = [
'readConcern' => new ReadConcern('local'),
'readPreference' => new ReadPreference('primary'),
];
$o->client = new Client(FunctionalTestCase::getUri(), $clientOptions); $o->client = new Client(FunctionalTestCase::getUri(), $clientOptions);
return $o; return $o;
......
...@@ -14,11 +14,6 @@ class CrudSpecTest extends FunctionalTestCase ...@@ -14,11 +14,6 @@ class CrudSpecTest extends FunctionalTestCase
/* These should all pass before the driver can be considered compatible with /* These should all pass before the driver can be considered compatible with
* MongoDB 4.2. */ * MongoDB 4.2. */
private static $incompleteTests = [ private static $incompleteTests = [
'aggregate-merge: Aggregate with $merge' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and batch size of 0' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and majority readConcern' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and local readConcern' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and available readConcern' => 'PHPLIB-438',
'bulkWrite-arrayFilters: BulkWrite with arrayFilters' => 'Fails due to command assertions', 'bulkWrite-arrayFilters: BulkWrite with arrayFilters' => 'Fails due to command assertions',
'updateWithPipelines: UpdateOne using pipelines' => 'PHPLIB-418', 'updateWithPipelines: UpdateOne using pipelines' => 'PHPLIB-418',
'updateWithPipelines: UpdateMany using pipelines' => 'PHPLIB-418', 'updateWithPipelines: UpdateMany using pipelines' => 'PHPLIB-418',
......
...@@ -415,7 +415,7 @@ final class Operation ...@@ -415,7 +415,7 @@ final class Operation
* the CRUD specification and is not implemented in the library * the CRUD specification and is not implemented in the library
* since we have no concept of lazy cursors. Rely on examining * since we have no concept of lazy cursors. Rely on examining
* the output collection rather than the operation result. */ * the output collection rather than the operation result. */
if (\MongoDB\is_last_pipeline_operator_out($this->arguments['pipeline'])) { if (\MongoDB\is_last_pipeline_operator_write($this->arguments['pipeline'])) {
return ResultExpectation::ASSERT_NOTHING; return ResultExpectation::ASSERT_NOTHING;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment