Commit 8baaedb8 authored by Jeremy Mikola's avatar Jeremy Mikola

PHPLIB-111: Ensure read ops use appropriate read preference

parent ffb2a7df
......@@ -95,9 +95,16 @@ class Collection
*/
public function aggregate(array $pipeline, array $options = array())
{
$readPreference = new ReadPreference(ReadPreference::RP_PRIMARY);
$server = $this->manager->selectServer($readPreference);
if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference;
}
if (\MongoDB\is_last_pipeline_operator_out($pipeline)) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}
$operation = new Aggregate($this->databaseName, $this->collectionName, $pipeline, $options);
$server = $this->manager->selectServer($options['readPreference']);
return $operation->execute($server);
}
......@@ -132,8 +139,12 @@ class Collection
*/
public function count($filter = array(), array $options = array())
{
if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference;
}
$operation = new Count($this->databaseName, $this->collectionName, $filter, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = $this->manager->selectServer($options['readPreference']);
return $operation->execute($server);
}
......@@ -236,8 +247,12 @@ class Collection
*/
public function distinct($fieldName, $filter = array(), array $options = array())
{
if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference;
}
$operation = new Distinct($this->databaseName, $this->collectionName, $fieldName, $filter, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = $this->manager->selectServer($options['readPreference']);
return $operation->execute($server);
}
......@@ -300,8 +315,12 @@ class Collection
*/
public function find($filter = array(), array $options = array())
{
if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference;
}
$operation = new Find($this->databaseName, $this->collectionName, $filter, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = $this->manager->selectServer($options['readPreference']);
return $operation->execute($server);
}
......@@ -317,8 +336,12 @@ class Collection
*/
public function findOne($filter = array(), array $options = array())
{
if ( ! isset($options['readPreference'])) {
$options['readPreference'] = $this->readPreference;
}
$operation = new FindOne($this->databaseName, $this->collectionName, $filter, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$server = $this->manager->selectServer($options['readPreference']);
return $operation->execute($server);
}
......
......@@ -3,6 +3,7 @@
namespace MongoDB\Operation;
use MongoDB\Driver\Command;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\InvalidArgumentTypeException;
......@@ -42,6 +43,8 @@ class Aggregate implements Executable
* * maxTimeMS (integer): The maximum amount of time to allow the query to
* run.
*
* * readPreference (MongoDB\Driver\ReadPreference): Read preference.
*
* * useCursor (boolean): Indicates whether the command will request that
* the server provide results using a cursor. The default is true.
*
......@@ -94,6 +97,10 @@ class Aggregate implements Executable
throw new InvalidArgumentTypeException('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
}
if (isset($options['readPreference']) && ! $options['readPreference'] instanceof ReadPreference) {
throw new InvalidArgumentTypeException('"readPreference" option', $options['readPreference'], 'MongoDB\Driver\ReadPreference');
}
if ( ! is_bool($options['useCursor'])) {
throw new InvalidArgumentTypeException('"useCursor" option', $options['useCursor'], 'boolean');
}
......@@ -118,9 +125,10 @@ class Aggregate implements Executable
public function execute(Server $server)
{
$isCursorSupported = \MongoDB\server_supports_feature($server, self::$wireVersionForCursor);
$readPreference = isset($this->options['readPreference']) ? $this->options['readPreference'] : null;
$command = $this->createCommand($server, $isCursorSupported);
$cursor = $server->executeCommand($this->databaseName, $command);
$cursor = $server->executeCommand($this->databaseName, $command, $readPreference);
if ($isCursorSupported && $this->options['useCursor']) {
return $cursor;
......
......@@ -3,6 +3,7 @@
namespace MongoDB\Operation;
use MongoDB\Driver\Command;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\InvalidArgumentTypeException;
......@@ -36,6 +37,8 @@ class Count implements Executable
* * maxTimeMS (integer): The maximum amount of time to allow the query to
* run.
*
* * readPreference (MongoDB\Driver\ReadPreference): Read preference.
*
* * skip (integer): The number of documents to skip before returning the
* documents.
*
......@@ -69,6 +72,10 @@ class Count implements Executable
throw new InvalidArgumentTypeException('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
}
if (isset($options['readPreference']) && ! $options['readPreference'] instanceof ReadPreference) {
throw new InvalidArgumentTypeException('"readPreference" option', $options['readPreference'], 'MongoDB\Driver\ReadPreference');
}
if (isset($options['skip']) && ! is_integer($options['skip'])) {
throw new InvalidArgumentTypeException('"skip" option', $options['skip'], 'integer');
}
......@@ -88,7 +95,9 @@ class Count implements Executable
*/
public function execute(Server $server)
{
$cursor = $server->executeCommand($this->databaseName, $this->createCommand());
$readPreference = isset($this->options['readPreference']) ? $this->options['readPreference'] : null;
$cursor = $server->executeCommand($this->databaseName, $this->createCommand(), $readPreference);
$result = current($cursor->toArray());
if (empty($result->ok)) {
......
......@@ -3,6 +3,7 @@
namespace MongoDB\Operation;
use MongoDB\Driver\Command;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\InvalidArgumentTypeException;
......@@ -32,6 +33,8 @@ class Distinct implements Executable
* * maxTimeMS (integer): The maximum amount of time to allow the query to
* run.
*
* * readPreference (MongoDB\Driver\ReadPreference): Read preference.
*
* @param string $databaseName Database name
* @param string $collectionName Collection name
* @param string $fieldName Field for which to return distinct values
......@@ -49,6 +52,10 @@ class Distinct implements Executable
throw new InvalidArgumentTypeException('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
}
if (isset($options['readPreference']) && ! $options['readPreference'] instanceof ReadPreference) {
throw new InvalidArgumentTypeException('"readPreference" option', $options['readPreference'], 'MongoDB\Driver\ReadPreference');
}
$this->databaseName = (string) $databaseName;
$this->collectionName = (string) $collectionName;
$this->fieldName = (string) $fieldName;
......@@ -65,7 +72,9 @@ class Distinct implements Executable
*/
public function execute(Server $server)
{
$cursor = $server->executeCommand($this->databaseName, $this->createCommand());
$readPreference = isset($this->options['readPreference']) ? $this->options['readPreference'] : null;
$cursor = $server->executeCommand($this->databaseName, $this->createCommand(), $readPreference);
$result = current($cursor->toArray());
if (empty($result->ok)) {
......
......@@ -3,6 +3,7 @@
namespace MongoDB\Operation;
use MongoDB\Driver\Query;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\InvalidArgumentTypeException;
......@@ -64,6 +65,8 @@ class Find implements Executable
* * projection (document): Limits the fields to return for the matching
* document.
*
* * readPreference (MongoDB\Driver\ReadPreference): Read preference.
*
* * skip (integer): The number of documents to skip before returning.
*
* * sort (document): The order in which to return matching documents. If
......@@ -130,6 +133,10 @@ class Find implements Executable
throw new InvalidArgumentTypeException('"projection" option', $options['projection'], 'array or object');
}
if (isset($options['readPreference']) && ! $options['readPreference'] instanceof ReadPreference) {
throw new InvalidArgumentTypeException('"readPreference" option', $options['readPreference'], 'MongoDB\Driver\ReadPreference');
}
if (isset($options['skip']) && ! is_integer($options['skip'])) {
throw new InvalidArgumentTypeException('"skip" option', $options['skip'], 'integer');
}
......@@ -153,7 +160,9 @@ class Find implements Executable
*/
public function execute(Server $server)
{
return $server->executeQuery($this->databaseName . '.' . $this->collectionName, $this->createQuery());
$readPreference = isset($this->options['readPreference']) ? $this->options['readPreference'] : null;
return $server->executeQuery($this->databaseName . '.' . $this->collectionName, $this->createQuery(), $readPreference);
}
/**
......
......@@ -34,6 +34,8 @@ class FindOne implements Executable
* * projection (document): Limits the fields to return for the matching
* document.
*
* * readPreference (MongoDB\Driver\ReadPreference): Read preference.
*
* * skip (integer): The number of documents to skip before returning.
*
* * sort (document): The order in which to return matching documents. If
......
......@@ -34,6 +34,29 @@ function is_first_key_operator($document)
return (isset($firstKey[0]) && $firstKey[0] == '$');
}
/**
* Return whether the aggregation pipeline ends with an $out operator.
*
* This is used for determining whether the aggregation pipeline msut be
* executed against a primary server.
*
* @internal
* @param array $pipeline List of pipeline operations
* @return boolean
*/
function is_last_pipeline_operator_out(array $pipeline)
{
$lastOp = end($pipeline);
if ($lastOp === false) {
return false;
}
$lastOp = (array) $lastOp;
return key($lastOp) === '$out';
}
/**
* Returns a ReadPreference corresponding to the Manager's read preference.
*
......
......@@ -49,6 +49,10 @@ class AggregateTest extends TestCase
$options[][] = array('maxTimeMS' => $value);
}
foreach ($this->getInvalidReadPreferenceValues() as $value) {
$options[][] = array('readPreference' => $value);
}
foreach ($this->getInvalidBooleanValues() as $value) {
$options[][] = array('useCursor' => $value);
}
......
......@@ -40,6 +40,10 @@ class CountTest extends TestCase
$options[][] = array('maxTimeMS' => $value);
}
foreach ($this->getInvalidReadPreferenceValues() as $value) {
$options[][] = array('readPreference' => $value);
}
foreach ($this->getInvalidIntegerValues() as $value) {
$options[][] = array('skip' => $value);
}
......
......@@ -32,6 +32,10 @@ class DistinctTest extends TestCase
$options[][] = array('maxTimeMS' => $value);
}
foreach ($this->getInvalidReadPreferenceValues() as $value) {
$options[][] = array('readPreference' => $value);
}
return $options;
}
}
......@@ -65,6 +65,10 @@ class FindTest extends TestCase
$options[][] = array('projection' => $value);
}
foreach ($this->getInvalidReadPreferenceValues() as $value) {
$options[][] = array('readPreference' => $value);
}
foreach ($this->getInvalidIntegerValues() as $value) {
$options[][] = array('skip' => $value);
}
......
......@@ -40,6 +40,11 @@ abstract class TestCase extends BaseTestCase
return array(123, 3.14, true, array(), new stdClass);
}
protected function getInvalidReadPreferenceValues()
{
return array(123, 3.14, 'foo', true, array(), new stdClass);
}
protected function getInvalidWriteConcernValues()
{
return array(123, 3.14, 'foo', true, array(), new stdClass);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment