Commit 3a4e9202 authored by Jeremy Mikola's avatar Jeremy Mikola

Test that ChangeStream::next() resumes after dropped connection

The original intention of this test was to verify resumability in the event of a dropped connection; however, since socketTimeoutMS and maxAwaitTimeMS are fixed values, we cannot test a dropped connection followed by a successful resume. Instead, we will test that the driver attempts to resume once and only once before we expect a socket timeout after the second attempt.

Note: ChangeStream::rewind() does not currently resume (see: PHPLIB-322)
parent 8acab3b4
...@@ -6,6 +6,7 @@ use MongoDB\Driver\Monitoring\CommandFailedEvent; ...@@ -6,6 +6,7 @@ use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandStartedEvent; use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSucceededEvent; use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber; use MongoDB\Driver\Monitoring\CommandSubscriber;
use Exception;
/** /**
* Observes command documents using the driver's monitoring API. * Observes command documents using the driver's monitoring API.
...@@ -20,13 +21,19 @@ class CommandObserver implements CommandSubscriber ...@@ -20,13 +21,19 @@ class CommandObserver implements CommandSubscriber
\MongoDB\Driver\Monitoring\addSubscriber($this); \MongoDB\Driver\Monitoring\addSubscriber($this);
call_user_func($execution); try {
call_user_func($execution);
} catch (Exception $executionException) {}
\MongoDB\Driver\Monitoring\removeSubscriber($this); \MongoDB\Driver\Monitoring\removeSubscriber($this);
foreach ($this->commands as $command) { foreach ($this->commands as $command) {
call_user_func($commandCallback, $command); call_user_func($commandCallback, $command);
} }
if (isset($executionException)) {
throw $executionException;
}
} }
public function commandStarted(CommandStartedEvent $event) public function commandStarted(CommandStartedEvent $event)
......
...@@ -3,10 +3,15 @@ ...@@ -3,10 +3,15 @@
namespace MongoDB\Tests\Operation; namespace MongoDB\Tests\Operation;
use MongoDB\Client; use MongoDB\Client;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
use MongoDB\Driver\Exception\ConnectionTimeoutException;
use MongoDB\Operation\DatabaseCommand; use MongoDB\Operation\DatabaseCommand;
use MongoDB\Operation\InsertOne; use MongoDB\Operation\InsertOne;
use MongoDB\Operation\Watch; use MongoDB\Operation\Watch;
use MongoDB\Tests\CommandObserver;
use stdClass;
class WatchFunctionalTest extends FunctionalTestCase class WatchFunctionalTest extends FunctionalTestCase
{ {
...@@ -61,6 +66,62 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -61,6 +66,62 @@ class WatchFunctionalTest extends FunctionalTestCase
$this->assertEquals($expectedResult, $changeStream->current()); $this->assertEquals($expectedResult, $changeStream->current());
} }
/**
* @todo test that rewind() also resumes once PHPLIB-322 is implemented
*/
public function testNextResumesAfterConnectionException()
{
/* In order to trigger a dropped connection, we'll use a new client with
* a socket timeout that is less than the change stream's maxAwaitTimeMS
* option. */
$manager = new Manager($this->getUri(), ['socketTimeoutMS' => 50]);
$primaryServer = $manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
$operation = new Watch($manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($primaryServer);
/* Note: we intentionally do not start iteration with rewind() to ensure
* that we test resume functionality within next(). */
$commands = [];
try {
(new CommandObserver)->observe(
function() use ($changeStream) {
$changeStream->next();
},
function(stdClass $command) use (&$commands) {
$commands[] = key((array) $command);
}
);
$this->fail('ConnectionTimeoutException was not thrown');
} catch (ConnectionTimeoutException $e) {}
$expectedCommands = [
/* The initial aggregate command for change streams returns a cursor
* envelope with an empty initial batch, since there are no changes
* to report at the moment the change stream is created. Therefore,
* we expect a getMore to be issued when we first advance the change
* stream (with either rewind() or next()). */
'getMore',
/* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous
* getMore command encounters a client socket timeout and leaves the
* cursor open on the server. ChangeStream should catch this error
* and resume by issuing a new aggregate command. */
'aggregate',
/* When ChangeStream resumes, it overwrites its original cursor with
* the new cursor resulting from the last aggregate command. This
* removes the last reference to the old cursor, which causes the
* driver to kill it (via mongoc_cursor_destroy()). */
'killCursors',
/* Finally, ChangeStream will rewind the new cursor as the last step
* of the resume process. This results in one last getMore. */
'getMore',
];
$this->assertSame($expectedCommands, $commands);
}
public function testNoChangeAfterResumeBeforeInsert() public function testNoChangeAfterResumeBeforeInsert()
{ {
$this->insertDocument(['_id' => 1, 'x' => 'foo']); $this->insertDocument(['_id' => 1, 'x' => 'foo']);
...@@ -197,27 +258,6 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -197,27 +258,6 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->next(); $changeStream->next();
} }
public function testConnectionException()
{
$client = new Client($this->getUri(), ['socketTimeoutMS' => 1005], []);
$collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName());
$changeStream = $collection->watch();
$changeStream->next();
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$changeStream->next();
$expectedResult = (object) ([
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => (object) ['_id' => 1, 'x' => 'foo'],
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'WatchFunctionalTest.226d95f1'],
'documentKey' => (object) ['_id' => 1]
]);
$this->assertEquals($changeStream->current(), $expectedResult);
}
public function testMaxAwaitTimeMS() public function testMaxAwaitTimeMS()
{ {
/* On average, an acknowledged write takes about 20 ms to appear in a /* On average, an acknowledged write takes about 20 ms to appear in a
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment