Commit bac5b9bc authored by Katherine Walker's avatar Katherine Walker

Merge pull request #468

parents 76ef2717 bff5e2e2
...@@ -22,9 +22,15 @@ interface: phpmethod ...@@ -22,9 +22,15 @@ interface: phpmethod
operation: ~ operation: ~
optional: true optional: true
--- ---
source: arg_name: option
file: apiargs-MongoDBCollection-method-find-option.yaml name: maxAwaitTimeMS
ref: maxAwaitTimeMS type: integer
description: |
Positive integer denoting the time limit in milliseconds for the server to
block a getMore operation if no data is available.
interface: phpmethod
operation: ~
optional: true
--- ---
source: source:
file: apiargs-MongoDBCollection-common-option.yaml file: apiargs-MongoDBCollection-common-option.yaml
......
...@@ -177,7 +177,6 @@ using :php:`IteratorIterator <iteratoriterator>`. ...@@ -177,7 +177,6 @@ using :php:`IteratorIterator <iteratoriterator>`.
$iterator->next(); $iterator->next();
} }
}
Much like the ``foreach`` example, this version on the consumer script will Much like the ``foreach`` example, this version on the consumer script will
start by quickly printing all results in the capped collection; however, it will start by quickly printing all results in the capped collection; however, it will
......
...@@ -163,6 +163,10 @@ class Aggregate implements Executable ...@@ -163,6 +163,10 @@ class Aggregate implements Executable
throw InvalidArgumentException::invalidType('"hint" option', $options['hint'], 'string or array or object'); throw InvalidArgumentException::invalidType('"hint" option', $options['hint'], 'string or array or object');
} }
if (isset($options['maxAwaitTimeMS']) && ! is_integer($options['maxAwaitTimeMS'])) {
throw InvalidArgumentException::invalidType('"maxAwaitTimeMS" option', $options['maxAwaitTimeMS'], 'integer');
}
if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) { if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer'); throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
} }
...@@ -277,6 +281,7 @@ class Aggregate implements Executable ...@@ -277,6 +281,7 @@ class Aggregate implements Executable
'aggregate' => $this->collectionName, 'aggregate' => $this->collectionName,
'pipeline' => $this->pipeline, 'pipeline' => $this->pipeline,
]; ];
$cmdOptions = [];
// Servers < 2.6 do not support any command options // Servers < 2.6 do not support any command options
if ( ! $isCursorSupported) { if ( ! $isCursorSupported) {
...@@ -303,13 +308,17 @@ class Aggregate implements Executable ...@@ -303,13 +308,17 @@ class Aggregate implements Executable
$cmd['hint'] = is_array($this->options['hint']) ? (object) $this->options['hint'] : $this->options['hint']; $cmd['hint'] = is_array($this->options['hint']) ? (object) $this->options['hint'] : $this->options['hint'];
} }
if (isset($this->options['maxAwaitTimeMS'])) {
$cmdOptions['maxAwaitTimeMS'] = $this->options['maxAwaitTimeMS'];
}
if ($this->options['useCursor']) { if ($this->options['useCursor']) {
$cmd['cursor'] = isset($this->options["batchSize"]) $cmd['cursor'] = isset($this->options["batchSize"])
? ['batchSize' => $this->options["batchSize"]] ? ['batchSize' => $this->options["batchSize"]]
: new stdClass; : new stdClass;
} }
return new Command($cmd); return new Command($cmd, $cmdOptions);
} }
/** /**
......
...@@ -147,7 +147,7 @@ class ChangeStream implements Executable ...@@ -147,7 +147,7 @@ class ChangeStream implements Executable
private function createAggregateOptions() private function createAggregateOptions()
{ {
$aggOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1]); $aggOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1]);
if ( ! $aggOptions) { if ( ! $aggOptions) {
return []; return [];
} }
...@@ -174,6 +174,7 @@ class ChangeStream implements Executable ...@@ -174,6 +174,7 @@ class ChangeStream implements Executable
array_unshift($this->pipeline, $changeStreamArray); array_unshift($this->pipeline, $changeStreamArray);
$cmd = new Aggregate($this->databaseName, $this->collectionName, $this->pipeline, $this->createAggregateOptions()); $cmd = new Aggregate($this->databaseName, $this->collectionName, $this->pipeline, $this->createAggregateOptions());
return $cmd; return $cmd;
} }
......
...@@ -237,4 +237,49 @@ class ChangeStreamFunctionalTest extends FunctionalTestCase ...@@ -237,4 +237,49 @@ class ChangeStreamFunctionalTest extends FunctionalTestCase
]); ]);
$this->assertEquals($changeStreamResult->current(), $expectedResult); $this->assertEquals($changeStreamResult->current(), $expectedResult);
} }
public function testMaxAwaitTimeMS()
{
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
/* On average, an acknowledged write takes about 20 ms to appear in a
* change stream on the server so we'll use a higher maxAwaitTimeMS to
* ensure we see the write. */
$maxAwaitTimeMS = 100;
$changeStreamResult = $this->collection->watch([], ['maxAwaitTimeMS' => $maxAwaitTimeMS]);
/* The initial change stream is empty so we should expect a delay when
* we call rewind, since it issues a getMore. Expect to wait at least
* maxAwaitTimeMS, since no new documents should be inserted to wake up
* the server's query thread. Also ensure we don't wait too long (server
* default is one second). */
$startTime = microtime(true);
$changeStreamResult->rewind();
$duration = microtime(true) - $startTime;
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, $duration);
$this->assertLessThan(0.5, $duration);
$this->assertFalse($changeStreamResult->valid());
/* Advancing again on a change stream will issue a getMore, so we should
* expect a delay again. */
$startTime = microtime(true);
$changeStreamResult->next();
$duration = microtime(true) - $startTime;
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, $duration);
$this->assertLessThan(0.5, $duration);
$this->assertFalse($changeStreamResult->valid());
/* After inserting a document, the change stream will not issue a
* getMore so we should not expect a delay. */
$result = $this->collection->insertOne(['_id' => 1]);
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
$this->assertSame(1, $result->getInsertedCount());
$startTime = microtime(true);
$changeStreamResult->next();
$duration = microtime(true) - $startTime;
$this->assertLessThan($maxAwaitTimeMS * 0.001, $duration);
$this->assertTrue($changeStreamResult->valid());
}
} }
...@@ -148,18 +148,45 @@ class FindFunctionalTest extends FunctionalTestCase ...@@ -148,18 +148,45 @@ class FindFunctionalTest extends FunctionalTestCase
// Insert documents into the capped collection. // Insert documents into the capped collection.
$bulkWrite = new BulkWrite(['ordered' => true]); $bulkWrite = new BulkWrite(['ordered' => true]);
$bulkWrite->insert(['_id' => 1]); $bulkWrite->insert(['_id' => 1]);
$bulkWrite->insert(['_id' => 2]);
$result = $this->manager->executeBulkWrite($this->getNamespace(), $bulkWrite); $result = $this->manager->executeBulkWrite($this->getNamespace(), $bulkWrite);
$operation = new Find($databaseName, $cappedCollectionName, [], ['cursorType' => Find::TAILABLE_AWAIT, 'maxAwaitTimeMS' => $maxAwaitTimeMS]); $operation = new Find($databaseName, $cappedCollectionName, [], ['cursorType' => Find::TAILABLE_AWAIT, 'maxAwaitTimeMS' => $maxAwaitTimeMS]);
$cursor = $operation->execute($this->getPrimaryServer()); $cursor = $operation->execute($this->getPrimaryServer());
$it = new \IteratorIterator($cursor); $it = new \IteratorIterator($cursor);
// Make sure we await results for no more than the maxAwaitTimeMS. /* The initial query includes the one and only document in its result
* batch, so we should not expect a delay. */
$startTime = microtime(true);
$it->rewind(); $it->rewind();
$duration = microtime(true) - $startTime;
$this->assertLessThan($maxAwaitTimeMS * 0.001, $duration);
$this->assertTrue($it->valid());
$this->assertSameDocument(['_id' => 1], $it->current());
/* Advancing again takes us to the last document of the result batch,
* but still should not issue a getMore */
$startTime = microtime(true);
$it->next(); $it->next();
$duration = microtime(true) - $startTime;
$this->assertLessThan($maxAwaitTimeMS * 0.001, $duration);
$this->assertTrue($it->valid());
$this->assertSameDocument(['_id' => 2], $it->current());
/* Now that we've reached the end of the initial result batch, advancing
* again will issue a getMore. Expect to wait at least maxAwaitTimeMS,
* since no new documents should be inserted to wake up the server's
* query thread. Also ensure we don't wait too long (server default is
* one second). */
$startTime = microtime(true); $startTime = microtime(true);
$it->next(); $it->next();
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, microtime(true) - $startTime); $duration = microtime(true) - $startTime;
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, $duration);
$this->assertLessThan(0.5, $duration);
$this->assertFalse($it->valid());
} }
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment