Commit d9858d1f authored by Jeremy Mikola's avatar Jeremy Mikola

Refactor change stream tests use Operation instead of Collection

For consistency with other operation functional tests, this now constructs and executes operation classes directly.
parent 15b4702f
......@@ -3,14 +3,16 @@
namespace MongoDB\Tests\Operation;
use MongoDB\Client;
use MongoDB\Collection;
use MongoDB\Operation\DatabaseCommand;
use MongoDB\Operation\InsertOne;
use MongoDB\Operation\Watch;
class WatchFunctionalTest extends FunctionalTestCase
{
public function setUp()
{
parent::setUp();
if (version_compare($this->getFeatureCompatibilityVersion(), '3.6', '<')) {
$this->markTestSkipped('$changeStream is only supported on FCV 3.6 or higher');
}
......@@ -18,71 +20,61 @@ class WatchFunctionalTest extends FunctionalTestCase
public function testResume()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$result = $this->collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream = $this->collection->watch();
$changeStream->rewind();
$this->assertNull($changeStream->current());
$result = $this->collection->insertOne(['x' => 2]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next();
$expectedResult = (object) ([
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $result->getInsertedId(), 'x' => 2],
'fullDocument' => (object) ['_id' => 2, 'x' => 'bar'],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'WatchFunctionalTest.e68b9f01'],
'documentKey' => (object) ['_id' => $result->getInsertedId()]
'documentKey' => (object) ['_id' => 2]
]);
$this->assertEquals($changeStream->current(), $expectedResult);
$operation = new DatabaseCommand($this->getDatabaseName(), ["killCursors" => $this->getCollectionName(), "cursors" => [$changeStream->getCursorId()]]);
$operation->execute($this->getPrimaryServer());
$result = $this->collection->insertOne(['x' => 3]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$this->insertDocument(['_id' => 3, 'x' => 'baz']);
$changeStream->next();
$expectedResult = (object) ([
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $result->getInsertedId(), 'x' => 3],
'fullDocument' => (object) ['_id' => 3, 'x' => 'baz'],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'WatchFunctionalTest.e68b9f01'],
'documentKey' => (object) ['_id' => $result->getInsertedId()]
'documentKey' => (object) ['_id' => 3]
]);
$this->assertEquals($changeStream->current(), $expectedResult);
}
public function testNoChangeAfterResumeBeforeInsert()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$result = $this->collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream = $this->collection->watch();
$changeStream->rewind();
$this->assertNull($changeStream->current());
$result = $this->collection->insertOne(['x' => 2]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next();
$expectedResult = (object) ([
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $result->getInsertedId(), 'x' => 2],
'fullDocument' => (object) ['_id' => 2, 'x' => 'bar'],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'WatchFunctionalTest.4a554985'],
'documentKey' => (object) ['_id' => $result->getInsertedId()]
'documentKey' => (object) ['_id' => 2]
]);
$this->assertEquals($changeStream->current(), $expectedResult);
......@@ -92,26 +84,23 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->next();
$this->assertNull($changeStream->current());
$result = $this->collection->insertOne(['x' => 3]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$this->insertDocument(['_id' => 3, 'x' => 'baz']);
$changeStream->next();
$expectedResult = (object) ([
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $result->getInsertedId(), 'x' => 3],
'fullDocument' => (object) ['_id' => 3, 'x' => 'baz'],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'WatchFunctionalTest.4a554985'],
'documentKey' => (object) ['_id' => $result->getInsertedId()]
'documentKey' => (object) ['_id' => 3]
]);
$this->assertEquals($changeStream->current(), $expectedResult);
}
public function testResumeAfterKillThenNoOperations()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$changeStream = $this->collection->watch();
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
$changeStream = $operation->execute($this->getPrimaryServer());
$operation = new DatabaseCommand($this->getDatabaseName(), ["killCursors" => $this->getCollectionName(), "cursors" => [$changeStream->getCursorId()]]);
$operation->execute($this->getPrimaryServer());
......@@ -122,16 +111,13 @@ class WatchFunctionalTest extends FunctionalTestCase
public function testResumeAfterKillThenOperation()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$changeStream = $this->collection->watch();
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
$changeStream = $operation->execute($this->getPrimaryServer());
$operation = new DatabaseCommand($this->getDatabaseName(), ["killCursors" => $this->getCollectionName(), "cursors" => [$changeStream->getCursorId()]]);
$operation->execute($this->getPrimaryServer());
$result = $this->collection->insertOne(['x' => 3]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$changeStream->next();
$this->assertNull($changeStream->current());
......@@ -139,15 +125,12 @@ class WatchFunctionalTest extends FunctionalTestCase
public function testKey()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$changeStream = $this->collection->watch();
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->assertNull($changeStream->key());
$result = $this->collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$changeStream->next();
$this->assertSame(1, $changeStream->key());
......@@ -163,9 +146,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->next();
$this->assertNull($changeStream->key());
$result = $this->collection->insertOne(['x' => 2]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
$changeStream->next();
$this->assertSame(2, $changeStream->key());
......@@ -173,14 +154,12 @@ class WatchFunctionalTest extends FunctionalTestCase
public function testNonEmptyPipeline()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$pipeline = [['$project' => ['foo' => [0]]]];
$changeStream = $this->collection->watch($pipeline, []);
$result = $this->collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['_id' => 1]);
$changeStream->next();
$expectedResult = (object) ([
......@@ -192,9 +171,9 @@ class WatchFunctionalTest extends FunctionalTestCase
public function testCursorWithEmptyBatchNotClosed()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
$changeStream = $operation->execute($this->getPrimaryServer());
$changeStream = $this->collection->watch();
$this->assertNotNull($changeStream);
}
......@@ -203,14 +182,12 @@ class WatchFunctionalTest extends FunctionalTestCase
*/
public function testFailureAfterResumeTokenRemoved()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$pipeline = [['$project' => ['_id' => 0 ]]];
$changeStream = $this->collection->watch($pipeline, []);
$result = $this->collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['x' => 1]);
$changeStream->next();
}
......@@ -223,29 +200,28 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream = $collection->watch();
$changeStream->next();
$result = $collection->insertOne(['x' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$changeStream->next();
$expectedResult = (object) ([
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => $result->getInsertedId(), 'x' => 1],
'fullDocument' => (object) ['_id' => 1, 'x' => 'foo'],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'WatchFunctionalTest.226d95f1'],
'documentKey' => (object) ['_id' => $result->getInsertedId()]
'documentKey' => (object) ['_id' => 1]
]);
$this->assertEquals($changeStream->current(), $expectedResult);
}
public function testMaxAwaitTimeMS()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
/* On average, an acknowledged write takes about 20 ms to appear in a
* change stream on the server so we'll use a higher maxAwaitTimeMS to
* ensure we see the write. */
$maxAwaitTimeMS = 100;
$changeStream = $this->collection->watch([], ['maxAwaitTimeMS' => $maxAwaitTimeMS]);
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => $maxAwaitTimeMS]);
$changeStream = $operation->execute($this->getPrimaryServer());
/* The initial change stream is empty so we should expect a delay when
* we call rewind, since it issues a getMore. Expect to wait at least
......@@ -272,9 +248,7 @@ class WatchFunctionalTest extends FunctionalTestCase
/* After inserting a document, the change stream will not issue a
* getMore so we should not expect a delay. */
$result = $this->collection->insertOne(['_id' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$this->insertDocument(['_id' => 1]);
$startTime = microtime(true);
$changeStream->next();
......@@ -282,4 +256,11 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertLessThan($maxAwaitTimeMS * 0.001, $duration);
$this->assertTrue($changeStream->valid());
}
private function insertDocument($document)
{
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);
$writeResult = $insertOne->execute($this->getPrimaryServer());
$this->assertEquals(1, $writeResult->getInsertedCount());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment