Commit 7973b380 authored by Jeremy Mikola's avatar Jeremy Mikola

Extract Collection::aggregate() to an operation class

parent a079417e
......@@ -15,6 +15,8 @@ use MongoDB\Exception\UnexpectedTypeException;
use MongoDB\Model\IndexInfoIterator;
use MongoDB\Model\IndexInfoIteratorIterator;
use MongoDB\Model\IndexInput;
use MongoDB\Operation\Aggregate;
use Traversable;
class Collection
{
......@@ -78,79 +80,25 @@ class Collection
}
/**
* Runs an aggregation framework pipeline
* Executes an aggregation framework pipeline on the collection.
*
* Note: this method's return value depends on the MongoDB server version
* and the "useCursor" option. If "useCursor" is true, a Cursor will be
* returned; otherwise, an ArrayIterator is returned, which wraps the
* "result" array from the command response document.
*
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
*
* @param array $pipeline The pipeline to execute
* @param array $options Additional options
* @return Iterator
* @see Aggregate::__construct() for supported options
* @param array $pipeline List of pipeline operations
* @param array $options Command options
* @return Traversable
*/
public function aggregate(array $pipeline, array $options = array())
{
$readPreference = new ReadPreference(ReadPreference::RP_PRIMARY);
$server = $this->manager->selectServer($readPreference);
$operation = new Aggregate($this->dbname, $this->collname, $pipeline, $options);
if (FeatureDetection::isSupported($server, FeatureDetection::API_AGGREGATE_CURSOR)) {
$options = array_merge(
array(
/**
* Enables writing to temporary files. When set to true, aggregation stages
* can write data to the _tmp subdirectory in the dbPath directory. The
* default is false.
*
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
*/
'allowDiskUse' => false,
/**
* The number of documents to return per batch.
*
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
*/
'batchSize' => 0,
/**
* The maximum amount of time to allow the query to run.
*
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
*/
'maxTimeMS' => 0,
/**
* Indicates if the results should be provided as a cursor.
*
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
*/
'useCursor' => true,
),
$options
);
}
$options = $this->_massageAggregateOptions($options);
$command = new Command(array(
'aggregate' => $this->collname,
'pipeline' => $pipeline,
) + $options);
$cursor = $server->executeCommand($this->dbname, $command);
if ( ! empty($options["cursor"])) {
return $cursor;
}
$doc = current($cursor->toArray());
if ($doc["ok"]) {
return new \ArrayIterator(array_map(
function (\stdClass $document) { return (array) $document; },
$doc["result"]
));
}
throw $this->_generateCommandException($doc);
return $operation->execute($server);
}
/**
......@@ -1168,22 +1116,6 @@ class Collection
return new RuntimeException("FIXME: Unknown error");
}
/**
* Internal helper for massaging aggregate options
* @internal
*/
protected function _massageAggregateOptions($options)
{
if ( ! empty($options["useCursor"])) {
$options["cursor"] = isset($options["batchSize"])
? array("batchSize" => (integer) $options["batchSize"])
: new stdClass;
}
unset($options["useCursor"], $options["batchSize"]);
return $options;
}
/**
* Internal helper for massaging findandmodify options
* @internal
......
<?php
namespace MongoDB\Operation;
use MongoDB\Driver\Command;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\InvalidArgumentTypeException;
use MongoDB\Exception\RuntimeException;
use MongoDB\Exception\UnexpectedValueException;
use ArrayIterator;
use stdClass;
use Traversable;
/**
* Operation for the aggregate command.
*
* @api
* @see MongoDB\Collection::aggregate()
* @see http://docs.mongodb.org/manual/reference/command/aggregate/
*/
class Aggregate implements Executable
{
private static $wireVersionForCursor = 2;
private $databaseName;
private $collectionName;
private $pipeline;
private $options;
/**
* Constructs an aggregate command.
*
* Supported options:
*
* * allowDiskUse (boolean): Enables writing to temporary files. When set
* to true, aggregation stages can write data to the _tmp sub-directory
* in the dbPath directory. The default is false.
*
* * batchSize (integer): The number of documents to return per batch.
*
* * maxTimeMS (integer): The maximum amount of time to allow the query to
* run.
*
* * useCursor (boolean): Indicates whether the command will request that
* the server provide results using a cursor. The default is true.
*
* For servers < 2.6, this option is ignored as aggregation cursors are
* not available.
*
* For servers >= 2.6, this option allows users to turn off cursors if
* necessary to aid in mongod/mongos upgrades.
*
* @param string $databaseName Database name
* @param string $collectionName Collection name
* @param array $pipeline List of pipeline operations
* @param array $options Command options
* @throws InvalidArgumentException
*/
public function __construct($databaseName, $collectionName, array $pipeline, array $options = array())
{
$options += array(
'allowDiskUse' => false,
'useCursor' => true,
);
if ( ! is_bool($options['allowDiskUse'])) {
throw new InvalidArgumentTypeException('"allowDiskUse" option', $options['allowDiskUse'], 'boolean');
}
if (isset($options['batchSize']) && ! is_integer($options['batchSize'])) {
throw new InvalidArgumentTypeException('"batchSize" option', $options['batchSize'], 'integer');
}
if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
throw new InvalidArgumentTypeException('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
}
if ( ! is_bool($options['useCursor'])) {
throw new InvalidArgumentTypeException('"useCursor" option', $options['useCursor'], 'boolean');
}
if (isset($options['batchSize']) && ! $options['useCursor']) {
throw new InvalidArgumentException('"batchSize" option should not be used if "useCursor" is false');
}
$expectedIndex = 0;
foreach ($pipeline as $i => $op) {
if ($i !== $expectedIndex) {
throw new InvalidArgumentException(sprintf('$pipeline is not a list (unexpected index: "%s")', $i));
}
if ( ! is_array($op) && ! is_object($op)) {
throw new InvalidArgumentTypeException(sprintf('$pipeline[%d]', $i), $op, 'array or object');
}
$expectedIndex += 1;
}
$this->databaseName = (string) $databaseName;
$this->collectionName = (string) $collectionName;
$this->pipeline = $pipeline;
$this->options = $options;
}
/**
* Execute the operation.
*
* @see Executable::execute()
* @param Server $server
* @return Traversable
*/
public function execute(Server $server)
{
$command = $this->createCommand($server);
$cursor = $server->executeCommand($this->databaseName, $command);
if ($this->options['useCursor']) {
return $cursor;
}
$result = current($cursor->toArray());
if (empty($result['ok'])) {
throw new RuntimeException(isset($result['errmsg']) ? $result['errmsg'] : 'Unknown error');
}
if ( ! isset($result['result']) || ! is_array($result['result'])) {
throw new UnexpectedValueException('aggregate command did not return a "result" array');
}
return new ArrayIterator(array_map(
function (stdClass $document) { return (array) $document; },
$result['result']
));
}
/**
* Create the aggregate command.
*
* @param Server $server
* @return Command
*/
private function createCommand(Server $server)
{
$cmd = array(
'aggregate' => $this->collectionName,
'pipeline' => $this->pipeline,
);
// Servers < 2.6 do not support any command options
if ( ! \MongoDB\server_supports_feature($server, self::$wireVersionForCursor)) {
return new Command($cmd);
}
$cmd['allowDiskUse'] = $this->options['allowDiskUse'];
if (isset($this->options['maxTimeMS'])) {
$cmd['maxTimeMS'] = $this->options['maxTimeMS'];
}
if ($this->options['useCursor']) {
$cmd['cursor'] = isset($this->options["batchSize"])
? array('batchSize' => $this->options["batchSize"])
: new stdClass;
}
return new Command($cmd);
}
}
......@@ -3,7 +3,6 @@
namespace MongoDB\Tests\Collection\CrudSpec;
use MongoDB\Collection;
use MongoDB\FeatureDetection;
use MongoDB\Driver\ReadPreference;
/**
......@@ -13,6 +12,8 @@ use MongoDB\Driver\ReadPreference;
*/
class AggregateFunctionalTest extends FunctionalTestCase
{
private static $wireVersionForOutOperator = 2;
public function setUp()
{
parent::setUp();
......@@ -43,7 +44,7 @@ class AggregateFunctionalTest extends FunctionalTestCase
{
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
if ( ! FeatureDetection::isSupported($server, FeatureDetection::API_AGGREGATE_CURSOR)) {
if ( ! \MongoDB\server_supports_feature($server, self::$wireVersionForOutOperator)) {
$this->markTestSkipped('$out aggregation pipeline operator is not supported');
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment