Watch.php 16.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
<?php
/*
 * Copyright 2017 MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\Operation;

use MongoDB\BSON\TimestampInterface;
use MongoDB\ChangeStream;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Manager;
use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\UnexpectedValueException;
use MongoDB\Exception\UnsupportedException;
use MongoDB\Model\ChangeStreamIterator;
use function array_intersect_key;
use function array_unshift;
use function count;
use function is_array;
use function is_object;
use function is_string;
use function MongoDB\Driver\Monitoring\addSubscriber;
use function MongoDB\Driver\Monitoring\removeSubscriber;
use function MongoDB\select_server;
use function MongoDB\server_supports_feature;

/**
 * Operation for creating a change stream with the aggregate command.
 *
 * Note: the implementation of CommandSubscriber is an internal implementation
 * detail and should not be considered part of the public API.
 *
 * @api
 * @see \MongoDB\Collection::watch()
 * @see https://docs.mongodb.com/manual/changeStreams/
 */
class Watch implements Executable, /* @internal */ CommandSubscriber
{
    const FULL_DOCUMENT_DEFAULT = 'default';
    const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';

    /** @var integer Minimum wire version accepting "startAtOperationTime" (server 4.0) */
    private static $wireVersionForStartAtOperationTime = 7;

    /** @var Aggregate Aggregate operation for the current stream; rebuilt by resume() */
    private $aggregate;

    /** @var array Options forwarded to the Aggregate operation */
    private $aggregateOptions;

    /** @var array Options embedded in the $changeStream pipeline stage */
    private $changeStreamOptions;

    /** @var string|null Null for database- and cluster-level change streams */
    private $collectionName;

    /** @var string "admin" for cluster-level change streams */
    private $databaseName;

    /** @var integer|null Size of cursor.firstBatch from the latest aggregate reply */
    private $firstBatchSize;

    /** @var boolean Whether resume() has been invoked at least once */
    private $hasResumed = false;

    /** @var Manager */
    private $manager;

    /**
     * Operation time to use with "startAtOperationTime" when resuming without
     * a resume token. Either the user-supplied "startAtOperationTime" option or
     * the operationTime captured from the initial aggregate reply.
     *
     * @var TimestampInterface|null
     */
    private $operationTime;

    /** @var array User-supplied pipeline stages (without $changeStream) */
    private $pipeline;

    /** @var object|null postBatchResumeToken from the latest aggregate reply */
    private $postBatchResumeToken;

    /**
     * Constructs an aggregate command for creating a change stream.
     *
     * Supported options:
     *
     *  * batchSize (integer): The number of documents to return per batch.
     *
     *  * collation (document): Specifies a collation.
     *
     *  * fullDocument (string): Determines whether the "fullDocument" field
     *    will be populated for update operations. By default, change streams
     *    only return the delta of fields during the update operation (via the
     *    "updateDescription" field). To additionally return the most current
     *    majority-committed version of the updated document, specify
     *    "updateLookup" for this option. Defaults to "default".
     *
     *    Insert and replace operations always include the "fullDocument" field
     *    and delete operations omit the field as the document no longer exists.
     *
     *  * maxAwaitTimeMS (integer): The maximum amount of time for the server to
     *    wait on new documents to satisfy a change stream query.
     *
     *  * readConcern (MongoDB\Driver\ReadConcern): Read concern.
     *
     *  * readPreference (MongoDB\Driver\ReadPreference): Read preference. This
     *    will be used to select a new server when resuming. Defaults to a
     *    "primary" read preference.
     *
     *  * resumeAfter (document): Specifies the logical starting point for the
     *    new change stream.
     *
     *    Using this option in conjunction with "startAfter" and/or
     *    "startAtOperationTime" will result in a server error. The options are
     *    mutually exclusive.
     *
     *  * session (MongoDB\Driver\Session): Client session.
     *
     *    Sessions are not supported for server versions < 3.6.
     *
     *  * startAfter (document): Specifies the logical starting point for the
     *    new change stream. Unlike "resumeAfter", this option can be used with
     *    a resume token from an "invalidate" event.
     *
     *    Using this option in conjunction with "resumeAfter" and/or
     *    "startAtOperationTime" will result in a server error. The options are
     *    mutually exclusive.
     *
     *  * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
     *    the change stream will only provide changes that occurred at or after
     *    the specified timestamp. Any command run against the server will
     *    return an operation time that can be used here. Alternatively, an
     *    operation time may be obtained from MongoDB\Driver\Server::getInfo().
     *
     *    Using this option in conjunction with "resumeAfter" and/or
     *    "startAfter" will result in a server error. The options are mutually
     *    exclusive.
     *
     *    This option is not supported for server versions < 4.0.
     *
     *  * typeMap (array): Type map for BSON deserialization. This will be
     *    applied to the returned Cursor (it is not sent to the server).
     *
     * Note: A database-level change stream may be created by specifying null
     * for the collection name. A cluster-level change stream may be created by
     * specifying null for both the database and collection name.
     *
     * @param Manager     $manager        Manager instance from the driver
     * @param string|null $databaseName   Database name
     * @param string|null $collectionName Collection name
     * @param array       $pipeline       List of pipeline operations
     * @param array       $options        Command options
     * @throws InvalidArgumentException for parameter/option parsing errors
     */
    public function __construct(Manager $manager, $databaseName, $collectionName, array $pipeline, array $options = [])
    {
        if (isset($collectionName) && ! isset($databaseName)) {
            throw new InvalidArgumentException('$collectionName should also be null if $databaseName is null');
        }

        $options += [
            'fullDocument' => self::FULL_DOCUMENT_DEFAULT,
            'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY),
        ];

        if (! is_string($options['fullDocument'])) {
            throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string');
        }

        if (! $options['readPreference'] instanceof ReadPreference) {
            throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
        }

        if (isset($options['resumeAfter']) && ! is_array($options['resumeAfter']) && ! is_object($options['resumeAfter'])) {
            throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object');
        }

        if (isset($options['startAfter']) && ! is_array($options['startAfter']) && ! is_object($options['startAfter'])) {
            throw InvalidArgumentException::invalidType('"startAfter" option', $options['startAfter'], 'array or object');
        }

        if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) {
            throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class);
        }

        /* In the absence of an explicit session, create one to ensure that the
         * initial aggregation and any resume attempts can use the same session
         * ("implicit from the user's perspective" per PHPLIB-342). Since this
         * is filling in for an implicit session, we default "causalConsistency"
         * to false. */
        if (! isset($options['session'])) {
            try {
                $options['session'] = $manager->startSession(['causalConsistency' => false]);
            } catch (RuntimeException $e) {
                /* We can ignore the exception, as libmongoc likely cannot
                 * create its own session and there is no risk of a mismatch. */
            }
        }

        $this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
        $this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);

        // Null database name implies a cluster-wide change stream
        if ($databaseName === null) {
            $databaseName = 'admin';
            $this->changeStreamOptions['allChangesForCluster'] = true;
        }

        /* Remember an explicitly requested starting time. resume() unsets the
         * "startAtOperationTime" stage option, and shouldCaptureOperationTime()
         * never captures a reply's operationTime when this option was given.
         * Without this assignment, a resume before any resume token is cached
         * would restart from "now" and drop events. The change streams spec
         * requires reusing the originally specified startAtOperationTime. */
        if (isset($options['startAtOperationTime'])) {
            $this->operationTime = $options['startAtOperationTime'];
        }

        $this->manager = $manager;
        $this->databaseName = (string) $databaseName;
        $this->collectionName = isset($collectionName) ? (string) $collectionName : null;
        $this->pipeline = $pipeline;

        $this->aggregate = $this->createAggregate();
    }

    /** @internal */
    final public function commandFailed(CommandFailedEvent $event)
    {
        // Nothing is recorded for failed commands.
    }

    /**
     * Reset per-reply state before an aggregate command is sent.
     *
     * @internal
     */
    final public function commandStarted(CommandStartedEvent $event)
    {
        if ($event->getCommandName() !== 'aggregate') {
            return;
        }

        $this->firstBatchSize = null;
        $this->postBatchResumeToken = null;
    }

    /**
     * Capture firstBatch size, postBatchResumeToken and (when appropriate) the
     * operationTime from a successful aggregate reply.
     *
     * @internal
     * @throws UnexpectedValueException if the reply lacks a cursor.firstBatch array
     */
    final public function commandSucceeded(CommandSucceededEvent $event)
    {
        if ($event->getCommandName() !== 'aggregate') {
            return;
        }

        $reply = $event->getReply();

        if (! isset($reply->cursor->firstBatch) || ! is_array($reply->cursor->firstBatch)) {
            throw new UnexpectedValueException('aggregate command did not return a "cursor.firstBatch" array');
        }

        $this->firstBatchSize = count($reply->cursor->firstBatch);

        if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
            $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
        }

        // Relies on firstBatchSize/postBatchResumeToken assigned just above.
        if ($this->shouldCaptureOperationTime($event->getServer()) &&
            isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
            $this->operationTime = $reply->operationTime;
        }
    }

    /**
     * Execute the operation.
     *
     * @see Executable::execute()
     * @param Server $server
     * @return ChangeStream
     * @throws UnsupportedException if collation or read concern is used and unsupported
     * @throws RuntimeException for other driver errors (e.g. connection errors)
     */
    public function execute(Server $server)
    {
        return new ChangeStream(
            $this->createChangeStreamIterator($server),
            function ($resumeToken, $hasAdvanced) {
                return $this->resume($resumeToken, $hasAdvanced);
            }
        );
    }

    /**
     * Create the aggregate command for a change stream.
     *
     * This method is also used to recreate the aggregate command when resuming.
     * The $changeStream stage is prepended to the user-supplied pipeline.
     *
     * @return Aggregate
     */
    private function createAggregate()
    {
        $pipeline = $this->pipeline;
        array_unshift($pipeline, ['$changeStream' => (object) $this->changeStreamOptions]);

        return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions);
    }

    /**
     * Create a ChangeStreamIterator by executing the aggregate command.
     *
     * The aggregate is executed first (as the first argument) so that the APM
     * callbacks have populated firstBatchSize and postBatchResumeToken before
     * those values are read for the remaining arguments.
     *
     * @param Server $server
     * @return ChangeStreamIterator
     */
    private function createChangeStreamIterator(Server $server)
    {
        return new ChangeStreamIterator(
            $this->executeAggregate($server),
            $this->firstBatchSize,
            $this->getInitialResumeToken(),
            $this->postBatchResumeToken
        );
    }

    /**
     * Execute the aggregate command.
     *
     * The command will be executed using APM so that we can capture data from
     * its response (e.g. firstBatch size, postBatchResumeToken). The subscriber
     * is always removed again, even when execution throws.
     *
     * @param Server $server
     * @return Cursor
     */
    private function executeAggregate(Server $server)
    {
        addSubscriber($this);

        try {
            return $this->aggregate->execute($server);
        } finally {
            removeSubscriber($this);
        }
    }

    /**
     * Return the initial resume token for creating the ChangeStreamIterator.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
     * @return array|object|null
     */
    private function getInitialResumeToken()
    {
        if ($this->firstBatchSize === 0 && isset($this->postBatchResumeToken)) {
            return $this->postBatchResumeToken;
        }

        if (isset($this->changeStreamOptions['startAfter'])) {
            return $this->changeStreamOptions['startAfter'];
        }

        if (isset($this->changeStreamOptions['resumeAfter'])) {
            return $this->changeStreamOptions['resumeAfter'];
        }

        return null;
    }

    /**
     * Resumes a change stream.
     *
     * A non-null resume token is sent as "startAfter" when the stream was
     * opened with "startAfter" and has not yet advanced. Otherwise it is sent
     * as "resumeAfter". Without a token, the saved operation time (if any) is
     * sent as "startAtOperationTime".
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
     * @param array|object|null $resumeToken
     * @param bool              $hasAdvanced
     * @return ChangeStreamIterator
     * @throws InvalidArgumentException
     */
    private function resume($resumeToken = null, $hasAdvanced = false)
    {
        if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) {
            throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
        }

        $this->hasResumed = true;

        /* Select a new server using the original read preference. While watch
         * is not usable within transactions, we still check if there is a
         * pinned session. This is to avoid an ambiguous error message about
         * running a command on the wrong server. */
        $server = select_server($this->manager, $this->aggregateOptions);

        $resumeOption = isset($this->changeStreamOptions['startAfter']) && ! $hasAdvanced ? 'startAfter' : 'resumeAfter';

        // The starting-point options are mutually exclusive; clear them all.
        unset($this->changeStreamOptions['resumeAfter']);
        unset($this->changeStreamOptions['startAfter']);
        unset($this->changeStreamOptions['startAtOperationTime']);

        if ($resumeToken !== null) {
            $this->changeStreamOptions[$resumeOption] = $resumeToken;
        }

        if ($resumeToken === null && $this->operationTime !== null) {
            $this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
        }

        // Recreate the aggregate command and return a new ChangeStreamIterator
        $this->aggregate = $this->createAggregate();

        return $this->createChangeStreamIterator($server);
    }

    /**
     * Determine whether to capture operation time from an aggregate response.
     *
     * Capture happens only for the initial aggregate, when no starting-point
     * option was given, the first batch was empty, no postBatchResumeToken was
     * returned, and the server supports "startAtOperationTime".
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#startatoperationtime
     * @param Server $server
     * @return boolean
     */
    private function shouldCaptureOperationTime(Server $server)
    {
        if ($this->hasResumed) {
            return false;
        }

        if (isset($this->changeStreamOptions['resumeAfter']) ||
            isset($this->changeStreamOptions['startAfter']) ||
            isset($this->changeStreamOptions['startAtOperationTime'])) {
            return false;
        }

        if ($this->firstBatchSize > 0) {
            return false;
        }

        if ($this->postBatchResumeToken !== null) {
            return false;
        }

        if (! server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
            return false;
        }

        return true;
    }
}