Commit 43e742ad authored by Katherine Walker's avatar Katherine Walker

PHPLIB-276: Add maxAwaitTimeMS support for change streams

parent 76ef2717
...@@ -163,6 +163,10 @@ class Aggregate implements Executable ...@@ -163,6 +163,10 @@ class Aggregate implements Executable
throw InvalidArgumentException::invalidType('"hint" option', $options['hint'], 'string or array or object'); throw InvalidArgumentException::invalidType('"hint" option', $options['hint'], 'string or array or object');
} }
if (isset($options['maxAwaitTimeMS']) && ! is_integer($options['maxAwaitTimeMS'])) {
throw InvalidArgumentException::invalidType('"maxAwaitTimeMS" option', $options['maxAwaitTimeMS'], 'integer');
}
if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) { if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer'); throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
} }
...@@ -277,6 +281,7 @@ class Aggregate implements Executable ...@@ -277,6 +281,7 @@ class Aggregate implements Executable
'aggregate' => $this->collectionName, 'aggregate' => $this->collectionName,
'pipeline' => $this->pipeline, 'pipeline' => $this->pipeline,
]; ];
$cmdOptions = [];
// Servers < 2.6 do not support any command options // Servers < 2.6 do not support any command options
if ( ! $isCursorSupported) { if ( ! $isCursorSupported) {
...@@ -303,13 +308,17 @@ class Aggregate implements Executable ...@@ -303,13 +308,17 @@ class Aggregate implements Executable
$cmd['hint'] = is_array($this->options['hint']) ? (object) $this->options['hint'] : $this->options['hint']; $cmd['hint'] = is_array($this->options['hint']) ? (object) $this->options['hint'] : $this->options['hint'];
} }
if (isset($this->options['maxAwaitTimeMS'])) {
$cmdOptions['maxAwaitTimeMS'] = $this->options['maxAwaitTimeMS'];
}
if ($this->options['useCursor']) { if ($this->options['useCursor']) {
$cmd['cursor'] = isset($this->options["batchSize"]) $cmd['cursor'] = isset($this->options["batchSize"])
? ['batchSize' => $this->options["batchSize"]] ? ['batchSize' => $this->options["batchSize"]]
: new stdClass; : new stdClass;
} }
return new Command($cmd); return new Command($cmd, $cmdOptions);
} }
/** /**
......
...@@ -147,7 +147,7 @@ class ChangeStream implements Executable ...@@ -147,7 +147,7 @@ class ChangeStream implements Executable
private function createAggregateOptions() private function createAggregateOptions()
{ {
$aggOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1]); $aggOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1]);
if ( ! $aggOptions) { if ( ! $aggOptions) {
return []; return [];
} }
...@@ -174,6 +174,7 @@ class ChangeStream implements Executable ...@@ -174,6 +174,7 @@ class ChangeStream implements Executable
array_unshift($this->pipeline, $changeStreamArray); array_unshift($this->pipeline, $changeStreamArray);
$cmd = new Aggregate($this->databaseName, $this->collectionName, $this->pipeline, $this->createAggregateOptions()); $cmd = new Aggregate($this->databaseName, $this->collectionName, $this->pipeline, $this->createAggregateOptions());
return $cmd; return $cmd;
} }
......
...@@ -237,4 +237,21 @@ class ChangeStreamFunctionalTest extends FunctionalTestCase ...@@ -237,4 +237,21 @@ class ChangeStreamFunctionalTest extends FunctionalTestCase
]); ]);
$this->assertEquals($changeStreamResult->current(), $expectedResult); $this->assertEquals($changeStreamResult->current(), $expectedResult);
} }
public function testMaxAwaitTimeMS()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
$maxAwaitTimeMS = 10;
$changeStreamResult = $this->collection->watch([], ['maxAwaitTimeMS' => $maxAwaitTimeMS]);
/* Make sure we await results for at least maxAwaitTimeMS, since no new
* documents should be inserted to wake up the server's command thread.
* Also ensure that we don't wait too long (server default is one
* second). */
$startTime = microtime(true);
$changeStreamResult->rewind();
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, microtime(true) - $startTime);
$this->assertLessThan(0.5, microtime(true) - $startTime);
}
} }
...@@ -154,7 +154,10 @@ class FindFunctionalTest extends FunctionalTestCase ...@@ -154,7 +154,10 @@ class FindFunctionalTest extends FunctionalTestCase
$cursor = $operation->execute($this->getPrimaryServer()); $cursor = $operation->execute($this->getPrimaryServer());
$it = new \IteratorIterator($cursor); $it = new \IteratorIterator($cursor);
// Make sure we await results for no more than the maxAwaitTimeMS. /* Make sure we await results for at least maxAwaitTimeMS, since no new
* documents should be inserted to wake up the server's query thread.
* Also ensure that we don't wait too long (server default is one
* second). */
$it->rewind(); $it->rewind();
$it->next(); $it->next();
$startTime = microtime(true); $startTime = microtime(true);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment