Commit 99f54dd5 authored by Jeremy Mikola's avatar Jeremy Mikola

Refactor resume token extraction and invalid type exception

parent 9f41780d
...@@ -93,7 +93,7 @@ class ChangeStream implements Iterator ...@@ -93,7 +93,7 @@ class ChangeStream implements Iterator
try { try {
$this->csIt->next(); $this->csIt->next();
if ($this->valid()) { if ($this->valid()) {
$this->extractResumeToken($this->csIt->current()); $this->resumeToken = $this->extractResumeToken($this->csIt->current());
$this->key++; $this->key++;
} }
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
...@@ -122,7 +122,7 @@ class ChangeStream implements Iterator ...@@ -122,7 +122,7 @@ class ChangeStream implements Iterator
try { try {
$this->csIt->rewind(); $this->csIt->rewind();
if ($this->valid()) { if ($this->valid()) {
$this->extractResumeToken($this->csIt->current()); $this->resumeToken = $this->extractResumeToken($this->csIt->current());
} }
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) { if (strpos($e->getMessage(), "not master") !== false) {
...@@ -152,8 +152,10 @@ class ChangeStream implements Iterator ...@@ -152,8 +152,10 @@ class ChangeStream implements Iterator
/** /**
* Extracts the resume token (i.e. "_id" field) from the change document. * Extracts the resume token (i.e. "_id" field) from the change document.
* *
* @param array|document $document Change document
* @return mixed
* @throws InvalidArgumentException * @throws InvalidArgumentException
* @throws ResumeTokenException if the resume token cannot be found (i.e. no _id field) * @throws ResumeTokenException if the resume token is not found or invalid
*/ */
private function extractResumeToken($document) private function extractResumeToken($document)
{ {
...@@ -161,22 +163,23 @@ class ChangeStream implements Iterator ...@@ -161,22 +163,23 @@ class ChangeStream implements Iterator
throw InvalidArgumentException::invalidType('$document', $document, 'array or object'); throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
} }
if (is_array($document) && isset($document['_id'])) { if ($document instanceof Serializable) {
$this->resumeToken = $document['_id']; return $this->extractResumeToken($document->bsonSerialize());
return;
} }
if ($document instanceof Serializable) { $resumeToken = is_array($document)
$this->extractResumeToken($document->bsonSerialize()); ? (isset($document['_id']) ? $document['_id'] : null)
return; : (isset($document->_id) ? $document->_id : null);
if ( ! isset($resumeToken)) {
throw ResumeTokenException::notFound();
} }
if (isset($document->_id)) { if ( ! is_array($resumeToken) && ! is_object($resumeToken)) {
$this->resumeToken = $document->_id; throw ResumeTokenException::invalidType($resumeToken);
return;
} }
throw new ResumeTokenException("Cannot provide resume functionality when the resume token is missing"); return $resumeToken;
} }
/** /**
......
...@@ -19,4 +19,24 @@ namespace MongoDB\Exception; ...@@ -19,4 +19,24 @@ namespace MongoDB\Exception;
class ResumeTokenException extends \Exception class ResumeTokenException extends \Exception
{ {
/**
* Thrown when a resume token is not found in a change document.
*
* @return self
*/
public static function notFound()
{
return new static('Resume token not found in change document');
}
/**
* Thrown when a resume token has an invalid type.
*
* @param mixed $value Actual value (used to derive the type)
* @return self
*/
public static function invalidType($value)
{
return new static(sprintf('Expected resume token to have type "array or object" but found "%s"', gettype($value)));
}
} }
...@@ -307,8 +307,9 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -307,8 +307,9 @@ class WatchFunctionalTest extends FunctionalTestCase
/** /**
* @expectedException MongoDB\Exception\ResumeTokenException * @expectedException MongoDB\Exception\ResumeTokenException
* @expectedExceptionMessage Resume token not found in change document
*/ */
public function testNextCannotExtractResumeToken() public function testNextResumeTokenNotFound()
{ {
$pipeline = [['$project' => ['_id' => 0 ]]]; $pipeline = [['$project' => ['_id' => 0 ]]];
...@@ -324,8 +325,9 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -324,8 +325,9 @@ class WatchFunctionalTest extends FunctionalTestCase
/** /**
* @expectedException MongoDB\Exception\ResumeTokenException * @expectedException MongoDB\Exception\ResumeTokenException
* @expectedExceptionMessage Resume token not found in change document
*/ */
public function testRewindCannotExtractResumeToken() public function testRewindResumeTokenNotFound()
{ {
$pipeline = [['$project' => ['_id' => 0 ]]]; $pipeline = [['$project' => ['_id' => 0 ]]];
...@@ -337,6 +339,40 @@ class WatchFunctionalTest extends FunctionalTestCase ...@@ -337,6 +339,40 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream->rewind(); $changeStream->rewind();
} }
/**
* @expectedException MongoDB\Exception\ResumeTokenException
* @expectedExceptionMessage Expected resume token to have type "array or object" but found "string"
*/
public function testNextResumeTokenInvalidType()
{
$pipeline = [['$project' => ['_id' => ['$literal' => 'foo']]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
/* Note: we intentionally do not start iteration with rewind() to ensure
* that we test extraction functionality within next(). */
$this->insertDocument(['x' => 1]);
$changeStream->next();
}
/**
* @expectedException MongoDB\Exception\ResumeTokenException
* @expectedExceptionMessage Expected resume token to have type "array or object" but found "string"
*/
public function testRewindResumeTokenInvalidType()
{
$pipeline = [['$project' => ['_id' => ['$literal' => 'foo']]]];
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());
$this->insertDocument(['x' => 1]);
$changeStream->rewind();
}
public function testMaxAwaitTimeMS() public function testMaxAwaitTimeMS()
{ {
/* On average, an acknowledged write takes about 20 ms to appear in a /* On average, an acknowledged write takes about 20 ms to appear in a
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment