PHPLIB-466: Respect pinned session during server selection

parent 4a3db491
......@@ -170,7 +170,7 @@ class Client
$options['typeMap'] = $this->typeMap;
}
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -246,7 +246,7 @@ class Client
public function listDatabases(array $options = [])
{
$operation = new ListDatabases($options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -311,7 +311,7 @@ class Client
$options['readPreference'] = $this->readPreference;
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern)) {
$options['readConcern'] = $this->readConcern;
......
......@@ -227,7 +227,7 @@ class Collection
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
/* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not.
......@@ -276,7 +276,7 @@ class Collection
}
$operation = new BulkWrite($this->databaseName, $this->collectionName, $operations, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -301,7 +301,7 @@ class Collection
$options['readPreference'] = $this->readPreference;
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
......@@ -330,7 +330,7 @@ class Collection
$options['readPreference'] = $this->readPreference;
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
......@@ -392,7 +392,7 @@ class Collection
*/
public function createIndexes(array $indexes, array $options = [])
{
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -422,7 +422,7 @@ class Collection
}
$operation = new DeleteMany($this->databaseName, $this->collectionName, $filter, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -446,7 +446,7 @@ class Collection
}
$operation = new DeleteOne($this->databaseName, $this->collectionName, $filter, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -470,7 +470,7 @@ class Collection
$options['readPreference'] = $this->readPreference;
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
......@@ -497,7 +497,7 @@ class Collection
$options['typeMap'] = $this->typeMap;
}
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -531,7 +531,7 @@ class Collection
$options['typeMap'] = $this->typeMap;
}
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -558,7 +558,7 @@ class Collection
$options['typeMap'] = $this->typeMap;
}
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -586,7 +586,7 @@ class Collection
$options['readPreference'] = $this->readPreference;
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
......@@ -619,7 +619,7 @@ class Collection
$options['typeMap'] = $this->typeMap;
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
$operation = new Explain($this->databaseName, $explainable, $options);
......@@ -644,7 +644,7 @@ class Collection
$options['readPreference'] = $this->readPreference;
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
......@@ -677,7 +677,7 @@ class Collection
$options['readPreference'] = $this->readPreference;
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern) && ! is_in_transaction($options)) {
$options['readConcern'] = $this->readConcern;
......@@ -709,7 +709,7 @@ class Collection
*/
public function findOneAndDelete($filter, array $options = [])
{
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -746,7 +746,7 @@ class Collection
*/
public function findOneAndReplace($filter, $replacement, array $options = [])
{
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -783,7 +783,7 @@ class Collection
*/
public function findOneAndUpdate($filter, $update, array $options = [])
{
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForFindAndModifyWriteConcern) && ! is_in_transaction($options)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -899,7 +899,7 @@ class Collection
}
$operation = new InsertMany($this->databaseName, $this->collectionName, $documents, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -922,7 +922,7 @@ class Collection
}
$operation = new InsertOne($this->databaseName, $this->collectionName, $document, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -939,7 +939,7 @@ class Collection
public function listIndexes(array $options = [])
{
$operation = new ListIndexes($this->databaseName, $this->collectionName, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -972,7 +972,7 @@ class Collection
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
/* A "majority" read concern is not compatible with inline output, so
* avoid providing the Collection's read concern if it would conflict.
......@@ -1016,7 +1016,7 @@ class Collection
}
$operation = new ReplaceOne($this->databaseName, $this->collectionName, $filter, $replacement, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -1041,7 +1041,7 @@ class Collection
}
$operation = new UpdateMany($this->databaseName, $this->collectionName, $filter, $update, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -1066,7 +1066,7 @@ class Collection
}
$operation = new UpdateOne($this->databaseName, $this->collectionName, $filter, $update, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -1086,7 +1086,7 @@ class Collection
$options['readPreference'] = $this->readPreference;
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
/* Although change streams require a newer version of the server than
* read concerns, perform the usual wire version check before inheriting
......
......@@ -206,7 +206,7 @@ class Database
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
/* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not.
......@@ -258,7 +258,7 @@ class Database
}
$operation = new DatabaseCommand($this->databaseName, $command, $options);
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -280,7 +280,7 @@ class Database
$options['typeMap'] = $this->typeMap;
}
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -307,7 +307,7 @@ class Database
$options['typeMap'] = $this->typeMap;
}
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -335,7 +335,7 @@ class Database
$options['typeMap'] = $this->typeMap;
}
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -420,7 +420,7 @@ class Database
public function listCollections(array $options = [])
{
$operation = new ListCollections($this->databaseName, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
return $operation->execute($server);
}
......@@ -442,7 +442,7 @@ class Database
$options['typeMap'] = $this->typeMap;
}
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = select_server($this->manager, new ReadPreference(ReadPreference::RP_PRIMARY), extract_session_from_options($options));
if (! isset($options['writeConcern']) && server_supports_feature($server, self::$wireVersionForWritableCommandWriteConcern)) {
$options['writeConcern'] = $this->writeConcern;
......@@ -509,7 +509,7 @@ class Database
$options['readPreference'] = $this->readPreference;
}
$server = $this->manager->selectServer($options['readPreference']);
$server = select_server($this->manager, $options['readPreference'], extract_session_from_options($options));
if (! isset($options['readConcern']) && server_supports_feature($server, self::$wireVersionForReadConcern)) {
$options['readConcern'] = $this->readConcern;
......
......@@ -40,6 +40,8 @@ use function is_object;
use function is_string;
use function MongoDB\Driver\Monitoring\addSubscriber;
use function MongoDB\Driver\Monitoring\removeSubscriber;
use function MongoDB\extract_session_from_options;
use function MongoDB\select_server;
use function MongoDB\server_supports_feature;
/**
......@@ -375,8 +377,11 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
$this->hasResumed = true;
// Select a new server using the original read preference
$server = $this->manager->selectServer($this->aggregateOptions['readPreference']);
/* Select a new server using the original read preference. While watch
* is not usable within transactions, we still check if there is a
* pinned session. This is to avoid an ambiguous error message about
* running a command on the wrong server. */
$server = select_server($this->manager, $this->aggregateOptions['readPreference'], extract_session_from_options($this->aggregateOptions));
$resumeOption = isset($this->changeStreamOptions['startAfter']) && ! $hasAdvanced ? 'startAfter' : 'resumeAfter';
......
......@@ -19,6 +19,8 @@ namespace MongoDB;
use Exception;
use MongoDB\BSON\Serializable;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Driver\Session;
use MongoDB\Exception\InvalidArgumentException;
......@@ -373,3 +375,32 @@ function with_transaction(Session $session, callable $callback, array $transacti
$operation = new WithTransaction($callback, $transactionOptions);
$operation->execute($session);
}
/**
* Returns the session option if it is set and valid.
*
* @internal
* @param array $options
* @return Session|null
*/
function extract_session_from_options(array $options)
{
if (! isset($options['session']) || ! $options['session'] instanceof Session) {
return null;
}
return $options['session'];
}
/**
* Performs server selection, respecting the server a session may be pinned to
*
* @internal
* @return Server
*/
function select_server(Manager $manager, ReadPreference $readPreference = null, Session $session = null)
{
$server = $session !== null ? $session->getServer() : null;
return $server ?: $manager->selectServer($readPreference);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment