WatchFunctionalTest.php 66.6 KB
Newer Older
1 2 3 4
<?php

namespace MongoDB\Tests\Operation;

5
use Closure;
6
use Iterator;
7
use MongoDB\BSON\TimestampInterface;
8
use MongoDB\ChangeStream;
9
use MongoDB\Driver\Cursor;
10 11
use MongoDB\Driver\Exception\CommandException;
use MongoDB\Driver\Exception\ConnectionTimeoutException;
12 13
use MongoDB\Driver\Exception\LogicException;
use MongoDB\Driver\Exception\ServerException;
14
use MongoDB\Driver\Manager;
15
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
16
use MongoDB\Driver\ReadPreference;
17
use MongoDB\Driver\WriteConcern;
18
use MongoDB\Exception\ResumeTokenException;
19
use MongoDB\Operation\DatabaseCommand;
20 21
use MongoDB\Operation\InsertOne;
use MongoDB\Operation\Watch;
22
use MongoDB\Tests\CommandObserver;
23
use PHPUnit\Framework\ExpectationFailedException;
24
use ReflectionClass;
25
use stdClass;
26
use Symfony\Bridge\PhpUnit\SetUpTearDownTrait;
27 28 29 30 31 32 33
use function array_diff_key;
use function array_map;
use function bin2hex;
use function microtime;
use function MongoDB\server_supports_feature;
use function sprintf;
use function version_compare;
34

35
class WatchFunctionalTest extends FunctionalTestCase
36
{
37 38
    use SetUpTearDownTrait;

39
    const INTERRUPTED = 11601;
40 41
    const NOT_MASTER = 10107;

42
    /** @var integer */
43 44
    private static $wireVersionForStartAtOperationTime = 7;

45
    /** @var array */
46
    private $defaultOptions = ['maxAwaitTimeMS' => 500];
47

48
    private function doSetUp()
49 50
    {
        parent::setUp();
51

52
        $this->skipIfChangeStreamIsNotSupported();
53
        $this->createCollection();
Jeremy Mikola's avatar
Jeremy Mikola committed
54
    }
55

56
    /**
57
     * Prose test 1: "ChangeStream must continuously track the last seen
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
     * resumeToken"
     */
    public function testGetResumeToken()
    {
        if ($this->isPostBatchResumeTokenSupported()) {
            $this->markTestSkipped('postBatchResumeToken is supported');
        }

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $changeStream->rewind();
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->getResumeToken());

        $this->insertDocument(['x' => 1]);
        $this->insertDocument(['x' => 2]);

76
        $this->advanceCursorUntilValid($changeStream);
77 78 79
        $this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());

        $changeStream->next();
80
        $this->assertTrue($changeStream->valid());
81 82 83 84
        $this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());

        $this->insertDocument(['x' => 3]);

85
        $this->advanceCursorUntilValid($changeStream);
86 87 88 89
        $this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
    }

    /**
90
     * Prose test 1: "ChangeStream must continuously track the last seen
91
     * resumeToken"
92 93 94 95 96 97 98 99 100 101 102 103 104 105
     *
     * Prose test 11:
     * For a ChangeStream under these conditions:
     *  - Running against a server >=4.0.7.
     *  - The batch is empty or has been iterated to the last document.
     * Expected result: getResumeToken must return the postBatchResumeToken from
     * the current command response.
     *
     * Prose test 13:
     * For a ChangeStream under these conditions:
     *  - The batch is not empty.
     *  - The batch has been iterated up to but not including the last element.
     * Expected result: getResumeToken must return the _id of the previous
     * document returned.
106 107 108
     */
    public function testGetResumeTokenWithPostBatchResumeToken()
    {
109
        if (! $this->isPostBatchResumeTokenSupported()) {
110 111 112 113 114 115 116
            $this->markTestSkipped('postBatchResumeToken is not supported');
        }

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

        $events = [];

117 118
        (new CommandObserver())->observe(
            function () use ($operation, &$changeStream) {
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
                $changeStream = $operation->execute($this->getPrimaryServer());
            },
            function (array $event) use (&$events) {
                $events[] = $event;
            }
        );

        $this->assertCount(1, $events);
        $this->assertSame('aggregate', $events[0]['started']->getCommandName());
        $postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply());

        $changeStream->rewind();
        $this->assertFalse($changeStream->valid());
        $this->assertSameDocument($postBatchResumeToken, $changeStream->getResumeToken());

        $this->insertDocument(['x' => 1]);
        $this->insertDocument(['x' => 2]);

137
        $lastEvent = null;
138

139 140
        (new CommandObserver())->observe(
            function () use ($changeStream) {
141
                $this->advanceCursorUntilValid($changeStream);
142
            },
143 144
            function (array $event) use (&$lastEvent) {
                $lastEvent = $event;
145 146 147
            }
        );

148 149 150
        $this->assertNotNull($lastEvent);
        $this->assertSame('getMore', $lastEvent['started']->getCommandName());
        $postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($lastEvent['succeeded']->getReply());
151 152 153 154 155 156 157 158

        $this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());

        $changeStream->next();
        $this->assertSameDocument($postBatchResumeToken, $changeStream->getResumeToken());
    }

    /**
159
     * Prose test 10: "ChangeStream will resume after a killCursors command is
160 161
     * issued for its child cursor."
     */
162
    public function testNextResumesAfterCursorNotFound()
163
    {
164
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
165
        $changeStream = $operation->execute($this->getPrimaryServer());
166

167
        $changeStream->rewind();
168
        $this->assertFalse($changeStream->valid());
169

170
        $this->insertDocument(['_id' => 1, 'x' => 'foo']);
171

172
        $this->advanceCursorUntilValid($changeStream);
173 174 175 176

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
177
            'fullDocument' => ['_id' => 1, 'x' => 'foo'],
178
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
179
            'documentKey' => ['_id' => 1],
180 181
        ];

182
        $this->assertMatchesDocument($expectedResult, $changeStream->current());
183

184
        $this->killChangeStreamCursor($changeStream);
185

186
        $this->insertDocument(['_id' => 2, 'x' => 'bar']);
187

188
        $this->advanceCursorUntilValid($changeStream);
189 190 191 192

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
193
            'fullDocument' => ['_id' => 2, 'x' => 'bar'],
194
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
195
            'documentKey' => ['_id' => 2],
196 197
        ];

198
        $this->assertMatchesDocument($expectedResult, $changeStream->current());
Jeremy Mikola's avatar
Jeremy Mikola committed
199
    }
200

201 202 203 204 205
    public function testNextResumesAfterConnectionException()
    {
        /* In order to trigger a dropped connection, we'll use a new client with
         * a socket timeout that is less than the change stream's maxAwaitTimeMS
         * option. */
206
        $manager = new Manager(static::getUri(), ['socketTimeoutMS' => 50]);
207 208
        $primaryServer = $manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));

209
        $operation = new Watch($manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
210
        $changeStream = $operation->execute($primaryServer);
211
        $changeStream->rewind();
212 213 214

        $commands = [];

215 216
        (new CommandObserver())->observe(
            function () use ($changeStream) {
217 218
                $changeStream->next();
            },
219
            function (array $event) use (&$commands) {
220 221 222
                $commands[] = $event['started']->getCommandName();
            }
        );
223 224 225 226 227 228

        $expectedCommands = [
            /* The initial aggregate command for change streams returns a cursor
             * envelope with an empty initial batch, since there are no changes
             * to report at the moment the change stream is created. Therefore,
             * we expect a getMore to be issued when we first advance the change
229
             * stream with next(). */
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
            'getMore',
            /* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous
             * getMore command encounters a client socket timeout and leaves the
             * cursor open on the server. ChangeStream should catch this error
             * and resume by issuing a new aggregate command. */
            'aggregate',
            /* When ChangeStream resumes, it overwrites its original cursor with
             * the new cursor resulting from the last aggregate command. This
             * removes the last reference to the old cursor, which causes the
             * driver to kill it (via mongoc_cursor_destroy()). */
            'killCursors',
        ];

        $this->assertSame($expectedCommands, $commands);
    }

246 247
    public function testResumeBeforeReceivingAnyResultsIncludesPostBatchResumeToken()
    {
248
        if (! $this->isPostBatchResumeTokenSupported()) {
249 250 251 252 253 254 255
            $this->markTestSkipped('postBatchResumeToken is not supported');
        }

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

        $events = [];

256 257
        (new CommandObserver())->observe(
            function () use ($operation, &$changeStream) {
258 259 260 261 262 263 264 265 266
                $changeStream = $operation->execute($this->getPrimaryServer());
            },
            function (array $event) use (&$events) {
                $events[] = $event;
            }
        );

        $this->assertCount(1, $events);
        $this->assertSame('aggregate', $events[0]['started']->getCommandName());
267
        $postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply());
268 269 270 271

        $this->assertFalse($changeStream->valid());
        $this->killChangeStreamCursor($changeStream);

272 273 274
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
275 276 277

        $events = [];

278 279
        (new CommandObserver())->observe(
            function () use ($changeStream) {
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
                $changeStream->next();
            },
            function (array $event) use (&$events) {
                $events[] = $event;
            }
        );

        $this->assertCount(3, $events);

        $this->assertSame('getMore', $events[0]['started']->getCommandName());
        $this->arrayHasKey('failed', $events[0]);

        $this->assertSame('aggregate', $events[1]['started']->getCommandName());
        $this->assertResumeAfter($postBatchResumeToken, $events[1]['started']->getCommand());
        $this->arrayHasKey('succeeded', $events[1]);

        // Original cursor is freed immediately after the change stream resumes
        $this->assertSame('killCursors', $events[2]['started']->getCommandName());
        $this->arrayHasKey('succeeded', $events[2]);

        $this->assertFalse($changeStream->valid());
    }

    private function assertResumeAfter($expectedResumeToken, stdClass $command)
    {
        $this->assertObjectHasAttribute('pipeline', $command);
306
        $this->assertIsArray($command->pipeline);
307 308 309 310 311 312
        $this->assertArrayHasKey(0, $command->pipeline);
        $this->assertObjectHasAttribute('$changeStream', $command->pipeline[0]);
        $this->assertObjectHasAttribute('resumeAfter', $command->pipeline[0]->{'$changeStream'});
        $this->assertEquals($expectedResumeToken, $command->pipeline[0]->{'$changeStream'}->resumeAfter);
    }

313
    /**
314 315
     * Prose test 9: "$changeStream stage for ChangeStream against a server
     * >=4.0 and <4.0.7 that has not received any results yet MUST include a
316 317
     * startAtOperationTime option when resuming a changestream."
     */
318 319
    public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime()
    {
320
        if (! $this->isStartAtOperationTimeSupported()) {
321 322 323 324 325 326
            $this->markTestSkipped('startAtOperationTime is not supported');
        }

        if ($this->isPostBatchResumeTokenSupported()) {
            $this->markTestSkipped('postBatchResumeToken takes precedence over startAtOperationTime');
        }
327

328 329 330 331
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

        $events = [];

332 333
        (new CommandObserver())->observe(
            function () use ($operation, &$changeStream) {
334 335 336 337 338 339 340 341 342
                $changeStream = $operation->execute($this->getPrimaryServer());
            },
            function (array $event) use (&$events) {
                $events[] = $event;
            }
        );

        $this->assertCount(1, $events);
        $this->assertSame('aggregate', $events[0]['started']->getCommandName());
343 344 345
        $reply = $events[0]['succeeded']->getReply();
        $this->assertObjectHasAttribute('operationTime', $reply);
        $operationTime = $reply->operationTime;
346 347
        $this->assertInstanceOf(TimestampInterface::class, $operationTime);

348
        $this->assertFalse($changeStream->valid());
349 350
        $this->killChangeStreamCursor($changeStream);

351 352 353
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
354 355 356

        $events = [];

357 358
        (new CommandObserver())->observe(
            function () use ($changeStream) {
359 360 361 362 363 364 365
                $changeStream->next();
            },
            function (array $event) use (&$events) {
                $events[] = $event;
            }
        );

366
        $this->assertCount(3, $events);
367 368 369 370 371 372 373 374 375 376 377 378

        $this->assertSame('getMore', $events[0]['started']->getCommandName());
        $this->arrayHasKey('failed', $events[0]);

        $this->assertSame('aggregate', $events[1]['started']->getCommandName());
        $this->assertStartAtOperationTime($operationTime, $events[1]['started']->getCommand());
        $this->arrayHasKey('succeeded', $events[1]);

        // Original cursor is freed immediately after the change stream resumes
        $this->assertSame('killCursors', $events[2]['started']->getCommandName());
        $this->arrayHasKey('succeeded', $events[2]);

379
        $this->assertFalse($changeStream->valid());
380 381 382 383 384
    }

    private function assertStartAtOperationTime(TimestampInterface $expectedOperationTime, stdClass $command)
    {
        $this->assertObjectHasAttribute('pipeline', $command);
385
        $this->assertIsArray($command->pipeline);
386 387 388 389 390 391
        $this->assertArrayHasKey(0, $command->pipeline);
        $this->assertObjectHasAttribute('$changeStream', $command->pipeline[0]);
        $this->assertObjectHasAttribute('startAtOperationTime', $command->pipeline[0]->{'$changeStream'});
        $this->assertEquals($expectedOperationTime, $command->pipeline[0]->{'$changeStream'}->startAtOperationTime);
    }

392 393
    public function testRewindMultipleTimesWithResults()
    {
394 395
        $this->skipIfIsShardedCluster('Cursor needs to be advanced multiple times and can\'t be rewound afterwards.');

396 397 398 399 400 401
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $this->insertDocument(['x' => 1]);
        $this->insertDocument(['x' => 2]);

402 403 404
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
405 406 407 408 409
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());
        $this->assertNull($changeStream->current());

        // Subsequent rewind does not change iterator state
410 411 412
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
413 414 415 416 417
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());
        $this->assertNull($changeStream->current());

        $changeStream->next();
418 419 420 421
        $this->assertTrue($changeStream->valid());
        $this->assertSame(0, $changeStream->key());
        $this->assertNotNull($changeStream->current());

422 423
        /* Rewinding when the iterator is still at its first element is a NOP.
         * Note: PHPLIB-448 may see rewind() throw after any call to next() */
424 425 426
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445
        $this->assertTrue($changeStream->valid());
        $this->assertSame(0, $changeStream->key());
        $this->assertNotNull($changeStream->current());

        $changeStream->next();
        $this->assertTrue($changeStream->valid());
        $this->assertSame(1, $changeStream->key());
        $this->assertNotNull($changeStream->current());

        // Rewinding after advancing the iterator is an error
        $this->expectException(LogicException::class);
        $changeStream->rewind();
    }

    public function testRewindMultipleTimesWithNoResults()
    {
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

446 447 448
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
449 450 451 452 453
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());
        $this->assertNull($changeStream->current());

        // Subsequent rewind does not change iterator state
454 455 456
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
457 458 459 460 461 462 463 464 465
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());
        $this->assertNull($changeStream->current());

        $changeStream->next();
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());
        $this->assertNull($changeStream->current());

466 467
        /* Rewinding when the iterator hasn't advanced to an element is a NOP.
         * Note: PHPLIB-448 may see rewind() throw after any call to next() */
468 469 470
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
471 472 473
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());
        $this->assertNull($changeStream->current());
474 475
    }

476 477
    public function testNoChangeAfterResumeBeforeInsert()
    {
478
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
479
        $changeStream = $operation->execute($this->getPrimaryServer());
480

481 482 483
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
484
        $this->assertFalse($changeStream->valid());
485

486
        $this->insertDocument(['_id' => 1, 'x' => 'foo']);
487

488
        $this->advanceCursorUntilValid($changeStream);
489 490 491 492

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
493
            'fullDocument' => ['_id' => 1, 'x' => 'foo'],
494
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
495
            'documentKey' => ['_id' => 1],
496 497
        ];

498
        $this->assertMatchesDocument($expectedResult, $changeStream->current());
499

500
        $this->killChangeStreamCursor($changeStream);
501

502
        $changeStream->next();
503
        $this->assertFalse($changeStream->valid());
504

505
        $this->insertDocument(['_id' => 2, 'x' => 'bar']);
506

507
        $this->advanceCursorUntilValid($changeStream);
508 509 510 511

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
512
            'fullDocument' => ['_id' => 2, 'x' => 'bar'],
513
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
514
            'documentKey' => ['_id' => 2],
515 516
        ];

517
        $this->assertMatchesDocument($expectedResult, $changeStream->current());
518 519
    }

520
    public function testResumeMultipleTimesInSuccession()
521
    {
522 523
        $this->skipIfIsShardedCluster('getMore may return empty response before periodicNoopIntervalSecs on sharded clusters.');

524 525 526
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

527
        /* Killing the cursor when there are no results will test that neither
528 529
         * the initial rewind() nor a resume attempt via next() increment the
         * key. */
530 531
        $this->killChangeStreamCursor($changeStream);

532 533 534
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
535 536
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());
537 538
        $this->assertNull($changeStream->current());

539 540 541 542 543 544 545 546 547 548 549 550
        $changeStream->next();
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());
        $this->assertNull($changeStream->current());

        // A consecutive resume attempt should still not increment the key
        $this->killChangeStreamCursor($changeStream);

        $changeStream->next();
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());
        $this->assertNull($changeStream->current());
551

552 553 554
        /* Insert a document and advance the change stream to ensure we capture
         * a resume token. This is necessary when startAtOperationTime is not
         * supported (i.e. 3.6 server version). */
555
        $this->insertDocument(['_id' => 1]);
556

557
        $changeStream->next();
558
        $this->assertTrue($changeStream->valid());
559
        $this->assertSame(0, $changeStream->key());
560 561 562 563

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
564
            'fullDocument' => ['_id' => 1],
565
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
566
            'documentKey' => ['_id' => 1],
567 568 569 570
        ];

        $this->assertMatchesDocument($expectedResult, $changeStream->current());

571 572 573
        /* Insert another document and kill the cursor. ChangeStream::next()
         * should resume and pick up the last insert. */
        $this->insertDocument(['_id' => 2]);
574 575
        $this->killChangeStreamCursor($changeStream);

576
        $changeStream->next();
577
        $this->assertTrue($changeStream->valid());
578
        $this->assertSame(1, $changeStream->key());
579 580 581 582

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
583
            'fullDocument' => ['_id' => 2],
584
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
585
            'documentKey' => ['_id' => 2],
586 587 588 589
        ];

        $this->assertMatchesDocument($expectedResult, $changeStream->current());

590 591 592 593 594 595 596
        /* Insert another document and kill the cursor. It is technically
         * permissable to call ChangeStream::rewind() since the previous call to
         * next() will have left the cursor positioned at its first and only
         * result. Assert that rewind() does not execute a getMore nor does it
         * modify the iterator's state.
         *
         * Note: PHPLIB-448 may require rewind() to throw an exception here. */
597
        $this->insertDocument(['_id' => 3]);
598
        $this->killChangeStreamCursor($changeStream);
599

600 601 602
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
603 604 605 606 607
        $this->assertTrue($changeStream->valid());
        $this->assertSame(1, $changeStream->key());
        $this->assertMatchesDocument($expectedResult, $changeStream->current());

        // ChangeStream::next() should resume and pick up the last insert
608 609
        $changeStream->next();
        $this->assertTrue($changeStream->valid());
610
        $this->assertSame(2, $changeStream->key());
611 612 613 614

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
615
            'fullDocument' => ['_id' => 3],
616
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
617
            'documentKey' => ['_id' => 3],
618 619 620 621
        ];

        $this->assertMatchesDocument($expectedResult, $changeStream->current());

622
        // Test one final, consecutive resume via ChangeStream::next()
623
        $this->insertDocument(['_id' => 4]);
624
        $this->killChangeStreamCursor($changeStream);
625 626 627

        $changeStream->next();
        $this->assertTrue($changeStream->valid());
628
        $this->assertSame(3, $changeStream->key());
629 630 631 632

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
633
            'fullDocument' => ['_id' => 4],
634
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
635
            'documentKey' => ['_id' => 4],
636 637 638 639 640
        ];

        $this->assertMatchesDocument($expectedResult, $changeStream->current());
    }

641 642
    public function testKey()
    {
643
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
644
        $changeStream = $operation->execute($this->getPrimaryServer());
645

646
        $this->assertFalse($changeStream->valid());
647
        $this->assertNull($changeStream->key());
648

649 650 651
        $this->assertNoCommandExecuted(function () use ($changeStream) {
            $changeStream->rewind();
        });
652 653 654
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());

655
        $this->insertDocument(['_id' => 1, 'x' => 'foo']);
656

657
        $this->advanceCursorUntilValid($changeStream);
658
        $this->assertSame(0, $changeStream->key());
659

660
        $changeStream->next();
661
        $this->assertFalse($changeStream->valid());
662
        $this->assertNull($changeStream->key());
663

664
        $changeStream->next();
665
        $this->assertFalse($changeStream->valid());
666
        $this->assertNull($changeStream->key());
667

668
        $this->killChangeStreamCursor($changeStream);
669

670
        $changeStream->next();
671
        $this->assertFalse($changeStream->valid());
672
        $this->assertNull($changeStream->key());
673

674
        $this->insertDocument(['_id' => 2, 'x' => 'bar']);
675

676
        $this->advanceCursorUntilValid($changeStream);
677
        $this->assertSame(1, $changeStream->key());
678 679 680 681 682 683
    }

    public function testNonEmptyPipeline()
    {
        $pipeline = [['$project' => ['foo' => [0]]]];

684
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
685 686 687
        $changeStream = $operation->execute($this->getPrimaryServer());

        $this->insertDocument(['_id' => 1]);
688

689
        $changeStream->rewind();
690 691
        $this->assertFalse($changeStream->valid());

692
        $this->advanceCursorUntilValid($changeStream);
693 694 695 696 697 698 699

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'foo' => [0],
        ];

        $this->assertSameDocument($expectedResult, $changeStream->current());
700 701
    }

702
    /**
703 704 705
     * Prose test 7: "Ensure that a cursor returned from an aggregate command
     * with a cursor id and an initial empty batch is not closed on the driver
     * side."
706
     */
707
    public function testInitialCursorIsNotClosed()
708
    {
709
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
710
        $changeStream = $operation->execute($this->getPrimaryServer());
711

712 713 714 715 716 717 718 719
        /* The spec requests that we assert that the cursor returned from the
         * aggregate command is not closed on the driver side. We will verify
         * this by checking that the cursor ID is non-zero and that libmongoc
         * reports the cursor as alive. While the cursor ID is easily accessed
         * through ChangeStream, we'll need to use reflection to access the
         * internal Cursor and call isDead(). */
        $this->assertNotEquals('0', (string) $changeStream->getCursorId());

720
        $rc = new ReflectionClass(ChangeStream::class);
721
        $rp = $rc->getProperty('iterator');
722 723 724 725 726 727 728 729
        $rp->setAccessible(true);

        $iterator = $rp->getValue($changeStream);

        $this->assertInstanceOf('IteratorIterator', $iterator);

        $cursor = $iterator->getInnerIterator();

730
        $this->assertInstanceOf(Cursor::class, $cursor);
731
        $this->assertFalse($cursor->isDead());
732 733
    }

734
    /**
735
     * Prose test 5: "ChangeStream will not attempt to resume after encountering
736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762
     * error code 11601 (Interrupted), 136 (CappedPositionLost), or 237
     * (CursorKilled) while executing a getMore command."
     *
     * @dataProvider provideNonResumableErrorCodes
     */
    public function testNonResumableErrorCodes($errorCode)
    {
        $this->configureFailPoint([
            'configureFailPoint' => 'failCommand',
            'mode' => ['times' => 1],
            'data' => ['failCommands' => ['getMore'], 'errorCode' => $errorCode],
        ]);

        $this->insertDocument(['x' => 1]);

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
        $changeStream = $operation->execute($this->getPrimaryServer());
        $changeStream->rewind();

        $this->expectException(ServerException::class);
        $this->expectExceptionCode($errorCode);
        $changeStream->next();
    }

    public function provideNonResumableErrorCodes()
    {
        return [
763 764 765
            'CappedPositionLost' => [136],
            'CursorKilled' => [237],
            'Interrupted' => [11601],
766 767 768
        ];
    }

769
    /**
770 771 772
     * Prose test 2: "ChangeStream will throw an exception if the server
     * response is missing the resume token (if wire version is < 8, this is a
     * driver-side error; for 8+, this is a server-side error)"
773 774
     */
    public function testResumeTokenNotFoundClientSideError()
775
    {
776 777 778 779
        if (version_compare($this->getServerVersion(), '4.1.8', '>=')) {
            $this->markTestSkipped('Server rejects change streams that modify resume token (SERVER-37786)');
        }

780 781
        $pipeline =  [['$project' => ['_id' => 0 ]]];

782
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
783 784
        $changeStream = $operation->execute($this->getPrimaryServer());

785 786 787 788
        $changeStream->rewind();

        /* Insert two documents to ensure the client does not ignore the first
         * document's resume token in favor of a postBatchResumeToken */
789
        $this->insertDocument(['x' => 1]);
790
        $this->insertDocument(['x' => 2]);
791

792 793
        $this->expectException(ResumeTokenException::class);
        $this->expectExceptionMessage('Resume token not found in change document');
794
        $this->advanceCursorUntilValid($changeStream);
795 796
    }

797
    /**
798 799 800
     * Prose test 2: "ChangeStream will throw an exception if the server
     * response is missing the resume token (if wire version is < 8, this is a
     * driver-side error; for 8+, this is a server-side error)"
801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816
     */
    public function testResumeTokenNotFoundServerSideError()
    {
        if (version_compare($this->getServerVersion(), '4.1.8', '<')) {
            $this->markTestSkipped('Server does not reject change streams that modify resume token');
        }

        $pipeline =  [['$project' => ['_id' => 0 ]]];

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $changeStream->rewind();
        $this->insertDocument(['x' => 1]);

        $this->expectException(ServerException::class);
817
        $this->advanceCursorUntilValid($changeStream);
818 819 820
    }

    /**
821 822 823
     * Prose test 2: "ChangeStream will throw an exception if the server
     * response is missing the resume token (if wire version is < 8, this is a
     * driver-side error; for 8+, this is a server-side error)"
824 825
     */
    public function testResumeTokenInvalidTypeClientSideError()
826
    {
827 828 829 830
        if (version_compare($this->getServerVersion(), '4.1.8', '>=')) {
            $this->markTestSkipped('Server rejects change streams that modify resume token (SERVER-37786)');
        }

831 832
        $pipeline =  [['$project' => ['_id' => ['$literal' => 'foo']]]];

833
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
834 835
        $changeStream = $operation->execute($this->getPrimaryServer());

836 837 838 839
        $changeStream->rewind();

        /* Insert two documents to ensure the client does not ignore the first
         * document's resume token in favor of a postBatchResumeToken */
840
        $this->insertDocument(['x' => 1]);
841
        $this->insertDocument(['x' => 2]);
842

843 844
        $this->expectException(ResumeTokenException::class);
        $this->expectExceptionMessage('Expected resume token to have type "array or object" but found "string"');
845
        $this->advanceCursorUntilValid($changeStream);
846 847
    }

848
    /**
849 850 851
     * Prose test 2: "ChangeStream will throw an exception if the server
     * response is missing the resume token (if wire version is < 8, this is a
     * driver-side error; for 8+, this is a server-side error)"
852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867
     */
    public function testResumeTokenInvalidTypeServerSideError()
    {
        if (version_compare($this->getServerVersion(), '4.1.8', '<')) {
            $this->markTestSkipped('Server does not reject change streams that modify resume token');
        }

        $pipeline =  [['$project' => ['_id' => ['$literal' => 'foo']]]];

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $changeStream->rewind();
        $this->insertDocument(['x' => 1]);

        $this->expectException(ServerException::class);
868
        $this->advanceCursorUntilValid($changeStream);
869 870
    }

871 872
    public function testMaxAwaitTimeMS()
    {
873 874 875
        /* On average, an acknowledged write takes about 20 ms to appear in a
         * change stream on the server so we'll use a higher maxAwaitTimeMS to
         * ensure we see the write. */
876
        $maxAwaitTimeMS = 500;
877

878 879 880
        /* Calculate an approximate pivot to use for time assertions. We will
         * assert that the duration of blocking responses is greater than this
         * value, and vice versa. */
881
        $pivot = $maxAwaitTimeMS * 0.001 * 0.9;
882

883 884 885
        /* Calculate an approximate upper bound to use for time assertions. We
         * will assert that the duration of blocking responses is less than this
         * value. */
886
        $upperBound = $maxAwaitTimeMS * 0.001 * 1.5;
887

888 889
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => $maxAwaitTimeMS]);
        $changeStream = $operation->execute($this->getPrimaryServer());
890

891
        // Rewinding does not issue a getMore, so we should not expect a delay.
892
        $startTime = microtime(true);
893
        $changeStream->rewind();
894
        $duration = microtime(true) - $startTime;
895
        $this->assertLessThan($pivot, $duration);
896

897
        $this->assertFalse($changeStream->valid());
898 899

        /* Advancing again on a change stream will issue a getMore, so we should
900 901 902
         * expect a delay. Expect to wait at least maxAwaitTimeMS, since no new
         * documents will be inserted to wake up the server's query thread. Also
         * ensure we don't wait too long (server default is one second). */
903
        $startTime = microtime(true);
904
        $changeStream->next();
905
        $duration = microtime(true) - $startTime;
906
        $this->assertGreaterThan($pivot, $duration);
907
        $this->assertLessThan($upperBound, $duration);
908

909
        $this->assertFalse($changeStream->valid());
910

911
        $this->insertDocument(['_id' => 1]);
912

913
        /* Advancing the change stream again will issue a getMore, but the
914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929
         * server should not block since a document has been inserted.
         * For sharded clusters, we have to repeat the getMore iteration until
         * the cursor is valid since the first getMore commands after an insert
         * may not return any data. Only the time of the last getMore command is
         * taken. */
        $attempts = $this->isShardedCluster() ? 5 : 1;
        for ($i = 0; $i < $attempts; $i++) {
            $startTime = microtime(true);
            $changeStream->next();
            $duration = microtime(true) - $startTime;

            if ($changeStream->valid()) {
                break;
            }
        }

930
        $this->assertTrue($changeStream->valid());
931

Andreas Braun's avatar
Andreas Braun committed
932
        if (! $this->isShardedCluster()) {
933 934
            $this->assertLessThan($pivot, $duration);
        }
935
    }
936

937
    public function testRewindExtractsResumeTokenAndNextResumes()
938
    {
939
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
940 941
        $changeStream = $operation->execute($this->getPrimaryServer());

942 943 944
        $this->insertDocument(['_id' => 1, 'x' => 'foo']);
        $this->insertDocument(['_id' => 2, 'x' => 'bar']);
        $this->insertDocument(['_id' => 3, 'x' => 'baz']);
945

946 947 948 949
        /* Obtain a resume token for the first insert. This will allow us to
         * start a change stream from that point and ensure aggregate returns
         * the second insert in its first batch, which in turn will serve as a
         * resume token for rewind() to extract. */
950 951 952
        $changeStream->rewind();
        $this->assertFalse($changeStream->valid());

953
        $this->advanceCursorUntilValid($changeStream);
954

955 956
        $resumeToken = $changeStream->current()->_id;
        $options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
957 958
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
        $changeStream = $operation->execute($this->getPrimaryServer());
959
        $this->assertSameDocument($resumeToken, $changeStream->getResumeToken());
960 961

        $changeStream->rewind();
962 963 964 965 966 967 968 969 970 971

        if ($this->isShardedCluster()) {
            /* aggregate on a sharded cluster may not return any data in the
             * initial batch until periodicNoopIntervalSecs has passed. Thus,
             * advance the change stream until we've received data. */
            $this->advanceCursorUntilValid($changeStream);
        } else {
            $this->assertTrue($changeStream->valid());
        }

972
        $this->assertSame(0, $changeStream->key());
973 974 975
        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
976
            'fullDocument' => ['_id' => 2, 'x' => 'bar'],
977
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
978
            'documentKey' => ['_id' => 2],
979
        ];
980
        $this->assertMatchesDocument($expectedResult, $changeStream->current());
981 982 983

        $this->killChangeStreamCursor($changeStream);

984
        $this->advanceCursorUntilValid($changeStream);
985
        $this->assertSame(1, $changeStream->key());
986 987 988 989

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
990
            'fullDocument' => ['_id' => 3, 'x' => 'baz'],
991
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
992
            'documentKey' => ['_id' => 3],
993
        ];
994
        $this->assertMatchesDocument($expectedResult, $changeStream->current());
995 996
    }

997 998 999 1000 1001 1002 1003 1004 1005 1006 1007
    public function testResumeAfterOption()
    {
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $changeStream->rewind();
        $this->assertFalse($changeStream->valid());

        $this->insertDocument(['_id' => 1, 'x' => 'foo']);
        $this->insertDocument(['_id' => 2, 'x' => 'bar']);

1008
        $this->advanceCursorUntilValid($changeStream);
1009 1010 1011 1012 1013 1014

        $resumeToken = $changeStream->current()->_id;

        $options = $this->defaultOptions + ['resumeAfter' => $resumeToken];
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
        $changeStream = $operation->execute($this->getPrimaryServer());
1015
        $this->assertSameDocument($resumeToken, $changeStream->getResumeToken());
1016 1017

        $changeStream->rewind();
1018 1019 1020 1021 1022 1023 1024 1025 1026

        if ($this->isShardedCluster()) {
            /* aggregate on a sharded cluster may not return any data in the
             * initial batch until periodicNoopIntervalSecs has passed. Thus,
             * advance the change stream until we've received data. */
            $this->advanceCursorUntilValid($changeStream);
        } else {
            $this->assertTrue($changeStream->valid());
        }
1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
            'fullDocument' => ['_id' => 2, 'x' => 'bar'],
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
            'documentKey' => ['_id' => 2],
        ];

        $this->assertMatchesDocument($expectedResult, $changeStream->current());
    }

    public function testStartAfterOption()
    {
        if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
            $this->markTestSkipped('startAfter is not supported');
        }

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $changeStream->rewind();
        $this->assertFalse($changeStream->valid());

        $this->insertDocument(['_id' => 1, 'x' => 'foo']);
        $this->insertDocument(['_id' => 2, 'x' => 'bar']);

1054
        $this->advanceCursorUntilValid($changeStream);
1055 1056 1057 1058 1059 1060

        $resumeToken = $changeStream->current()->_id;

        $options = $this->defaultOptions + ['startAfter' => $resumeToken];
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
        $changeStream = $operation->execute($this->getPrimaryServer());
1061
        $this->assertSameDocument($resumeToken, $changeStream->getResumeToken());
1062 1063

        $changeStream->rewind();
1064 1065 1066 1067 1068 1069 1070 1071 1072

        if ($this->isShardedCluster()) {
            /* aggregate on a sharded cluster may not return any data in the
             * initial batch until periodicNoopIntervalSecs has passed. Thus,
             * advance the change stream until we've received data. */
            $this->advanceCursorUntilValid($changeStream);
        } else {
            $this->assertTrue($changeStream->valid());
        }
1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084

        $expectedResult = [
            '_id' => $changeStream->current()->_id,
            'operationType' => 'insert',
            'fullDocument' => ['_id' => 2, 'x' => 'bar'],
            'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
            'documentKey' => ['_id' => 2],
        ];

        $this->assertMatchesDocument($expectedResult, $changeStream->current());
    }

1085 1086 1087 1088 1089
    /**
     * @dataProvider provideTypeMapOptionsAndExpectedChangeDocument
     */
    public function testTypeMapOption(array $typeMap, $expectedChangeDocument)
    {
1090
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['typeMap' => $typeMap] + $this->defaultOptions);
1091 1092 1093
        $changeStream = $operation->execute($this->getPrimaryServer());

        $changeStream->rewind();
1094
        $this->assertFalse($changeStream->valid());
1095 1096 1097

        $this->insertDocument(['_id' => 1, 'x' => 'foo']);

1098
        $this->advanceCursorUntilValid($changeStream);
1099

1100
        $this->assertMatchesDocument($expectedChangeDocument, $changeStream->current());
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136
    }

    public function provideTypeMapOptionsAndExpectedChangeDocument()
    {
        /* Note: the "_id" and "ns" fields are purposefully omitted because the
         * resume token's value cannot be anticipated and the collection name,
         * which is generated from the test name, is not available in the data
         * provider, respectively. */
        return [
            [
                ['root' => 'array', 'document' => 'array'],
                [
                    'operationType' => 'insert',
                    'fullDocument' => ['_id' => 1, 'x' => 'foo'],
                    'documentKey' => ['_id' => 1],
                ],
            ],
            [
                ['root' => 'object', 'document' => 'array'],
                (object) [
                    'operationType' => 'insert',
                    'fullDocument' => ['_id' => 1, 'x' => 'foo'],
                    'documentKey' => ['_id' => 1],
                ],
            ],
            [
                ['root' => 'array', 'document' => 'stdClass'],
                [
                    'operationType' => 'insert',
                    'fullDocument' => (object) ['_id' => 1, 'x' => 'foo'],
                    'documentKey' => (object) ['_id' => 1],
                ],
            ],
        ];
    }

1137 1138 1139 1140 1141 1142 1143 1144
    public function testNextAdvancesKey()
    {
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $this->insertDocument(['x' => 1]);
        $this->insertDocument(['x' => 2]);

1145 1146
        /* Note: we intentionally do not start iteration with rewind() to ensure
         * that next() behaves identically when called without rewind(). */
1147
        $this->advanceCursorUntilValid($changeStream);
1148 1149 1150 1151 1152 1153 1154 1155

        $this->assertSame(0, $changeStream->key());

        $changeStream->next();

        $this->assertSame(1, $changeStream->key());
    }

1156
    public function testResumeTokenNotFoundDoesNotAdvanceKey()
1157 1158 1159 1160 1161 1162 1163 1164 1165 1166
    {
        $pipeline =  [['$project' => ['_id' => 0 ]]];

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $this->insertDocument(['x' => 1]);
        $this->insertDocument(['x' => 2]);
        $this->insertDocument(['x' => 3]);

1167 1168
        $changeStream->rewind();
        $this->assertFalse($changeStream->valid());
1169
        $this->assertNull($changeStream->key());
1170 1171

        try {
1172
            $this->advanceCursorUntilValid($changeStream);
1173 1174
            $this->fail('Exception for missing resume token was not thrown');
        } catch (ResumeTokenException $e) {
1175
            /* On server versions < 4.1.8, a client-side error is thrown. */
1176
        } catch (ServerException $e) {
1177
            /* On server versions >= 4.1.8, the error is thrown server-side. */
1178
        }
1179

1180 1181
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());
1182 1183 1184

        try {
            $changeStream->next();
1185 1186 1187 1188
            $this->fail('Exception for missing resume token was not thrown');
        } catch (ResumeTokenException $e) {
        } catch (ServerException $e) {
        }
1189

1190 1191
        $this->assertFalse($changeStream->valid());
        $this->assertNull($changeStream->key());
1192 1193
    }

1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207
    public function testSessionPersistsAfterResume()
    {
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

        $changeStream = null;
        $originalSession = null;
        $sessionAfterResume = [];
        $commands = [];

        /* We want to ensure that the lsid of the initial aggregate matches the
         * lsid of any aggregates after the change stream resumes. After
         * PHPC-1152 is complete, we will ensure that the lsid of the initial
         * aggregate matches the lsid of any subsequent aggregates and getMores.
         */
1208 1209
        (new CommandObserver())->observe(
            function () use ($operation, &$changeStream) {
1210 1211
                $changeStream = $operation->execute($this->getPrimaryServer());
            },
1212
            function (array $event) use (&$originalSession) {
1213 1214 1215
                $command = $event['started']->getCommand();
                if (isset($command->aggregate)) {
                    $originalSession = bin2hex((string) $command->lsid->id);
1216 1217 1218 1219 1220 1221 1222
                }
            }
        );

        $changeStream->rewind();
        $this->killChangeStreamCursor($changeStream);

1223 1224
        (new CommandObserver())->observe(
            function () use (&$changeStream) {
1225 1226
                $changeStream->next();
            },
1227 1228 1229
            function (array $event) use (&$sessionAfterResume, &$commands) {
                $commands[] = $event['started']->getCommandName();
                $sessionAfterResume[] = bin2hex((string) $event['started']->getCommand()->lsid->id);
1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261
            }
        );

        $expectedCommands = [
            /* We expect a getMore to be issued because we are calling next(). */
            'getMore',
            /* Since we have killed the cursor, ChangeStream will resume by
             * issuing a new aggregate commmand. */
            'aggregate',
            /* When ChangeStream resumes, it overwrites its original cursor with
             * the new cursor resulting from the last aggregate command. This
             * removes the last reference to the old cursor, which causes the
             * driver to kill it (via mongoc_cursor_destroy()). */
            'killCursors',
        ];

        $this->assertSame($expectedCommands, $commands);

        foreach ($sessionAfterResume as $session) {
            $this->assertEquals($session, $originalSession);
        }
    }

    public function testSessionFreed()
    {
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $rc = new ReflectionClass($changeStream);
        $rp = $rc->getProperty('resumeCallable');
        $rp->setAccessible(true);

1262
        $this->assertIsCallable($rp->getValue($changeStream));
1263 1264

        // Invalidate the cursor to verify that resumeCallable is unset when the cursor is exhausted.
1265
        $this->dropCollection();
1266

1267
        $this->advanceCursorUntilValid($changeStream);
1268 1269 1270 1271

        $this->assertNull($rp->getValue($changeStream));
    }

1272
    /**
1273
     * Prose test 3: "ChangeStream will automatically resume one time on a
1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288
     * resumable error (including not master) with the initial pipeline and
     * options, except for the addition/update of a resumeToken."
     */
    public function testResumeRepeatsOriginalPipelineAndOptions()
    {
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

        $aggregateCommands = [];

        $this->configureFailPoint([
            'configureFailPoint' => 'failCommand',
            'mode' => ['times' => 1],
            'data' => ['failCommands' => ['getMore'], 'errorCode' => self::NOT_MASTER],
        ]);

1289 1290
        (new CommandObserver())->observe(
            function () use ($operation) {
1291 1292 1293 1294 1295 1296
                $changeStream = $operation->execute($this->getPrimaryServer());

                // The first next will hit the fail point, causing a resume
                $changeStream->next();
                $changeStream->next();
            },
1297
            function (array $event) use (&$aggregateCommands) {
1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349
                $command = $event['started']->getCommand();
                if ($event['started']->getCommandName() !== 'aggregate') {
                    return;
                }

                $aggregateCommands[] = (array) $command;
            }
        );

        $this->assertCount(2, $aggregateCommands);

        $this->assertThat(
            $aggregateCommands[0]['pipeline'][0]->{'$changeStream'},
            $this->logicalNot(
                $this->logicalOr(
                    $this->objectHasAttribute('resumeAfter'),
                    $this->objectHasAttribute('startAfter'),
                    $this->objectHasAttribute('startAtOperationTime')
                )
            )
        );

        $this->assertThat(
            $aggregateCommands[1]['pipeline'][0]->{'$changeStream'},
            $this->logicalOr(
                $this->objectHasAttribute('resumeAfter'),
                $this->objectHasAttribute('startAfter'),
                $this->objectHasAttribute('startAtOperationTime')
            )
        );

        $aggregateCommands = array_map(
            function (array $aggregateCommand) {
                // Remove resume options from the changestream document
                if (isset($aggregateCommand['pipeline'][0]->{'$changeStream'})) {
                    $aggregateCommand['pipeline'][0]->{'$changeStream'} = array_diff_key(
                        (array) $aggregateCommand['pipeline'][0]->{'$changeStream'},
                        ['resumeAfter' => false, 'startAfter' => false, 'startAtOperationTime' => false]
                    );
                }

                // Remove options we don't want to compare between commands
                return array_diff_key($aggregateCommand, ['lsid' => false, '$clusterTime' => false]);
            },
            $aggregateCommands
        );

        // Ensure options in original and resuming aggregate command match
        $this->assertEquals($aggregateCommands[0], $aggregateCommands[1]);
    }

    /**
1350
     * Prose test 4: "ChangeStream will not attempt to resume on any error
1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365
     * encountered while executing an aggregate command."
     */
    public function testErrorDuringAggregateCommandDoesNotCauseResume()
    {
        if (version_compare($this->getServerVersion(), '4.0.0', '<')) {
            $this->markTestSkipped('failCommand is not supported');
        }

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

        $commandCount = 0;

        $this->configureFailPoint([
            'configureFailPoint' => 'failCommand',
            'mode' => ['times' => 1],
1366
            'data' => ['failCommands' => ['aggregate'], 'errorCode' => self::INTERRUPTED],
1367 1368 1369 1370
        ]);

        $this->expectException(CommandException::class);

1371 1372
        (new CommandObserver())->observe(
            function () use ($operation) {
1373 1374
                $operation->execute($this->getPrimaryServer());
            },
1375
            function (array $event) use (&$commandCount) {
1376 1377 1378 1379 1380 1381 1382 1383
                $commandCount++;
            }
        );

        $this->assertSame(1, $commandCount);
    }

    /**
1384 1385
     * Prose test 6: "ChangeStream will perform server selection before
     * attempting to resume, using initial readPreference"
1386 1387 1388
     */
    public function testOriginalReadPreferenceIsPreservedOnResume()
    {
1389 1390 1391 1392
        if ($this->isShardedCluster()) {
            $this->markTestSkipped('Test does not apply to sharded clusters');
        }

1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422
        $readPreference = new ReadPreference('secondary');
        $options = ['readPreference' => $readPreference] + $this->defaultOptions;
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);

        try {
            $secondary = $this->manager->selectServer($readPreference);
        } catch (ConnectionTimeoutException $e) {
            $this->markTestSkipped('Secondary is not available');
        }

        $changeStream = $operation->execute($secondary);
        $previousCursorId = $changeStream->getCursorId();
        $this->killChangeStreamCursor($changeStream);

        $changeStream->next();
        $this->assertNotSame($previousCursorId, $changeStream->getCursorId());

        $getCursor = Closure::bind(
            function () {
                return $this->iterator->getInnerIterator();
            },
            $changeStream,
            ChangeStream::class
        );
        /** @var Cursor $cursor */
        $cursor = $getCursor();
        self::assertTrue($cursor->getServer()->isSecondary());
    }

    /**
1423
     * Prose test 12
1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454
     * For a ChangeStream under these conditions:
     * - Running against a server <4.0.7.
     * - The batch is empty or has been iterated to the last document.
     * Expected result:
     * - getResumeToken must return the _id of the last document returned if one exists.
     * - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
     * - If resumeAfter was not specified, the getResumeToken result must be empty.
     */
    public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch()
    {
        if ($this->isPostBatchResumeTokenSupported()) {
            $this->markTestSkipped('postBatchResumeToken is supported');
        }

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $this->assertNull($changeStream->getResumeToken());

        $this->insertDocument(['x' => 1]);

        $changeStream->next();
        $this->assertTrue($changeStream->valid());
        $resumeToken = $changeStream->getResumeToken();
        $this->assertSame($resumeToken, $changeStream->current()->_id);

        $options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $this->assertSame($resumeToken, $changeStream->getResumeToken());
1455 1456 1457
    }

    /**
1458
     * Prose test 14
1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473
     * For a ChangeStream under these conditions:
     *  - The batch is not empty.
     *  - The batch hasn’t been iterated at all.
     *  - Only the initial aggregate command has been executed.
     * Expected result:
     *  - getResumeToken must return startAfter from the initial aggregate if the option was specified.
     *  - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
     *  - If neither the startAfter nor resumeAfter options were specified, the getResumeToken result must be empty.
     */
    public function testResumeTokenBehaviour()
    {
        if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
            $this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
        }

1474 1475
        $this->skipIfIsShardedCluster('Resume token behaviour can\'t be reliably tested on sharded clusters.');

1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

        $lastOpTime = null;

        $changeStream = null;
        (new CommandObserver())->observe(function () use ($operation, &$changeStream) {
            $changeStream = $operation->execute($this->getPrimaryServer());
        }, function ($event) use (&$lastOpTime) {
            $this->assertInstanceOf(CommandSucceededEvent::class, $event['succeeded']);
            $reply = $event['succeeded']->getReply();

            $this->assertObjectHasAttribute('operationTime', $reply);
            $lastOpTime = $reply->operationTime;
        });

        $this->insertDocument(['x' => 1]);

1493
        $this->advanceCursorUntilValid($changeStream);
1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518
        $this->assertTrue($changeStream->valid());
        $resumeToken = $changeStream->getResumeToken();

        $this->insertDocument(['x' => 2]);

        // Test startAfter option
        $options = ['startAfter' => $resumeToken] + $this->defaultOptions;
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $this->assertEquals($resumeToken, $changeStream->getResumeToken());

        // Test resumeAfter option
        $options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $this->assertEquals($resumeToken, $changeStream->getResumeToken());

        // Test without option
        $options = ['startAtOperationTime' => $lastOpTime] + $this->defaultOptions;
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $this->assertNull($changeStream->getResumeToken());
1519 1520
    }

1521
    /**
1522 1523 1524 1525
     * Prose test 17: "$changeStream stage for ChangeStream started with
     * startAfter against a server >=4.1.1 that has not received any results yet
     * MUST include a startAfter option and MUST NOT include a resumeAfter
     * option when resuming a change stream."
1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537
     */
    public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfterOption()
    {
        if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
            $this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
        }

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $this->insertDocument(['x' => 1]);

1538
        $this->advanceCursorUntilValid($changeStream);
1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549
        $this->assertTrue($changeStream->valid());
        $resumeToken = $changeStream->getResumeToken();

        $options = ['startAfter' => $resumeToken] + $this->defaultOptions;
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
        $changeStream = $operation->execute($this->getPrimaryServer());
        $changeStream->rewind();
        $this->killChangeStreamCursor($changeStream);

        $aggregateCommand = null;

1550 1551
        (new CommandObserver())->observe(
            function () use ($changeStream) {
1552 1553
                $changeStream->next();
            },
1554
            function (array $event) use (&$aggregateCommand) {
1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568
                if ($event['started']->getCommandName() !== 'aggregate') {
                    return;
                }

                $aggregateCommand = $event['started']->getCommand();
            }
        );

        $this->assertNotNull($aggregateCommand);
        $this->assertObjectNotHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
        $this->assertObjectHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
    }

    /**
1569 1570 1571 1572
     * Prose test 18: "$changeStream stage for ChangeStream started with
     * startAfter against a server >=4.1.1 that has received at least one result
     * MUST include a resumeAfter option and MUST NOT include a startAfter
     * option when resuming a change stream."
1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584
     */
    public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOption()
    {
        if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
            $this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
        }

        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
        $changeStream = $operation->execute($this->getPrimaryServer());

        $this->insertDocument(['x' => 1]);

1585
        $this->advanceCursorUntilValid($changeStream);
1586 1587 1588 1589 1590 1591 1592 1593
        $resumeToken = $changeStream->getResumeToken();

        $options = ['startAfter' => $resumeToken] + $this->defaultOptions;
        $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
        $changeStream = $operation->execute($this->getPrimaryServer());
        $changeStream->rewind();

        $this->insertDocument(['x' => 2]);
1594
        $this->advanceCursorUntilValid($changeStream);
1595 1596 1597 1598 1599 1600
        $this->assertTrue($changeStream->valid());

        $this->killChangeStreamCursor($changeStream);

        $aggregateCommand = null;

1601 1602
        (new CommandObserver())->observe(
            function () use ($changeStream) {
1603 1604
                $changeStream->next();
            },
1605
            function (array $event) use (&$aggregateCommand) {
1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618
                if ($event['started']->getCommandName() !== 'aggregate') {
                    return;
                }

                $aggregateCommand = $event['started']->getCommand();
            }
        );

        $this->assertNotNull($aggregateCommand);
        $this->assertObjectNotHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
        $this->assertObjectHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
    }

1619 1620 1621 1622
    private function assertNoCommandExecuted(callable $callable)
    {
        $commands = [];

1623
        (new CommandObserver())->observe(
1624
            $callable,
1625
            function (array $event) use (&$commands) {
1626 1627 1628 1629 1630 1631 1632
                $this->fail(sprintf('"%s" command was executed', $event['started']->getCommandName()));
            }
        );

        $this->assertEmpty($commands);
    }

1633 1634 1635
    private function getPostBatchResumeTokenFromReply(stdClass $reply)
    {
        $this->assertObjectHasAttribute('cursor', $reply);
1636
        $this->assertIsObject($reply->cursor);
1637
        $this->assertObjectHasAttribute('postBatchResumeToken', $reply->cursor);
1638
        $this->assertIsObject($reply->cursor->postBatchResumeToken);
1639 1640 1641 1642

        return $reply->cursor->postBatchResumeToken;
    }

1643 1644
    private function insertDocument($document)
    {
1645 1646 1647 1648 1649 1650
        $insertOne = new InsertOne(
            $this->getDatabaseName(),
            $this->getCollectionName(),
            $document,
            ['writeConcern' => new WriteConcern(WriteConcern::MAJORITY)]
        );
1651 1652 1653
        $writeResult = $insertOne->execute($this->getPrimaryServer());
        $this->assertEquals(1, $writeResult->getInsertedCount());
    }
1654

1655 1656 1657 1658 1659 1660 1661
    private function isPostBatchResumeTokenSupported()
    {
        return version_compare($this->getServerVersion(), '4.0.7', '>=');
    }

    private function isStartAtOperationTimeSupported()
    {
1662
        return server_supports_feature($this->getPrimaryServer(), self::$wireVersionForStartAtOperationTime);
1663 1664
    }

1665 1666 1667 1668 1669 1670 1671 1672 1673 1674
    private function killChangeStreamCursor(ChangeStream $changeStream)
    {
        $command = [
            'killCursors' => $this->getCollectionName(),
            'cursors' => [ $changeStream->getCursorId() ],
        ];

        $operation = new DatabaseCommand($this->getDatabaseName(), $command);
        $operation->execute($this->getPrimaryServer());
    }
1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702

    private function advanceCursorUntilValid(Iterator $iterator, $limitOnShardedClusters = 5)
    {
        if (! $this->isShardedCluster()) {
            $iterator->next();
            $this->assertTrue($iterator->valid());

            return;
        }

        for ($i = 0; $i < $limitOnShardedClusters; $i++) {
            $iterator->next();
            if ($iterator->valid()) {
                return;
            }
        }

        throw new ExpectationFailedException(sprintf('Expected cursor to return an element but none was found after %d attempts.', $limitOnShardedClusters));
    }

    private function skipIfIsShardedCluster($message)
    {
        if (! $this->isShardedCluster()) {
            return;
        }

        $this->markTestSkipped(sprintf('Test does not apply on sharded clusters: %s', $message));
    }
1703
}