Unverified Commit 039a6945 authored by Andreas Braun's avatar Andreas Braun

PHPLIB-438: Unify handling for write stages in aggregation pipelines

Merged from #644

https://jira.mongodb.org/browse/PHPLIB-438
parents 382cb96d 5107f939
......@@ -39,8 +39,6 @@ source:
source:
file: apiargs-MongoDBCollection-common-option.yaml
ref: readPreference
post: |
This option will be ignored when using the :ref:`$out <agg-out>` stage.
---
source:
file: apiargs-common-option.yaml
......@@ -72,7 +70,8 @@ source:
file: apiargs-MongoDBCollection-common-option.yaml
ref: writeConcern
post: |
This only applies when the :ref:`$out <agg-out>` stage is specified.
This only applies when a :ref:`$out <agg-out>` or :ref:`$merge <agg-merge>`
stage is specified.
This is not supported for server versions prior to 3.4 and will result in an
exception at execution time if used.
......
......@@ -33,8 +33,6 @@ source:
source:
file: apiargs-MongoDBDatabase-common-option.yaml
ref: readPreference
post: |
This option will be ignored when using the :ref:`$out <agg-out>` stage.
---
source:
file: apiargs-common-option.yaml
......@@ -48,7 +46,8 @@ source:
file: apiargs-MongoDBDatabase-common-option.yaml
ref: writeConcern
post: |
This only applies when the :ref:`$out <agg-out>` stage is specified.
This only applies when a :ref:`$out <agg-out>` or :ref:`$merge <agg-merge>`
stage is specified.
This is not supported for server versions prior to 3.4 and will result in an
exception at execution time if used.
......
......@@ -24,7 +24,8 @@ source:
file: apiargs-MongoDBCollection-common-option.yaml
ref: bypassDocumentValidation
post: |
This only applies when using the :ref:`$out <agg-out>` stage.
This only applies when using the :ref:`$out <agg-out>` and
:ref:`$out <agg-merge>` stages.
Document validation requires MongoDB 3.2 or later: if you are using an earlier
version of MongoDB, this option will be ignored.
......
......@@ -69,7 +69,7 @@ class Collection
private static $wireVersionForFindAndModifyWriteConcern = 4;
private static $wireVersionForReadConcern = 4;
private static $wireVersionForWritableCommandWriteConcern = 5;
private static $wireVersionForReadConcernWithOutStage = 8;
private static $wireVersionForReadConcernWithWriteStage = 8;
private $collectionName;
private $databaseName;
......@@ -190,13 +190,13 @@ class Collection
*/
public function aggregate(array $pipeline, array $options = [])
{
$hasOutStage = \MongoDB\is_last_pipeline_operator_out($pipeline);
$hasWriteStage = \MongoDB\is_last_pipeline_operator_write($pipeline);
if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference;
}
if ($hasOutStage) {
if ($hasWriteStage) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}
......@@ -210,7 +210,7 @@ class Collection
if ( ! isset($options['readConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern) &&
! \MongoDB\is_in_transaction($options) &&
( ! $hasOutStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithOutStage))
( ! $hasWriteStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithWriteStage))
) {
$options['readConcern'] = $this->readConcern;
}
......@@ -219,7 +219,7 @@ class Collection
$options['typeMap'] = $this->typeMap;
}
if ($hasOutStage &&
if ($hasWriteStage &&
! isset($options['writeConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) &&
! \MongoDB\is_in_transaction($options)) {
......
......@@ -48,7 +48,7 @@ class Database
];
private static $wireVersionForReadConcern = 4;
private static $wireVersionForWritableCommandWriteConcern = 5;
private static $wireVersionForReadConcernWithOutStage = 8;
private static $wireVersionForReadConcernWithWriteStage = 8;
private $databaseName;
private $manager;
......@@ -175,13 +175,13 @@ class Database
*/
public function aggregate(array $pipeline, array $options = [])
{
$hasOutStage = \MongoDB\is_last_pipeline_operator_out($pipeline);
$hasWriteStage = \MongoDB\is_last_pipeline_operator_write($pipeline);
if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference;
}
if ($hasOutStage) {
if ($hasWriteStage) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}
......@@ -195,7 +195,7 @@ class Database
if ( ! isset($options['readConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForReadConcern) &&
! \MongoDB\is_in_transaction($options) &&
( ! $hasOutStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithOutStage))
( ! $hasWriteStage || \MongoDB\server_supports_feature($server, self::$wireVersionForReadConcernWithWriteStage))
) {
$options['readConcern'] = $this->readConcern;
}
......@@ -204,7 +204,7 @@ class Database
$options['typeMap'] = $this->typeMap;
}
if ($hasOutStage &&
if ($hasWriteStage &&
! isset($options['writeConcern']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) &&
! \MongoDB\is_in_transaction($options)) {
......
......@@ -264,12 +264,12 @@ class Aggregate implements Executable
$hasExplain = ! empty($this->options['explain']);
$hasOutStage = \MongoDB\is_last_pipeline_operator_out($this->pipeline);
$hasWriteStage = \MongoDB\is_last_pipeline_operator_write($this->pipeline);
$command = $this->createCommand($server, $hasOutStage);
$options = $this->createOptions($hasOutStage, $hasExplain);
$command = $this->createCommand($server, $hasWriteStage);
$options = $this->createOptions($hasWriteStage, $hasExplain);
$cursor = ($hasOutStage && ! $hasExplain)
$cursor = ($hasWriteStage && ! $hasExplain)
? $server->executeReadWriteCommand($this->databaseName, $command, $options)
: $server->executeReadCommand($this->databaseName, $command, $options);
......@@ -353,11 +353,11 @@ class Aggregate implements Executable
*
* @see http://php.net/manual/en/mongodb-driver-server.executereadcommand.php
* @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
* @param boolean $hasOutStage
* @param boolean $hasWriteStage
* @param boolean $hasExplain
* @return array
*/
private function createOptions($hasOutStage, $hasExplain)
private function createOptions($hasWriteStage, $hasExplain)
{
$options = [];
......@@ -365,7 +365,7 @@ class Aggregate implements Executable
$options['readConcern'] = $this->options['readConcern'];
}
if ( ! $hasOutStage && isset($this->options['readPreference'])) {
if (!$hasWriteStage && isset($this->options['readPreference'])) {
$options['readPreference'] = $this->options['readPreference'];
}
......@@ -373,7 +373,7 @@ class Aggregate implements Executable
$options['session'] = $this->options['session'];
}
if ($hasOutStage && ! $hasExplain && isset($this->options['writeConcern'])) {
if ($hasWriteStage && ! $hasExplain && isset($this->options['writeConcern'])) {
$options['writeConcern'] = $this->options['writeConcern'];
}
......
......@@ -127,7 +127,7 @@ function is_in_transaction(array $options)
}
/**
* Return whether the aggregation pipeline ends with an $out operator.
* Return whether the aggregation pipeline ends with an $out or $merge operator.
*
* This is used for determining whether the aggregation pipeline must be
* executed against a primary server.
......@@ -136,7 +136,7 @@ function is_in_transaction(array $options)
* @param array $pipeline List of pipeline operations
* @return boolean
*/
function is_last_pipeline_operator_out(array $pipeline)
function is_last_pipeline_operator_write(array $pipeline)
{
$lastOp = end($pipeline);
......@@ -146,7 +146,7 @@ function is_last_pipeline_operator_out(array $pipeline)
$lastOp = (array) $lastOp;
return key($lastOp) === '$out';
return in_array(key($lastOp), ['$out', '$merge'], true);
}
/**
......
......@@ -292,7 +292,7 @@ class CrudSpecFunctionalTest extends FunctionalTestCase
* the result here; however, assertEquivalentCollections() will
* assert the output collection's contents later.
*/
if ( ! \MongoDB\is_last_pipeline_operator_out($operation['arguments']['pipeline'])) {
if ( ! \MongoDB\is_last_pipeline_operator_write($operation['arguments']['pipeline'])) {
$this->assertSameDocuments($expectedResult, $actualResult);
}
break;
......
......@@ -63,6 +63,15 @@ final class Context
$o->outcomeCollectionName = $test->outcome->collection->name;
}
$o->defaultWriteOptions = [
'writeConcern' => new WriteConcern(WriteConcern::MAJORITY),
];
$o->outcomeFindOptions = [
'readConcern' => new ReadConcern('local'),
'readPreference' => new ReadPreference('primary'),
];
$o->client = new Client(FunctionalTestCase::getUri(), $clientOptions);
return $o;
......
......@@ -14,11 +14,6 @@ class CrudSpecTest extends FunctionalTestCase
/* These should all pass before the driver can be considered compatible with
* MongoDB 4.2. */
private static $incompleteTests = [
'aggregate-merge: Aggregate with $merge' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and batch size of 0' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and majority readConcern' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and local readConcern' => 'PHPLIB-438',
'aggregate-merge: Aggregate with $merge and available readConcern' => 'PHPLIB-438',
'bulkWrite-arrayFilters: BulkWrite with arrayFilters' => 'Fails due to command assertions',
'updateWithPipelines: UpdateOne using pipelines' => 'PHPLIB-418',
'updateWithPipelines: UpdateMany using pipelines' => 'PHPLIB-418',
......
......@@ -415,7 +415,7 @@ final class Operation
* the CRUD specification and is not implemented in the library
* since we have no concept of lazy cursors. Rely on examining
* the output collection rather than the operation result. */
if (\MongoDB\is_last_pipeline_operator_out($this->arguments['pipeline'])) {
if (\MongoDB\is_last_pipeline_operator_write($this->arguments['pipeline'])) {
return ResultExpectation::ASSERT_NOTHING;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment