ChangeStreamsSpecTest.php 9.77 KB
Newer Older
1 2 3 4
<?php

namespace MongoDB\Tests\SpecTests;

5 6
use ArrayIterator;
use LogicException;
7
use MongoDB\BSON\Int64;
8 9 10 11 12
use MongoDB\ChangeStream;
use MongoDB\Driver\Exception\Exception;
use MongoDB\Model\BSONDocument;
use MultipleIterator;
use stdClass;
13 14 15 16
use function basename;
use function count;
use function file_get_contents;
use function glob;
17 18 19 20 21 22 23 24

/**
 * Change Streams spec tests.
 *
 * @see https://github.com/mongodb/specifications/tree/master/source/change-streams
 */
class ChangeStreamsSpecTest extends FunctionalTestCase
{
25
    /** @var array */
26
    private static $incompleteTests = [];
27

28 29 30
    /**
     * Assert that the expected and actual command documents match.
     *
31 32
     * Note: this method may modify the $expected object.
     *
33 34 35 36 37
     * @param stdClass $expected Expected command document
     * @param stdClass $actual   Actual command document
     */
    public static function assertCommandMatches(stdClass $expected, stdClass $actual)
    {
38 39 40 41 42 43 44 45 46
        if (isset($expected->getMore) && $expected->getMore === 42) {
            static::assertObjectHasAttribute('getMore', $actual);
            static::assertThat($actual->getMore, static::logicalOr(
                static::isInstanceOf(Int64::class),
                static::isType('integer')
            ));
            unset($expected->getMore);
        }

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
        static::assertDocumentsMatch($expected, $actual);
    }

    /**
     * Assert that the expected and actual documents match.
     *
     * @param array $expectedDocuments Expected documents
     * @param array $actualDocuments   Actual documents
     */
    public static function assertResult(array $expectedDocuments, array $actualDocuments)
    {
        static::assertCount(count($expectedDocuments), $actualDocuments);

        $mi = new MultipleIterator(MultipleIterator::MIT_NEED_ANY);
        $mi->attachIterator(new ArrayIterator($expectedDocuments));
        $mi->attachIterator(new ArrayIterator($actualDocuments));

        foreach ($mi as $documents) {
            list($expectedDocument, $actualDocument) = $documents;

            $constraint = new DocumentsMatchConstraint($expectedDocument, true, true, ['42']);

            static::assertThat($actualDocument, $constraint);
        }
    }

    /**
     * Execute an individual test case from the specification.
     *
     * @dataProvider provideTests
77 78 79 80 81
     * @param stdClass $test            Individual "tests[]" document
     * @param string   $databaseName    Name of database under test
     * @param string   $collectionName  Name of collection under test
     * @param string   $database2Name   Name of alternate database under test
     * @param string   $collection2Name Name of alternate collection under test
82
     */
83
    public function testChangeStreams(stdClass $test, $databaseName = null, $collectionName = null, $database2Name = null, $collection2Name = null)
84
    {
85 86 87 88
        if (isset(self::$incompleteTests[$this->dataDescription()])) {
            $this->markTestIncomplete(self::$incompleteTests[$this->dataDescription()]);
        }

89 90 91 92
        if ($this->isShardedCluster() && ! $this->isShardedClusterUsingReplicasets()) {
            $this->markTestSkipped('$changeStream is only supported with replicasets');
        }

93 94
        $this->checkServerRequirements($this->createRunOn($test));

95
        if (! isset($databaseName, $collectionName)) {
96 97 98 99 100 101 102
            $this->fail('Required database and collection names are unset');
        }

        $context = Context::fromChangeStreams($test, $databaseName, $collectionName);
        $this->setContext($context);

        $this->dropDatabasesAndCreateCollection($databaseName, $collectionName);
103 104 105 106

        if (isset($database2Name, $collection2Name)) {
            $this->dropDatabasesAndCreateCollection($database2Name, $collection2Name);
        }
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170

        if (isset($test->failPoint)) {
            $this->configureFailPoint($test->failPoint);
        }

        if (isset($test->expectations)) {
            $commandExpectations = CommandExpectations::fromChangeStreams($test->expectations);
            $commandExpectations->startMonitoring();
        }

        $errorExpectation = ErrorExpectation::fromChangeStreams($test->result);
        $resultExpectation = ResultExpectation::fromChangeStreams($test->result, [$this, 'assertResult']);

        $result = null;
        $exception = null;

        try {
            $changeStream = $this->createChangeStream($test);
        } catch (Exception $e) {
            $exception = $e;
        }

        if (isset($commandExpectations)) {
            $commandExpectations->stopMonitoring();
        }

        foreach ($test->operations as $operation) {
            Operation::fromChangeStreams($operation)->assert($this, $context);
        }

        if (isset($commandExpectations)) {
            $commandExpectations->startMonitoring();
        }

        /* If the change stream was successfully created (i.e. $exception is
         * null), attempt to iterate up to the expected number of results. It's
         * possible that some errors (e.g. projecting out _id) will only be
         * thrown during iteration, so we must also try/catch here. */
        try {
            if (isset($changeStream)) {
                $limit = isset($test->result->success) ? count($test->result->success) : 0;
                $result = $this->iterateChangeStream($changeStream, $limit);
            }
        } catch (Exception $e) {
            $this->assertNull($exception);
            $exception = $e;
        }

        $errorExpectation->assert($this, $exception);
        $resultExpectation->assert($this, $result);

        if (isset($commandExpectations)) {
            $commandExpectations->stopMonitoring();
            $commandExpectations->assert($this, $context);
        }
    }

    public function provideTests()
    {
        $testArgs = [];

        foreach (glob(__DIR__ . '/change-streams/*.json') as $filename) {
            $json = $this->decodeJson(file_get_contents($filename));
            $group = basename($filename, '.json');
171 172 173 174
            $databaseName = $json->database_name ?? null;
            $database2Name = $json->database2_name ?? null;
            $collectionName = $json->collection_name ?? null;
            $collection2Name = $json->collection2_name ?? null;
175 176 177

            foreach ($json->tests as $test) {
                $name = $group . ': ' . $test->description;
178
                $testArgs[$name] = [$test, $databaseName, $collectionName, $database2Name, $collection2Name];
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
            }
        }

        return $testArgs;
    }

    /**
     * Create a change stream.
     *
     * @param stdClass $test
     * @return ChangeStream
     * @throws LogicException if the target is unsupported
     */
    private function createChangeStream(stdClass $test)
    {
        $context = $this->getContext();
195
        $pipeline = $test->changeStreamPipeline ?? [];
196 197 198 199
        $options = isset($test->changeStreamOptions) ? (array) $test->changeStreamOptions : [];

        switch ($test->target) {
            case 'client':
200
                return $context->getClient()->watch($pipeline, $options);
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
            case 'database':
                return $context->getDatabase()->watch($pipeline, $options);
            case 'collection':
                return $context->getCollection()->watch($pipeline, $options);
            default:
                throw new LogicException('Unsupported target: ' . $test->target);
        }
    }

    /**
     * Convert the server requirements to a standard "runOn" array used by other
     * specifications.
     *
     * @param stdClass $test
     * @return array
     */
    private function createRunOn(stdClass $test)
    {
219
        $req = new stdClass();
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247

        /* Append ".99" as patch version, since command monitoring tests expect
         * the minor version to be an inclusive upper bound. */
        if (isset($test->maxServerVersion)) {
            $req->maxServerVersion = $test->maxServerVersion;
        }

        if (isset($test->minServerVersion)) {
            $req->minServerVersion = $test->minServerVersion;
        }

        if (isset($test->topology)) {
            $req->topology = $test->topology;
        }

        return [$req];
    }

    /**
     * Drop the database and create the collection.
     *
     * @param string $databaseName
     * @param string $collectionName
     */
    private function dropDatabasesAndCreateCollection($databaseName, $collectionName)
    {
        $context = $this->getContext();

248
        $database = $context->getClient()->selectDatabase($databaseName);
249 250 251 252 253 254 255 256
        $database->drop($context->defaultWriteOptions);
        $database->createCollection($collectionName, $context->defaultWriteOptions);
    }

    /**
     * Iterate a change stream.
     *
     * @param ChangeStream $changeStream
257
     * @param integer      $limit
258 259 260 261
     * @return BSONDocument[]
     */
    private function iterateChangeStream(ChangeStream $changeStream, $limit = 0)
    {
262 263 264 265 266 267 268 269
        if ($limit < 0) {
            throw new LogicException('$limit is negative');
        }

        /* Limit iterations to guard against an infinite loop should a test fail
         * to return as many results as are expected. Require at least one
         * iteration to allow next() a chance to throw for error tests. */
        $maxIterations = $limit + 1;
270 271 272 273 274 275 276

        /* On sharded clusters, allow for empty getMore calls due to sharding
         * architecture */
        if ($this->isShardedCluster()) {
            $maxIterations *= 5;
        }

277 278
        $events = [];

279
        for ($i = 0, $changeStream->rewind(); $i < $maxIterations; $i++, $changeStream->next()) {
280
            if (! $changeStream->valid()) {
281 282 283 284 285 286
                continue;
            }

            $event = $changeStream->current();
            $this->assertInstanceOf(BSONDocument::class, $event);
            $events[] = $event;
287 288 289 290

            if (count($events) >= $limit) {
                break;
            }
291 292 293 294 295
        }

        return $events;
    }
}