Commit 2a288a03 authored by Jeremy Mikola's avatar Jeremy Mikola

PHPLIB-351: Update logic for identifying resumable errors

Skip redundant error message checks, since their codes do not overlap with the three errors specifically excluded by the spec and all other server errors are resumable.

Deprecate the unused class constant, which can be removed in 2.0 (PHPLIB-360).
parent e621613d
...@@ -19,8 +19,9 @@ namespace MongoDB; ...@@ -19,8 +19,9 @@ namespace MongoDB;
use MongoDB\BSON\Serializable; use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor; use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\ConnectionTimeoutException; use MongoDB\Driver\Exception\ConnectionException;
use MongoDB\Driver\Exception\RuntimeException; use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Exception\ServerException;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException; use MongoDB\Exception\ResumeTokenException;
use IteratorIterator; use IteratorIterator;
...@@ -35,14 +36,22 @@ use Iterator; ...@@ -35,14 +36,22 @@ use Iterator;
*/ */
class ChangeStream implements Iterator class ChangeStream implements Iterator
{ {
/**
* @deprecated 1.4
* @todo Remove this in 2.0 (see: PHPLIB-360)
*/
const CURSOR_NOT_FOUND = 43;
private static $errorCodeCappedPositionLost = 136;
private static $errorCodeInterrupted = 11601;
private static $errorCodeCursorKilled = 237;
private $resumeToken; private $resumeToken;
private $resumeCallable; private $resumeCallable;
private $csIt; private $csIt;
private $key = 0; private $key = 0;
private $hasAdvanced = false; private $hasAdvanced = false;
const CURSOR_NOT_FOUND = 43;
/** /**
* Constructor. * Constructor.
* *
...@@ -91,7 +100,6 @@ class ChangeStream implements Iterator ...@@ -91,7 +100,6 @@ class ChangeStream implements Iterator
*/ */
public function next() public function next()
{ {
$resumable = false;
try { try {
$this->csIt->next(); $this->csIt->next();
if ($this->valid()) { if ($this->valid()) {
...@@ -111,18 +119,9 @@ class ChangeStream implements Iterator ...@@ -111,18 +119,9 @@ class ChangeStream implements Iterator
$this->resumeCallable = null; $this->resumeCallable = null;
} }
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) { if ($this->isResumableError($e)) {
$resumable = true; $this->resume();
} }
if ($e->getCode() === self::CURSOR_NOT_FOUND) {
$resumable = true;
}
if ($e instanceof ConnectionTimeoutException) {
$resumable = true;
}
}
if ($resumable) {
$this->resume();
} }
} }
...@@ -132,7 +131,6 @@ class ChangeStream implements Iterator ...@@ -132,7 +131,6 @@ class ChangeStream implements Iterator
*/ */
public function rewind() public function rewind()
{ {
$resumable = false;
try { try {
$this->csIt->rewind(); $this->csIt->rewind();
if ($this->valid()) { if ($this->valid()) {
...@@ -144,18 +142,9 @@ class ChangeStream implements Iterator ...@@ -144,18 +142,9 @@ class ChangeStream implements Iterator
$this->resumeCallable = null; $this->resumeCallable = null;
} }
} catch (RuntimeException $e) { } catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) { if ($this->isResumableError($e)) {
$resumable = true; $this->resume();
} }
if ($e->getCode() === self::CURSOR_NOT_FOUND) {
$resumable = true;
}
if ($e instanceof ConnectionTimeoutException) {
$resumable = true;
}
}
if ($resumable) {
$this->resume();
} }
} }
...@@ -201,6 +190,30 @@ class ChangeStream implements Iterator ...@@ -201,6 +190,30 @@ class ChangeStream implements Iterator
return $resumeToken; return $resumeToken;
} }
/**
* Determines if an exception is a resumable error.
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error
* @param RuntimeException $exception
* @return boolean
*/
private function isResumableError(RuntimeException $exception)
{
if ($exception instanceof ConnectionException) {
return true;
}
if ( ! $exception instanceof ServerException) {
return false;
}
if (in_array($exception->getCode(), [self::$errorCodeCappedPositionLost, self::$errorCodeCursorKilled, self::$errorCodeInterrupted])) {
return false;
}
return true;
}
/** /**
* Creates a new changeStream after a resumable server error. * Creates a new changeStream after a resumable server error.
* *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment