ChangeStreamIterator.php 9.23 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
<?php
/*
 * Copyright 2019 MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\Model;

20
use IteratorIterator;
21 22 23 24
use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
25
use MongoDB\Driver\Monitoring\CommandSubscriber;
26
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
27
use MongoDB\Driver\Server;
28 29 30
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Exception\UnexpectedValueException;
31 32 33 34 35 36
use function count;
use function is_array;
use function is_integer;
use function is_object;
use function MongoDB\Driver\Monitoring\addSubscriber;
use function MongoDB\Driver\Monitoring\removeSubscriber;
37 38 39 40 41 42 43 44 45 46 47 48

/**
 * ChangeStreamIterator wraps a change stream's tailable cursor.
 *
 * This iterator tracks the size of each batch in order to determine when the
 * postBatchResumeToken is applicable. It also ensures that initial calls to
 * rewind() do not execute getMore commands.
 *
 * When next() is positioned at the end of the current batch, the iterator
 * temporarily registers itself as an APM command subscriber so it can observe
 * the resulting getMore command and record the new batch size and
 * postBatchResumeToken.
 *
 * @internal
 */
class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
{
    /** @var integer Zero-based position of the current document within its batch */
    private $batchPosition = 0;

    /**
     * Number of documents in the current batch. This is reset to null by
     * commandStarted() when a getMore begins, and set again by
     * commandSucceeded() from the size of the returned "nextBatch".
     *
     * @var integer|null
     */
    private $batchSize;

    /**
     * Whether rewind() is a no-op. Initially true when the first batch is
     * empty; cleared once the iterator reaches a valid position.
     *
     * @var boolean
     */
    private $isRewindNop;

    /** @var boolean Cached result of parent::valid() from the last iteration event */
    private $isValid = false;

    /** @var object|null postBatchResumeToken of the current batch, if any */
    private $postBatchResumeToken;

    /** @var array|object|null Resume token for the current position */
    private $resumeToken;

    /** @var Server */
    private $server;

    /**
     * @internal
     * @param Cursor            $cursor
     * @param integer           $firstBatchSize
     * @param array|object|null $initialResumeToken
     * @param object|null       $postBatchResumeToken
     * @throws InvalidArgumentException if an argument has an unexpected type
     */
    public function __construct(Cursor $cursor, $firstBatchSize, $initialResumeToken, $postBatchResumeToken)
    {
        if (! is_integer($firstBatchSize)) {
            throw InvalidArgumentException::invalidType('$firstBatchSize', $firstBatchSize, 'integer');
        }

        if (isset($initialResumeToken) && ! is_array($initialResumeToken) && ! is_object($initialResumeToken)) {
            throw InvalidArgumentException::invalidType('$initialResumeToken', $initialResumeToken, 'array or object');
        }

        if (isset($postBatchResumeToken) && ! is_object($postBatchResumeToken)) {
            throw InvalidArgumentException::invalidType('$postBatchResumeToken', $postBatchResumeToken, 'object');
        }

        parent::__construct($cursor);

        $this->batchSize = $firstBatchSize;
        $this->isRewindNop = ($firstBatchSize === 0);
        $this->postBatchResumeToken = $postBatchResumeToken;
        $this->resumeToken = $initialResumeToken;
        $this->server = $cursor->getServer();
    }

    /**
     * Intentionally a no-op: batch state remains as reset by commandStarted().
     *
     * @internal
     */
    final public function commandFailed(CommandFailedEvent $event)
    {
    }

    /**
     * Resets batch tracking when a getMore command begins.
     *
     * @internal
     */
    final public function commandStarted(CommandStartedEvent $event)
    {
        if ($event->getCommandName() !== 'getMore') {
            return;
        }

        $this->batchPosition = 0;
        $this->batchSize = null;
        $this->postBatchResumeToken = null;
    }

    /**
     * Records the size of the new batch and its postBatchResumeToken (if
     * present) after a successful getMore command.
     *
     * @internal
     * @throws UnexpectedValueException if the reply lacks a "cursor.nextBatch" array
     */
    final public function commandSucceeded(CommandSucceededEvent $event)
    {
        if ($event->getCommandName() !== 'getMore') {
            return;
        }

        $reply = $event->getReply();

        if (! isset($reply->cursor->nextBatch) || ! is_array($reply->cursor->nextBatch)) {
            throw new UnexpectedValueException('getMore command did not return a "cursor.nextBatch" array');
        }

        $this->batchSize = count($reply->cursor->nextBatch);

        if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
            $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
        }
    }

    /**
     * @see https://php.net/iteratoriterator.current
     * @return mixed
     */
    public function current()
    {
        return $this->isValid ? parent::current() : null;
    }

    /**
     * Returns the resume token for the iterator's current position.
     *
     * Null may be returned if no change documents have been iterated and the
     * server did not include a postBatchResumeToken in its aggregate or getMore
     * command response.
     *
     * @return array|object|null
     */
    public function getResumeToken()
    {
        return $this->resumeToken;
    }

    /**
     * Returns the server the cursor is running on.
     */
    public function getServer() : Server
    {
        return $this->server;
    }

    /**
     * @see https://php.net/iteratoriterator.key
     * @return mixed
     */
    public function key()
    {
        return $this->isValid ? parent::key() : null;
    }

    /**
     * @see https://php.net/iteratoriterator.next
     * @return void
     * @throws ResumeTokenException if the new current document has a missing or invalid resume token
     */
    public function next()
    {
        /* Determine if advancing the iterator will execute a getMore command
         * (i.e. we are already positioned at the end of the current batch). If
         * so, rely on the APM callbacks to reset $batchPosition and update
         * $batchSize. Otherwise, we can forgo APM and manually increment
         * $batchPosition after calling next(). */
        $getMore = $this->isAtEndOfBatch();

        if ($getMore) {
            addSubscriber($this);
        }

        try {
            parent::next();
            $this->onIteration(! $getMore);
        } finally {
            // Always unregister, even if next() or onIteration() throws.
            if ($getMore) {
                removeSubscriber($this);
            }
        }
    }

    /**
     * @see https://php.net/iteratoriterator.rewind
     * @return void
     */
    public function rewind()
    {
        // Skipped while the first batch was empty, so rewind() does not trigger a getMore.
        if ($this->isRewindNop) {
            return;
        }

        parent::rewind();
        $this->onIteration(false);
    }

    /**
     * @see https://php.net/iteratoriterator.valid
     * @return boolean
     */
    public function valid()
    {
        return $this->isValid;
    }

    /**
     * Extracts the resume token (i.e. "_id" field) from a change document.
     *
     * @param array|object $document Change document
     * @return array|object
     * @throws InvalidArgumentException
     * @throws ResumeTokenException if the resume token is not found or invalid
     */
    private function extractResumeToken($document)
    {
        if (! is_array($document) && ! is_object($document)) {
            throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
        }

        if ($document instanceof Serializable) {
            return $this->extractResumeToken($document->bsonSerialize());
        }

        $resumeToken = is_array($document)
            ? ($document['_id'] ?? null)
            : ($document->_id ?? null);

        if (! isset($resumeToken)) {
            $this->isValid = false;
            throw ResumeTokenException::notFound();
        }

        if (! is_array($resumeToken) && ! is_object($resumeToken)) {
            $this->isValid = false;
            throw ResumeTokenException::invalidType($resumeToken);
        }

        return $resumeToken;
    }

    /**
     * Return whether the iterator is positioned at the end of the batch.
     *
     * A null batch size (a getMore is in flight or did not succeed) is treated
     * as being at the end of the batch. Loose int-vs-null comparison already
     * produced true here; this makes it explicit.
     *
     * @return boolean
     */
    private function isAtEndOfBatch()
    {
        return $this->batchSize === null || $this->batchPosition + 1 >= $this->batchSize;
    }

    /**
     * Perform housekeeping after an iteration event.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
     * @param boolean $incrementBatchPosition
     * @throws ResumeTokenException if the current document's resume token is missing or invalid
     */
    private function onIteration($incrementBatchPosition)
    {
        $this->isValid = parent::valid();

        /* Disable rewind()'s NOP behavior once we advance to a valid position.
         * This will allow the driver to throw a LogicException if rewind() is
         * called after the cursor has advanced past its first element. */
        if ($this->isRewindNop && $this->isValid) {
            $this->isRewindNop = false;
        }

        if ($incrementBatchPosition && $this->isValid) {
            $this->batchPosition++;
        }

        /* If the iterator is positioned at the end of the batch, apply the
         * postBatchResumeToken if it's available. This handles both the case
         * where the current batch is empty (since onIteration() will be called
         * after a successful getMore) and when the iterator has advanced to the
         * last document in its current batch. Otherwise, extract a resume token
         * from the current document if possible. */
        if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
            $this->resumeToken = $this->postBatchResumeToken;
        } elseif ($this->isValid) {
            $this->resumeToken = $this->extractResumeToken($this->current());
        }
    }
}