PHPLIB-473: Don't override read preference when in a transaction

parent d4645414
...@@ -170,7 +170,7 @@ class Client ...@@ -170,7 +170,7 @@ class Client
$options['typeMap'] = $this->typeMap; $options['typeMap'] = $this->typeMap;
} }
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -246,7 +246,7 @@ class Client ...@@ -246,7 +246,7 @@ class Client
public function listDatabases(array $options = []) public function listDatabases(array $options = [])
{ {
$operation = new ListDatabases($options); $operation = new ListDatabases($options);
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -307,11 +307,11 @@ class Client ...@@ -307,11 +307,11 @@ class Client
*/ */
public function watch(array $pipeline = [], array $options = []) public function watch(array $pipeline = [], array $options = [])
{ {
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern)) { if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern)) {
$options['readConcern'] = $this->readConcern; $options['readConcern'] = $this->readConcern;
......
...@@ -219,7 +219,7 @@ class Collection ...@@ -219,7 +219,7 @@ class Collection
{ {
$hasWriteStage = is_last_pipeline_operator_write($pipeline); $hasWriteStage = is_last_pipeline_operator_write($pipeline);
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
...@@ -227,7 +227,7 @@ class Collection ...@@ -227,7 +227,7 @@ class Collection
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY); $options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
/* MongoDB 4.2 and later supports a read concern when an $out stage is /* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not. * being used, but earlier versions do not.
...@@ -276,7 +276,7 @@ class Collection ...@@ -276,7 +276,7 @@ class Collection
} }
$operation = new BulkWrite($this->databaseName, $this->collectionName, $operations, $options); $operation = new BulkWrite($this->databaseName, $this->collectionName, $operations, $options);
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -297,11 +297,11 @@ class Collection ...@@ -297,11 +297,11 @@ class Collection
*/ */
public function count($filter = [], array $options = []) public function count($filter = [], array $options = [])
{ {
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) { if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern; $options['readConcern'] = $this->readConcern;
...@@ -326,11 +326,11 @@ class Collection ...@@ -326,11 +326,11 @@ class Collection
*/ */
public function countDocuments($filter = [], array $options = []) public function countDocuments($filter = [], array $options = [])
{ {
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) { if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern; $options['readConcern'] = $this->readConcern;
...@@ -392,7 +392,7 @@ class Collection ...@@ -392,7 +392,7 @@ class Collection
*/ */
public function createIndexes(array $indexes, array $options = []) public function createIndexes(array $indexes, array $options = [])
{ {
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -422,7 +422,7 @@ class Collection ...@@ -422,7 +422,7 @@ class Collection
} }
$operation = new DeleteMany($this->databaseName, $this->collectionName, $filter, $options); $operation = new DeleteMany($this->databaseName, $this->collectionName, $filter, $options);
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -446,7 +446,7 @@ class Collection ...@@ -446,7 +446,7 @@ class Collection
} }
$operation = new DeleteOne($this->databaseName, $this->collectionName, $filter, $options); $operation = new DeleteOne($this->databaseName, $this->collectionName, $filter, $options);
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -466,11 +466,11 @@ class Collection ...@@ -466,11 +466,11 @@ class Collection
*/ */
public function distinct($fieldName, $filter = [], array $options = []) public function distinct($fieldName, $filter = [], array $options = [])
{ {
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) { if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern; $options['readConcern'] = $this->readConcern;
...@@ -497,7 +497,7 @@ class Collection ...@@ -497,7 +497,7 @@ class Collection
$options['typeMap'] = $this->typeMap; $options['typeMap'] = $this->typeMap;
} }
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -531,7 +531,7 @@ class Collection ...@@ -531,7 +531,7 @@ class Collection
$options['typeMap'] = $this->typeMap; $options['typeMap'] = $this->typeMap;
} }
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -558,7 +558,7 @@ class Collection ...@@ -558,7 +558,7 @@ class Collection
$options['typeMap'] = $this->typeMap; $options['typeMap'] = $this->typeMap;
} }
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -582,11 +582,11 @@ class Collection ...@@ -582,11 +582,11 @@ class Collection
*/ */
public function estimatedDocumentCount(array $options = []) public function estimatedDocumentCount(array $options = [])
{ {
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) { if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern; $options['readConcern'] = $this->readConcern;
...@@ -611,7 +611,7 @@ class Collection ...@@ -611,7 +611,7 @@ class Collection
*/ */
public function explain(Explainable $explainable, array $options = []) public function explain(Explainable $explainable, array $options = [])
{ {
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
...@@ -619,7 +619,7 @@ class Collection ...@@ -619,7 +619,7 @@ class Collection
$options['typeMap'] = $this->typeMap; $options['typeMap'] = $this->typeMap;
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
$operation = new Explain($this->databaseName, $explainable, $options); $operation = new Explain($this->databaseName, $explainable, $options);
...@@ -640,11 +640,11 @@ class Collection ...@@ -640,11 +640,11 @@ class Collection
*/ */
public function find($filter = [], array $options = []) public function find($filter = [], array $options = [])
{ {
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) { if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern; $options['readConcern'] = $this->readConcern;
...@@ -673,11 +673,11 @@ class Collection ...@@ -673,11 +673,11 @@ class Collection
*/ */
public function findOne($filter = [], array $options = []) public function findOne($filter = [], array $options = [])
{ {
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) { if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern; $options['readConcern'] = $this->readConcern;
...@@ -709,7 +709,7 @@ class Collection ...@@ -709,7 +709,7 @@ class Collection
*/ */
public function findOneAndDelete($filter, array $options = []) public function findOneAndDelete($filter, array $options = [])
{ {
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -746,7 +746,7 @@ class Collection ...@@ -746,7 +746,7 @@ class Collection
*/ */
public function findOneAndReplace($filter, $replacement, array $options = []) public function findOneAndReplace($filter, $replacement, array $options = [])
{ {
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -783,7 +783,7 @@ class Collection ...@@ -783,7 +783,7 @@ class Collection
*/ */
public function findOneAndUpdate($filter, $update, array $options = []) public function findOneAndUpdate($filter, $update, array $options = [])
{ {
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -899,7 +899,7 @@ class Collection ...@@ -899,7 +899,7 @@ class Collection
} }
$operation = new InsertMany($this->databaseName, $this->collectionName, $documents, $options); $operation = new InsertMany($this->databaseName, $this->collectionName, $documents, $options);
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -922,7 +922,7 @@ class Collection ...@@ -922,7 +922,7 @@ class Collection
} }
$operation = new InsertOne($this->databaseName, $this->collectionName, $document, $options); $operation = new InsertOne($this->databaseName, $this->collectionName, $document, $options);
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -939,7 +939,7 @@ class Collection ...@@ -939,7 +939,7 @@ class Collection
public function listIndexes(array $options = []) public function listIndexes(array $options = [])
{ {
$operation = new ListIndexes($this->databaseName, $this->collectionName, $options); $operation = new ListIndexes($this->databaseName, $this->collectionName, $options);
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -963,7 +963,7 @@ class Collection ...@@ -963,7 +963,7 @@ class Collection
{ {
$hasOutputCollection = ! is_mapreduce_output_inline($out); $hasOutputCollection = ! is_mapreduce_output_inline($out);
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
...@@ -972,7 +972,7 @@ class Collection ...@@ -972,7 +972,7 @@ class Collection
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY); $options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
/* A "majority" read concern is not compatible with inline output, so /* A "majority" read concern is not compatible with inline output, so
* avoid providing the Collection's read concern if it would conflict. * avoid providing the Collection's read concern if it would conflict.
...@@ -1016,7 +1016,7 @@ class Collection ...@@ -1016,7 +1016,7 @@ class Collection
} }
$operation = new ReplaceOne($this->databaseName, $this->collectionName, $filter, $replacement, $options); $operation = new ReplaceOne($this->databaseName, $this->collectionName, $filter, $replacement, $options);
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -1041,7 +1041,7 @@ class Collection ...@@ -1041,7 +1041,7 @@ class Collection
} }
$operation = new UpdateMany($this->databaseName, $this->collectionName, $filter, $update, $options); $operation = new UpdateMany($this->databaseName, $this->collectionName, $filter, $update, $options);
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -1066,7 +1066,7 @@ class Collection ...@@ -1066,7 +1066,7 @@ class Collection
} }
$operation = new UpdateOne($this->databaseName, $this->collectionName, $filter, $update, $options); $operation = new UpdateOne($this->databaseName, $this->collectionName, $filter, $update, $options);
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -1082,11 +1082,11 @@ class Collection ...@@ -1082,11 +1082,11 @@ class Collection
*/ */
public function watch(array $pipeline = [], array $options = []) public function watch(array $pipeline = [], array $options = [])
{ {
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
/* Although change streams require a newer version of the server than /* Although change streams require a newer version of the server than
* read concerns, perform the usual wire version check before inheriting * read concerns, perform the usual wire version check before inheriting
......
...@@ -198,7 +198,7 @@ class Database ...@@ -198,7 +198,7 @@ class Database
{ {
$hasWriteStage = is_last_pipeline_operator_write($pipeline); $hasWriteStage = is_last_pipeline_operator_write($pipeline);
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
...@@ -206,7 +206,7 @@ class Database ...@@ -206,7 +206,7 @@ class Database
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY); $options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
/* MongoDB 4.2 and later supports a read concern when an $out stage is /* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not. * being used, but earlier versions do not.
...@@ -249,7 +249,7 @@ class Database ...@@ -249,7 +249,7 @@ class Database
*/ */
public function command($command, array $options = []) public function command($command, array $options = [])
{ {
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
...@@ -258,7 +258,7 @@ class Database ...@@ -258,7 +258,7 @@ class Database
} }
$operation = new DatabaseCommand($this->databaseName, $command, $options); $operation = new DatabaseCommand($this->databaseName, $command, $options);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -280,7 +280,7 @@ class Database ...@@ -280,7 +280,7 @@ class Database
$options['typeMap'] = $this->typeMap; $options['typeMap'] = $this->typeMap;
} }
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -307,7 +307,7 @@ class Database ...@@ -307,7 +307,7 @@ class Database
$options['typeMap'] = $this->typeMap; $options['typeMap'] = $this->typeMap;
} }
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -335,7 +335,7 @@ class Database ...@@ -335,7 +335,7 @@ class Database
$options['typeMap'] = $this->typeMap; $options['typeMap'] = $this->typeMap;
} }
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -420,7 +420,7 @@ class Database ...@@ -420,7 +420,7 @@ class Database
public function listCollections(array $options = []) public function listCollections(array $options = [])
{ {
$operation = new ListCollections($this->databaseName, $options); $operation = new ListCollections($this->databaseName, $options);
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
return $operation->execute($server); return $operation->execute($server);
} }
...@@ -442,7 +442,7 @@ class Database ...@@ -442,7 +442,7 @@ class Database
$options['typeMap'] = $this->typeMap; $options['typeMap'] = $this->typeMap;
} }
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) { if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern; $options['writeConcern'] = $this->writeConcern;
...@@ -505,11 +505,11 @@ class Database ...@@ -505,11 +505,11 @@ class Database
*/ */
public function watch(array $pipeline = [], array $options = []) public function watch(array $pipeline = [], array $options = [])
{ {
if (! isset($options['readPreference'])) { if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference; $options['readPreference'] = $this->readPreference;
} }
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options)); $server = select_server($this->manager, $options);
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern)) { if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern)) {
$options['readConcern'] = $this->readConcern; $options['readConcern'] = $this->readConcern;
......
...@@ -40,7 +40,6 @@ use function is_object; ...@@ -40,7 +40,6 @@ use function is_object;
use function is_string; use function is_string;
use function MongoDB\Driver\Monitoring\addSubscriber; use function MongoDB\Driver\Monitoring\addSubscriber;
use function MongoDB\Driver\Monitoring\removeSubscriber; use function MongoDB\Driver\Monitoring\removeSubscriber;
use function MongoDB\extract_session_from_options;
use function MongoDB\select_server; use function MongoDB\select_server;
use function MongoDB\server_supports_feature; use function MongoDB\server_supports_feature;
...@@ -381,7 +380,7 @@ class Watch implements Executable, /* @internal */ CommandSubscriber ...@@ -381,7 +380,7 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
* is not usable within transactions, we still check if there is a * is not usable within transactions, we still check if there is a
* pinned session. This is to avoid an ambiguous error message about * pinned session. This is to avoid an ambiguous error message about
* running a command on the wrong server. */ * running a command on the wrong server. */
$server = select_server($this->manager, $this->aggregateOptions['readPreference'], extract_session_from_options($this->aggregateOptions)); $server = select_server($this->manager, $this->aggregateOptions);
$resumeOption = isset($this->changeStreamOptions['startAfter']) && ! $hasAdvanced ? 'startAfter' : 'resumeAfter'; $resumeOption = isset($this->changeStreamOptions['startAfter']) && ! $hasAdvanced ? 'startAfter' : 'resumeAfter';
......
...@@ -393,14 +393,40 @@ function extract_session_from_options(array $options) ...@@ -393,14 +393,40 @@ function extract_session_from_options(array $options)
} }
/** /**
* Performs server selection, respecting the server a session may be pinned to * Returns the readPreference option if it is set and valid.
*
* @internal
* @param array $options
* @return ReadPreference|null
*/
function extract_read_preference_from_options(array $options)
{
if (! isset($options['readPreference']) || ! $options['readPreference'] instanceof ReadPreference) {
return null;
}
return $options['readPreference'];
}
/**
* Performs server selection, respecting the readPreference and session options
* (if given)
* *
* @internal * @internal
* @return Server * @return Server
*/ */
function select_server(Manager $manager, ReadPreference $readPreference = null, Session $session = null) function select_server(Manager $manager, array $options)
{ {
$server = $session !== null ? $session->getServer() : null; $session = extract_session_from_options($options);
if ($session instanceof Session && $session->getServer() !== null) {
return $session->getServer();
}
$readPreference = extract_read_preference_from_options($options);
if (! $readPreference instanceof ReadPreference) {
// TODO: PHPLIB-476: Read transaction read preference once PHPC-1439 is implemented
$readPreference = new ReadPreference(ReadPreference::RP_PRIMARY);
}
return $server ?: $manager->selectServer($readPreference); return $manager->selectServer($readPreference);
} }
...@@ -41,12 +41,6 @@ class TransactionsSpecTest extends FunctionalTestCase ...@@ -41,12 +41,6 @@ class TransactionsSpecTest extends FunctionalTestCase
'transactions/mongos-recovery-token: commitTransaction retry fails on new mongos' => 'isMaster failpoints cannot be disabled', 'transactions/mongos-recovery-token: commitTransaction retry fails on new mongos' => 'isMaster failpoints cannot be disabled',
'transactions/pin-mongos: remain pinned after non-transient error on commit' => 'Blocked on SPEC-1320', 'transactions/pin-mongos: remain pinned after non-transient error on commit' => 'Blocked on SPEC-1320',
'transactions/pin-mongos: unpin after transient error within a transaction and commit' => 'isMaster failpoints cannot be disabled', 'transactions/pin-mongos: unpin after transient error within a transaction and commit' => 'isMaster failpoints cannot be disabled',
'transactions/read-pref: default readPreference' => 'PHPLIB does not properly inherit readPreference for transactions (PHPLIB-473)',
'transactions/read-pref: primary readPreference' => 'PHPLIB does not properly inherit readPreference for transactions (PHPLIB-473)',
'transactions/run-command: run command with secondary read preference in client option and primary read preference in transaction options' => 'PHPLIB does not properly inherit readPreference for transactions (PHPLIB-473)',
'transactions/transaction-options: transaction options inherited from client' => 'PHPLIB does not properly inherit readConcern for transactions (PHPLIB-473)',
'transactions/transaction-options: readConcern local in defaultTransactionOptions' => 'PHPLIB does not properly inherit readConcern for transactions (PHPLIB-473)',
'transactions/transaction-options: readConcern snapshot in startTransaction options' => 'PHPLIB does not properly inherit readConcern for transactions (PHPLIB-473)',
]; ];
private function doSetUp() private function doSetUp()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment