MapReduce.php 18.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
<?php
/*
 * Copyright 2015-2017 MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\Operation;

20
use ArrayIterator;
21
use MongoDB\BSON\JavascriptInterface;
22
use MongoDB\Driver\Command;
23
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
24 25 26
use MongoDB\Driver\ReadConcern;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
27
use MongoDB\Driver\Session;
28 29 30 31 32 33
use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\UnexpectedValueException;
use MongoDB\Exception\UnsupportedException;
use MongoDB\MapReduceResult;
use stdClass;
34 35 36 37 38 39 40 41 42
use function current;
use function is_array;
use function is_bool;
use function is_integer;
use function is_object;
use function is_string;
use function MongoDB\create_field_path_type_map;
use function MongoDB\is_mapreduce_output_inline;
use function MongoDB\server_supports_feature;
43 44
use function trigger_error;
use const E_USER_DEPRECATED;
45 46 47 48 49 50 51 52 53 54

/**
 * Operation for the mapReduce command.
 *
 * @api
 * @see \MongoDB\Collection::mapReduce()
 * @see https://docs.mongodb.com/manual/reference/command/mapReduce/
 */
class MapReduce implements Executable
{
    /** @var integer Minimum wire version for the "collation" option (MongoDB 3.4). */
    private static $wireVersionForCollation = 5;

    /** @var integer Minimum wire version for sending "bypassDocumentValidation" (MongoDB 3.2). */
    private static $wireVersionForDocumentLevelValidation = 4;

    /** @var integer Minimum wire version for the "readConcern" option (MongoDB 3.2). */
    private static $wireVersionForReadConcern = 4;

    /**
     * Minimum wire version for the "writeConcern" option.
     *
     * NOTE(review): wire version 4 is MongoDB 3.2 (the same value used for
     * readConcern above), yet the constructor documentation states that
     * writeConcern is not supported for server versions < 3.4 (wire version
     * 5). One of the two is inconsistent — confirm against the server/driver
     * specification before changing either.
     *
     * @var integer
     */
    private static $wireVersionForWriteConcern = 4;

    /** @var string */
    private $databaseName;

    /** @var string */
    private $collectionName;

    /** @var JavascriptInterface */
    private $map;

    /** @var JavascriptInterface */
    private $reduce;

    /** @var array|object|string */
    private $out;

    /** @var array */
    private $options;

    /**
     * Constructs a mapReduce command.
     *
     * Required arguments:
     *
     *  * map (MongoDB\BSON\JavascriptInterface): A JavaScript function that
     *    associates or "maps" a value with a key and emits the key and value
     *    pair.
     *
     *    Passing a Javascript instance with a scope is deprecated. Put all
     *    scope variables in the "scope" option of the MapReduce operation.
     *
     *  * reduce (MongoDB\BSON\JavascriptInterface): A JavaScript function that
     *    "reduces" to a single object all the values associated with a
     *    particular key.
     *
     *    Passing a Javascript instance with a scope is deprecated. Put all
     *    scope variables in the "scope" option of the MapReduce operation.
     *
     *  * out (string|document): Specifies where to output the result of the
     *    map-reduce operation. You can either output to a collection or return
     *    the result inline. On a primary member of a replica set you can output
     *    either to a collection or inline, but on a secondary, only inline
     *    output is possible.
     *
     * Supported options:
     *
     *  * bypassDocumentValidation (boolean): If true, allows the write to
     *    circumvent document level validation. This only applies when results
     *    are output to a collection.
     *
     *    For servers < 3.2, this option is ignored as document level validation
     *    is not available.
     *
     *  * collation (document): Collation specification.
     *
     *    This is not supported for server versions < 3.4 and will result in an
     *    exception at execution time if used.
     *
     *  * finalize (MongoDB\BSON\JavascriptInterface): Follows the reduce method
     *    and modifies the output.
     *
     *    Passing a Javascript instance with a scope is deprecated. Put all
     *    scope variables in the "scope" option of the MapReduce operation.
     *
     *  * jsMode (boolean): Specifies whether to convert intermediate data into
     *    BSON format between the execution of the map and reduce functions.
     *
     *  * limit (integer): Specifies a maximum number of documents for the input
     *    into the map function.
     *
     *  * maxTimeMS (integer): The maximum amount of time to allow the query to
     *    run.
     *
     *  * query (document): Specifies the selection criteria using query
     *    operators for determining the documents input to the map function.
     *
     *  * readConcern (MongoDB\Driver\ReadConcern): Read concern. This is not
     *    supported when results are returned inline.
     *
     *    This is not supported for server versions < 3.2 and will result in an
     *    exception at execution time if used.
     *
     *  * readPreference (MongoDB\Driver\ReadPreference): Read preference.
     *
     *    This option is ignored if results are output to a collection.
     *
     *  * scope (document): Specifies global variables that are accessible in
     *    the map, reduce and finalize functions.
     *
     *  * session (MongoDB\Driver\Session): Client session.
     *
     *    Sessions are not supported for server versions < 3.6.
     *
     *  * sort (document): Sorts the input documents. This option is useful for
     *    optimization. For example, specify the sort key to be the same as the
     *    emit key so that there are fewer reduce operations. The sort key must
     *    be in an existing index for this collection.
     *
     *  * typeMap (array): Type map for BSON deserialization. This will be
     *    applied to the returned Cursor (it is not sent to the server).
     *
     *  * verbose (boolean): Specifies whether to include the timing information
     *    in the result information.
     *
     *  * writeConcern (MongoDB\Driver\WriteConcern): Write concern. This only
     *    applies when results are output to a collection.
     *
     *    This is not supported for server versions < 3.4 and will result in an
     *    exception at execution time if used.
     *
     * @param string              $databaseName   Database name
     * @param string              $collectionName Collection name
     * @param JavascriptInterface $map            Map function
     * @param JavascriptInterface $reduce         Reduce function
     * @param string|array|object $out            Output specification
     * @param array               $options        Command options
     * @throws InvalidArgumentException for parameter/option parsing errors
     */
    public function __construct($databaseName, $collectionName, JavascriptInterface $map, JavascriptInterface $reduce, $out, array $options = [])
    {
        if (! is_string($out) && ! is_array($out) && ! is_object($out)) {
            throw InvalidArgumentException::invalidType('$out', $out, 'string or array or object');
        }

        if (isset($options['bypassDocumentValidation']) && ! is_bool($options['bypassDocumentValidation'])) {
            throw InvalidArgumentException::invalidType('"bypassDocumentValidation" option', $options['bypassDocumentValidation'], 'boolean');
        }

        if (isset($options['collation']) && ! is_array($options['collation']) && ! is_object($options['collation'])) {
            throw InvalidArgumentException::invalidType('"collation" option', $options['collation'], 'array or object');
        }

        if (isset($options['finalize']) && ! $options['finalize'] instanceof JavascriptInterface) {
            throw InvalidArgumentException::invalidType('"finalize" option', $options['finalize'], JavascriptInterface::class);
        }

        if (isset($options['jsMode']) && ! is_bool($options['jsMode'])) {
            throw InvalidArgumentException::invalidType('"jsMode" option', $options['jsMode'], 'boolean');
        }

        if (isset($options['limit']) && ! is_integer($options['limit'])) {
            throw InvalidArgumentException::invalidType('"limit" option', $options['limit'], 'integer');
        }

        if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
            throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
        }

        if (isset($options['query']) && ! is_array($options['query']) && ! is_object($options['query'])) {
            throw InvalidArgumentException::invalidType('"query" option', $options['query'], 'array or object');
        }

        if (isset($options['readConcern']) && ! $options['readConcern'] instanceof ReadConcern) {
            throw InvalidArgumentException::invalidType('"readConcern" option', $options['readConcern'], ReadConcern::class);
        }

        if (isset($options['readPreference']) && ! $options['readPreference'] instanceof ReadPreference) {
            throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
        }

        if (isset($options['scope']) && ! is_array($options['scope']) && ! is_object($options['scope'])) {
            throw InvalidArgumentException::invalidType('"scope" option', $options['scope'], 'array or object');
        }

        if (isset($options['session']) && ! $options['session'] instanceof Session) {
            throw InvalidArgumentException::invalidType('"session" option', $options['session'], Session::class);
        }

        if (isset($options['sort']) && ! is_array($options['sort']) && ! is_object($options['sort'])) {
            throw InvalidArgumentException::invalidType('"sort" option', $options['sort'], 'array or object');
        }

        if (isset($options['typeMap']) && ! is_array($options['typeMap'])) {
            throw InvalidArgumentException::invalidType('"typeMap" option', $options['typeMap'], 'array');
        }

        if (isset($options['verbose']) && ! is_bool($options['verbose'])) {
            throw InvalidArgumentException::invalidType('"verbose" option', $options['verbose'], 'boolean');
        }

        if (isset($options['writeConcern']) && ! $options['writeConcern'] instanceof WriteConcern) {
            throw InvalidArgumentException::invalidType('"writeConcern" option', $options['writeConcern'], WriteConcern::class);
        }

        // Drop default read/write concerns so they are neither sent to the
        // server nor counted by the server-version and transaction checks in
        // execute(), which only look at whether the option is set.
        if (isset($options['readConcern']) && $options['readConcern']->isDefault()) {
            unset($options['readConcern']);
        }

        if (isset($options['writeConcern']) && $options['writeConcern']->isDefault()) {
            unset($options['writeConcern']);
        }

        // Handle deprecation of CodeWScope: JavaScript values carrying their
        // own scope still work, but emit a (silenced) deprecation notice.
        if ($map->getScope() !== null) {
            @trigger_error('Use of Javascript with scope in "$map" argument for MapReduce is deprecated. Put all scope variables in the "scope" option of the MapReduce operation.', E_USER_DEPRECATED);
        }

        if ($reduce->getScope() !== null) {
            @trigger_error('Use of Javascript with scope in "$reduce" argument for MapReduce is deprecated. Put all scope variables in the "scope" option of the MapReduce operation.', E_USER_DEPRECATED);
        }

        if (isset($options['finalize']) && $options['finalize']->getScope() !== null) {
            @trigger_error('Use of Javascript with scope in "finalize" option for MapReduce is deprecated. Put all scope variables in the "scope" option of the MapReduce operation.', E_USER_DEPRECATED);
        }

        $this->checkOutDeprecations($out);

        $this->databaseName = (string) $databaseName;
        $this->collectionName = (string) $collectionName;
        $this->map = $map;
        $this->reduce = $reduce;
        $this->out = $out;
        $this->options = $options;
    }

    /**
     * Execute the operation.
     *
     * @see Executable::execute()
     * @param Server $server
     * @return MapReduceResult
     * @throws UnexpectedValueException if the command response was malformed
     * @throws UnsupportedException if collation, read concern, or write concern is used and unsupported
     * @throws DriverRuntimeException for other driver errors (e.g. connection errors)
     */
    public function execute(Server $server)
    {
        if (isset($this->options['collation']) && ! server_supports_feature($server, self::$wireVersionForCollation)) {
            throw UnsupportedException::collationNotSupported();
        }

        if (isset($this->options['readConcern']) && ! server_supports_feature($server, self::$wireVersionForReadConcern)) {
            throw UnsupportedException::readConcernNotSupported();
        }

        if (isset($this->options['writeConcern']) && ! server_supports_feature($server, self::$wireVersionForWriteConcern)) {
            throw UnsupportedException::writeConcernNotSupported();
        }

        // Explicit read/write concerns are rejected for operations that run
        // inside a transaction.
        $inTransaction = isset($this->options['session']) && $this->options['session']->isInTransaction();
        if ($inTransaction) {
            if (isset($this->options['readConcern'])) {
                throw UnsupportedException::readConcernNotSupportedInTransaction();
            }
            if (isset($this->options['writeConcern'])) {
                throw UnsupportedException::writeConcernNotSupportedInTransaction();
            }
        }

        $hasOutputCollection = ! is_mapreduce_output_inline($this->out);

        $command = $this->createCommand($server);
        $options = $this->createOptions($hasOutputCollection);

        /* If the mapReduce operation results in a write, use
         * executeReadWriteCommand to ensure we're handling the writeConcern
         * option.
         * In other cases, we use executeCommand as this will prevent the
         * mapReduce operation from being retried when retryReads is enabled.
         * See https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.rst#unsupported-read-operations. */
        $cursor = $hasOutputCollection
            ? $server->executeReadWriteCommand($this->databaseName, $command, $options)
            : $server->executeCommand($this->databaseName, $command, $options);

        // The type map only affects inline results ("results" array); output
        // collections are read back later through a separate Find operation.
        if (isset($this->options['typeMap']) && ! $hasOutputCollection) {
            $cursor->setTypeMap(create_field_path_type_map($this->options['typeMap'], 'results.$'));
        }

        $result = current($cursor->toArray());

        // current() returns false for an empty reply; report that as a
        // malformed response rather than letting the stdClass type hint of
        // createGetIteratorCallable() raise a TypeError.
        if (! is_object($result)) {
            throw new UnexpectedValueException('mapReduce command did not return a result document');
        }

        $getIterator = $this->createGetIteratorCallable($result, $server);

        return new MapReduceResult($getIterator, $result);
    }

    /**
     * Emits deprecation notices for "out" sub-options set to false.
     *
     * A string output specification (a collection name) has no sub-options
     * and is skipped.
     *
     * @param string|array|object $out Output specification
     * @return void
     */
    private function checkOutDeprecations($out)
    {
        if (is_string($out)) {
            return;
        }

        $out = (array) $out;

        if (isset($out['nonAtomic']) && ! $out['nonAtomic']) {
            @trigger_error('Specifying false for "out.nonAtomic" is deprecated.', E_USER_DEPRECATED);
        }

        if (isset($out['sharded']) && ! $out['sharded']) {
            @trigger_error('Specifying false for "out.sharded" is deprecated.', E_USER_DEPRECATED);
        }
    }

    /**
     * Create the mapReduce command.
     *
     * @param Server $server Server the command will run on; used to decide
     *                       whether "bypassDocumentValidation" can be sent.
     * @return Command
     */
    private function createCommand(Server $server)
    {
        $cmd = [
            'mapReduce' => $this->collectionName,
            'map' => $this->map,
            'reduce' => $this->reduce,
            'out' => $this->out,
        ];

        // Scalar / JavaScript options are copied through unchanged.
        foreach (['finalize', 'jsMode', 'limit', 'maxTimeMS', 'verbose'] as $option) {
            if (isset($this->options[$option])) {
                $cmd[$option] = $this->options[$option];
            }
        }

        // Document options are cast to objects, presumably so that an empty
        // PHP array is encoded as a BSON document rather than a BSON array.
        foreach (['collation', 'query', 'scope', 'sort'] as $option) {
            if (isset($this->options[$option])) {
                $cmd[$option] = (object) $this->options[$option];
            }
        }

        // Only a true value is sent, and only to servers that support document
        // level validation; otherwise the option is silently ignored.
        if (! empty($this->options['bypassDocumentValidation']) &&
            server_supports_feature($server, self::$wireVersionForDocumentLevelValidation)
        ) {
            $cmd['bypassDocumentValidation'] = $this->options['bypassDocumentValidation'];
        }

        return new Command($cmd);
    }

    /**
     * Creates a callable for MapReduceResult::getIterator().
     *
     * Inline results are wrapped in an ArrayIterator. When results were
     * written to a collection, the callable runs a Find on that collection
     * (named either by a string, resolved in this operation's database, or by
     * an object with "db" and "collection" fields).
     *
     * @param stdClass $result Command reply document
     * @param Server   $server Server used for the follow-up Find
     * @return callable
     * @throws UnexpectedValueException if the command response was malformed
     */
    private function createGetIteratorCallable(stdClass $result, Server $server)
    {
        // Inline results can be wrapped with an ArrayIterator
        if (isset($result->results) && is_array($result->results)) {
            $results = $result->results;

            return function () use ($results) {
                return new ArrayIterator($results);
            };
        }

        if (isset($result->result)) {
            $output = $result->result;
            $isNamespaceObject = is_object($output) && isset($output->db, $output->collection);

            if (is_string($output) || $isNamespaceObject) {
                $options = isset($this->options['typeMap']) ? ['typeMap' => $this->options['typeMap']] : [];

                $find = is_string($output)
                    ? new Find($this->databaseName, $output, [], $options)
                    : new Find($output->db, $output->collection, [], $options);

                return function () use ($find, $server) {
                    return $find->execute($server);
                };
            }
        }

        throw new UnexpectedValueException('mapReduce command did not return inline results or an output collection');
    }

    /**
     * Create options for executing the command.
     *
     * readPreference is only forwarded for inline output and writeConcern
     * only when writing to an output collection; readConcern and session are
     * forwarded whenever set.
     *
     * @see http://php.net/manual/en/mongodb-driver-server.executecommand.php
     * @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
     * @param boolean $hasOutputCollection Whether results go to a collection
     * @return array
     */
    private function createOptions($hasOutputCollection)
    {
        $options = [];

        if (isset($this->options['readConcern'])) {
            $options['readConcern'] = $this->options['readConcern'];
        }

        if (! $hasOutputCollection && isset($this->options['readPreference'])) {
            $options['readPreference'] = $this->options['readPreference'];
        }

        if (isset($this->options['session'])) {
            $options['session'] = $this->options['session'];
        }

        if ($hasOutputCollection && isset($this->options['writeConcern'])) {
            $options['writeConcern'] = $this->options['writeConcern'];
        }

        return $options;
    }
}