1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
<?php
namespace MongoDB\Operation;
use MongoDB\Driver\Command;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\InvalidArgumentTypeException;
use MongoDB\Exception\RuntimeException;
use MongoDB\Exception\UnexpectedValueException;
use ArrayIterator;
use stdClass;
use Traversable;
/**
 * Runs the aggregate command against a single collection.
 *
 * @api
 * @see MongoDB\Collection::aggregate()
 * @see http://docs.mongodb.org/manual/reference/command/aggregate/
 */
class Aggregate implements Executable
{
    /** Wire version handed to server_supports_feature() to detect cursor support. */
    private static $wireVersionForCursor = 2;

    private $databaseName;
    private $collectionName;
    private $pipeline;
    private $options;

    /**
     * Validates the arguments and stores them for a later execute() call.
     *
     * Recognised options:
     *
     *  * allowDiskUse (boolean): Lets aggregation stages write data to
     *    temporary files (the _tmp sub-directory of the dbPath directory).
     *    Defaults to false.
     *
     *  * batchSize (integer): Number of documents to return per batch. May
     *    not be combined with "useCursor" set to false.
     *
     *  * maxTimeMS (integer): Maximum amount of time the query is allowed to
     *    run.
     *
     *  * useCursor (boolean): Whether the command asks the server to return
     *    its results through a cursor. Defaults to true.
     *
     *    Servers < 2.6 have no aggregation cursors, so the option is ignored
     *    for them.
     *
     *    On servers >= 2.6 the option lets users switch cursors off, which
     *    can help during mongod/mongos upgrades.
     *
     * The pipeline must be a list: its keys have to be exactly 0, 1, 2, ...
     * in iteration order, and every element must be an array or an object.
     *
     * @param string $databaseName   Database name
     * @param string $collectionName Collection name
     * @param array  $pipeline       List of pipeline operations
     * @param array  $options        Command options
     * @throws InvalidArgumentTypeException if an option or a pipeline element
     *                                      has an unexpected type
     * @throws InvalidArgumentException if the pipeline is not a list, or if
     *                                  "batchSize" is given while "useCursor"
     *                                  is false
     */
    public function __construct($databaseName, $collectionName, array $pipeline, array $options = array())
    {
        // Array union keeps every caller-supplied key and only fills the gaps.
        $options = $options + array(
            'allowDiskUse' => false,
            'useCursor' => true,
        );

        $allowDiskUse = $options['allowDiskUse'];
        if (!is_bool($allowDiskUse)) {
            throw new InvalidArgumentTypeException('"allowDiskUse" option', $allowDiskUse, 'boolean');
        }

        // isset() is false for null, so a null batchSize/maxTimeMS counts as "not given".
        $hasBatchSize = isset($options['batchSize']);
        if ($hasBatchSize && !is_int($options['batchSize'])) {
            throw new InvalidArgumentTypeException('"batchSize" option', $options['batchSize'], 'integer');
        }

        if (isset($options['maxTimeMS']) && !is_int($options['maxTimeMS'])) {
            throw new InvalidArgumentTypeException('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
        }

        $useCursor = $options['useCursor'];
        if (!is_bool($useCursor)) {
            throw new InvalidArgumentTypeException('"useCursor" option', $useCursor, 'boolean');
        }

        if ($hasBatchSize && !$useCursor) {
            throw new InvalidArgumentException('"batchSize" option should not be used if "useCursor" is false');
        }

        // Walk the pipeline once, checking key sequence and element type together.
        $position = 0;
        foreach ($pipeline as $key => $stage) {
            if ($key !== $position) {
                throw new InvalidArgumentException(sprintf('$pipeline is not a list (unexpected index: "%s")', $key));
            }

            if (!(is_array($stage) || is_object($stage))) {
                throw new InvalidArgumentTypeException(sprintf('$pipeline[%d]', $key), $stage, 'array or object');
            }

            $position++;
        }

        $this->databaseName = (string) $databaseName;
        $this->collectionName = (string) $collectionName;
        $this->pipeline = $pipeline;
        $this->options = $options;
    }

    /**
     * Execute the operation.
     *
     * When the server supports aggregation cursors and "useCursor" is true,
     * the cursor returned by the server is handed back as-is. Otherwise the
     * single reply document is inspected and its "result" array is wrapped
     * in an ArrayIterator.
     *
     * @see Executable::execute()
     * @param Server $server
     * @return Traversable
     * @throws RuntimeException if the reply document has an empty "ok" field
     * @throws UnexpectedValueException if the reply has no "result" array
     */
    public function execute(Server $server)
    {
        $isCursorSupported = \MongoDB\server_supports_feature($server, self::$wireVersionForCursor);

        $cursor = $server->executeCommand(
            $this->databaseName,
            $this->createCommand($server, $isCursorSupported)
        );

        $returnsCursor = $isCursorSupported && $this->options['useCursor'];
        if ($returnsCursor) {
            return $cursor;
        }

        // Inline results: the reply is one document carrying the result array.
        $cursor->setTypeMap(array('document' => 'stdClass'));
        $documents = $cursor->toArray();
        $result = current($documents);

        // TODO: Remove this once PHPC-318 is implemented
        if (is_array($result)) {
            $result = (object) $result;
        }

        if (empty($result->ok)) {
            $message = isset($result->errmsg) ? $result->errmsg : 'Unknown error';

            throw new RuntimeException($message);
        }

        if (!(isset($result->result) && is_array($result->result))) {
            throw new UnexpectedValueException('aggregate command did not return a "result" array');
        }

        return new ArrayIterator($result->result);
    }

    /**
     * Create the aggregate command.
     *
     * "aggregate" and "pipeline" are always sent. The remaining options are
     * added only when the server supports cursors.
     *
     * @param Server  $server
     * @param boolean $isCursorSupported
     * @return Command
     */
    private function createCommand(Server $server, $isCursorSupported)
    {
        $cmd = array(
            'aggregate' => $this->collectionName,
            'pipeline' => $this->pipeline,
        );

        // Servers < 2.6 do not support any command options
        if ($isCursorSupported) {
            $cmd['allowDiskUse'] = $this->options['allowDiskUse'];

            if (isset($this->options['maxTimeMS'])) {
                $cmd['maxTimeMS'] = $this->options['maxTimeMS'];
            }

            if ($this->options['useCursor']) {
                if (isset($this->options['batchSize'])) {
                    $cmd['cursor'] = array('batchSize' => $this->options['batchSize']);
                } else {
                    // An empty object requests a cursor with no explicit batch size.
                    $cmd['cursor'] = new stdClass;
                }
            }
        }

        return new Command($cmd);
    }
}