Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
M
mongo-php-library
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sinan
mongo-php-library
Commits
33fc5e82
Unverified
Commit
33fc5e82
authored
Mar 24, 2020
by
Andreas Braun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
PHPLIB-537: Use whitelist to check if a change stream is resumable
parent
9fa37e73
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
3401 additions
and
745 deletions
+3401
-745
ChangeStream.php
src/ChangeStream.php
+26
-12
ChangeStreamIterator.php
src/Model/ChangeStreamIterator.php
+13
-0
WatchFunctionalTest.php
tests/Operation/WatchFunctionalTest.php
+0
-78
ChangeStreamsSpecTest.php
tests/SpecTests/ChangeStreamsSpecTest.php
+18
-3
change-streams-errors.json
tests/SpecTests/change-streams/change-streams-errors.json
+49
-7
change-streams-errors.yml
tests/SpecTests/change-streams/change-streams-errors.yml
+0
-76
change-streams-resume-errorLabels.json
...sts/change-streams/change-streams-resume-errorLabels.json
+1634
-0
change-streams-resume-whitelist.json
...Tests/change-streams/change-streams-resume-whitelist.json
+1653
-0
change-streams.json
tests/SpecTests/change-streams/change-streams.json
+8
-25
change-streams.yml
tests/SpecTests/change-streams/change-streams.yml
+0
-544
No files found.
src/ChangeStream.php
View file @
33fc5e82
...
@@ -42,13 +42,31 @@ class ChangeStream implements Iterator
...
@@ -42,13 +42,31 @@ class ChangeStream implements Iterator
*/
*/
const
CURSOR_NOT_FOUND
=
43
;
const
CURSOR_NOT_FOUND
=
43
;
/** @var array */
/** @var int[] */
private
static
$nonResumableErrorCodes
=
[
private
static
$resumableErrorCodes
=
[
136
,
// CappedPositionLost
6
,
// HostUnreachable
237
,
// CursorKilled
7
,
// HostNotFound
11601
,
// Interrupted
89
,
// NetworkTimeout
91
,
// ShutdownInProgress
189
,
// PrimarySteppedDown
262
,
// ExceededTimeLimit
9001
,
// SocketException
10107
,
// NotMaster
11600
,
// InterruptedAtShutdown
11602
,
// InterruptedDueToReplStateChange
13435
,
// NotMasterNoSlaveOk
13436
,
// NotMasterOrSecondary
63
,
// StaleShardVersion
150
,
// StaleEpoch
13388
,
// StaleConfig
234
,
// RetryChangeStream
133
,
// FailedToSatisfyReadPreference
216
,
// ElectionInProgress
];
];
/** @var int */
private
static
$wireVersionForResumableChangeStreamError
=
9
;
/** @var callable */
/** @var callable */
private
$resumeCallable
;
private
$resumeCallable
;
...
@@ -180,15 +198,11 @@ class ChangeStream implements Iterator
...
@@ -180,15 +198,11 @@ class ChangeStream implements Iterator
return
false
;
return
false
;
}
}
if
(
$exception
->
hasErrorLabel
(
'NonResumableChangeStreamError'
))
{
if
(
server_supports_feature
(
$this
->
iterator
->
getServer
(),
self
::
$wireVersionForResumableChangeStreamError
))
{
return
false
;
return
$exception
->
hasErrorLabel
(
'ResumableChangeStreamError'
);
}
if
(
in_array
(
$exception
->
getCode
(),
self
::
$nonResumableErrorCodes
))
{
return
false
;
}
}
return
true
;
return
in_array
(
$exception
->
getCode
(),
self
::
$resumableErrorCodes
)
;
}
}
/**
/**
...
...
src/Model/ChangeStreamIterator.php
View file @
33fc5e82
...
@@ -24,6 +24,7 @@ use MongoDB\Driver\Monitoring\CommandFailedEvent;
...
@@ -24,6 +24,7 @@ use MongoDB\Driver\Monitoring\CommandFailedEvent;
use
MongoDB\Driver\Monitoring\CommandStartedEvent
;
use
MongoDB\Driver\Monitoring\CommandStartedEvent
;
use
MongoDB\Driver\Monitoring\CommandSubscriber
;
use
MongoDB\Driver\Monitoring\CommandSubscriber
;
use
MongoDB\Driver\Monitoring\CommandSucceededEvent
;
use
MongoDB\Driver\Monitoring\CommandSucceededEvent
;
use
MongoDB\Driver\Server
;
use
MongoDB\Exception\InvalidArgumentException
;
use
MongoDB\Exception\InvalidArgumentException
;
use
MongoDB\Exception\ResumeTokenException
;
use
MongoDB\Exception\ResumeTokenException
;
use
MongoDB\Exception\UnexpectedValueException
;
use
MongoDB\Exception\UnexpectedValueException
;
...
@@ -63,6 +64,9 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
...
@@ -63,6 +64,9 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
/** @var array|object|null */
/** @var array|object|null */
private
$resumeToken
;
private
$resumeToken
;
/** @var Server */
private
$server
;
/**
/**
* @internal
* @internal
* @param Cursor $cursor
* @param Cursor $cursor
...
@@ -90,6 +94,7 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
...
@@ -90,6 +94,7 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
$this
->
isRewindNop
=
(
$firstBatchSize
===
0
);
$this
->
isRewindNop
=
(
$firstBatchSize
===
0
);
$this
->
postBatchResumeToken
=
$postBatchResumeToken
;
$this
->
postBatchResumeToken
=
$postBatchResumeToken
;
$this
->
resumeToken
=
$initialResumeToken
;
$this
->
resumeToken
=
$initialResumeToken
;
$this
->
server
=
$cursor
->
getServer
();
}
}
/** @internal */
/** @internal */
...
@@ -152,6 +157,14 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
...
@@ -152,6 +157,14 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
return
$this
->
resumeToken
;
return
$this
->
resumeToken
;
}
}
/**
* Returns the server the cursor is running on.
*/
public
function
getServer
()
:
Server
{
return
$this
->
server
;
}
/**
/**
* @see https://php.net/iteratoriterator.key
* @see https://php.net/iteratoriterator.key
* @return mixed
* @return mixed
...
...
tests/Operation/WatchFunctionalTest.php
View file @
33fc5e82
...
@@ -155,49 +155,6 @@ class WatchFunctionalTest extends FunctionalTestCase
...
@@ -155,49 +155,6 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
assertSameDocument
(
$postBatchResumeToken
,
$changeStream
->
getResumeToken
());
$this
->
assertSameDocument
(
$postBatchResumeToken
,
$changeStream
->
getResumeToken
());
}
}
/**
* Prose test 10: "ChangeStream will resume after a killCursors command is
* issued for its child cursor."
*/
public
function
testNextResumesAfterCursorNotFound
()
{
$operation
=
new
Watch
(
$this
->
manager
,
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
[],
$this
->
defaultOptions
);
$changeStream
=
$operation
->
execute
(
$this
->
getPrimaryServer
());
$changeStream
->
rewind
();
$this
->
assertFalse
(
$changeStream
->
valid
());
$this
->
insertDocument
([
'_id'
=>
1
,
'x'
=>
'foo'
]);
$this
->
advanceCursorUntilValid
(
$changeStream
);
$expectedResult
=
[
'_id'
=>
$changeStream
->
current
()
->
_id
,
'operationType'
=>
'insert'
,
'fullDocument'
=>
[
'_id'
=>
1
,
'x'
=>
'foo'
],
'ns'
=>
[
'db'
=>
$this
->
getDatabaseName
(),
'coll'
=>
$this
->
getCollectionName
()],
'documentKey'
=>
[
'_id'
=>
1
],
];
$this
->
assertMatchesDocument
(
$expectedResult
,
$changeStream
->
current
());
$this
->
killChangeStreamCursor
(
$changeStream
);
$this
->
insertDocument
([
'_id'
=>
2
,
'x'
=>
'bar'
]);
$this
->
advanceCursorUntilValid
(
$changeStream
);
$expectedResult
=
[
'_id'
=>
$changeStream
->
current
()
->
_id
,
'operationType'
=>
'insert'
,
'fullDocument'
=>
[
'_id'
=>
2
,
'x'
=>
'bar'
],
'ns'
=>
[
'db'
=>
$this
->
getDatabaseName
(),
'coll'
=>
$this
->
getCollectionName
()],
'documentKey'
=>
[
'_id'
=>
2
],
];
$this
->
assertMatchesDocument
(
$expectedResult
,
$changeStream
->
current
());
}
public
function
testNextResumesAfterConnectionException
()
public
function
testNextResumesAfterConnectionException
()
{
{
/* In order to trigger a dropped connection, we'll use a new client with
/* In order to trigger a dropped connection, we'll use a new client with
...
@@ -731,41 +688,6 @@ class WatchFunctionalTest extends FunctionalTestCase
...
@@ -731,41 +688,6 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
assertFalse
(
$cursor
->
isDead
());
$this
->
assertFalse
(
$cursor
->
isDead
());
}
}
/**
* Prose test 5: "ChangeStream will not attempt to resume after encountering
* error code 11601 (Interrupted), 136 (CappedPositionLost), or 237
* (CursorKilled) while executing a getMore command."
*
* @dataProvider provideNonResumableErrorCodes
*/
public
function
testNonResumableErrorCodes
(
$errorCode
)
{
$this
->
configureFailPoint
([
'configureFailPoint'
=>
'failCommand'
,
'mode'
=>
[
'times'
=>
1
],
'data'
=>
[
'failCommands'
=>
[
'getMore'
],
'errorCode'
=>
$errorCode
],
]);
$this
->
insertDocument
([
'x'
=>
1
]);
$operation
=
new
Watch
(
$this
->
manager
,
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
[]);
$changeStream
=
$operation
->
execute
(
$this
->
getPrimaryServer
());
$changeStream
->
rewind
();
$this
->
expectException
(
ServerException
::
class
);
$this
->
expectExceptionCode
(
$errorCode
);
$changeStream
->
next
();
}
public
function
provideNonResumableErrorCodes
()
{
return
[
'CappedPositionLost'
=>
[
136
],
'CursorKilled'
=>
[
237
],
'Interrupted'
=>
[
11601
],
];
}
/**
/**
* Prose test 2: "ChangeStream will throw an exception if the server
* Prose test 2: "ChangeStream will throw an exception if the server
* response is missing the resume token (if wire version is < 8, this is a
* response is missing the resume token (if wire version is < 8, this is a
...
...
tests/SpecTests/ChangeStreamsSpecTest.php
View file @
33fc5e82
...
@@ -4,6 +4,7 @@ namespace MongoDB\Tests\SpecTests;
...
@@ -4,6 +4,7 @@ namespace MongoDB\Tests\SpecTests;
use
ArrayIterator
;
use
ArrayIterator
;
use
LogicException
;
use
LogicException
;
use
MongoDB\BSON\Int64
;
use
MongoDB\ChangeStream
;
use
MongoDB\ChangeStream
;
use
MongoDB\Driver\Exception\Exception
;
use
MongoDB\Driver\Exception\Exception
;
use
MongoDB\Model\BSONDocument
;
use
MongoDB\Model\BSONDocument
;
...
@@ -22,16 +23,27 @@ use function glob;
...
@@ -22,16 +23,27 @@ use function glob;
class
ChangeStreamsSpecTest
extends
FunctionalTestCase
class
ChangeStreamsSpecTest
extends
FunctionalTestCase
{
{
/** @var array */
/** @var array */
private
static
$incompleteTests
=
[
'change-streams-errors: Change Stream should error when _id is projected out'
=>
'PHPC-1419'
];
private
static
$incompleteTests
=
[];
/**
/**
* Assert that the expected and actual command documents match.
* Assert that the expected and actual command documents match.
*
*
* Note: this method may modify the $expected object.
*
* @param stdClass $expected Expected command document
* @param stdClass $expected Expected command document
* @param stdClass $actual Actual command document
* @param stdClass $actual Actual command document
*/
*/
public
static
function
assertCommandMatches
(
stdClass
$expected
,
stdClass
$actual
)
public
static
function
assertCommandMatches
(
stdClass
$expected
,
stdClass
$actual
)
{
{
if
(
isset
(
$expected
->
getMore
)
&&
$expected
->
getMore
===
42
)
{
static
::
assertObjectHasAttribute
(
'getMore'
,
$actual
);
static
::
assertThat
(
$actual
->
getMore
,
static
::
logicalOr
(
static
::
isInstanceOf
(
Int64
::
class
),
static
::
isType
(
'integer'
)
));
unset
(
$expected
->
getMore
);
}
static
::
assertDocumentsMatch
(
$expected
,
$actual
);
static
::
assertDocumentsMatch
(
$expected
,
$actual
);
}
}
...
@@ -76,7 +88,7 @@ class ChangeStreamsSpecTest extends FunctionalTestCase
...
@@ -76,7 +88,7 @@ class ChangeStreamsSpecTest extends FunctionalTestCase
$this
->
checkServerRequirements
(
$this
->
createRunOn
(
$test
));
$this
->
checkServerRequirements
(
$this
->
createRunOn
(
$test
));
if
(
!
isset
(
$databaseName
,
$collectionName
,
$database2Name
,
$collection2Name
))
{
if
(
!
isset
(
$databaseName
,
$collectionName
))
{
$this
->
fail
(
'Required database and collection names are unset'
);
$this
->
fail
(
'Required database and collection names are unset'
);
}
}
...
@@ -84,7 +96,10 @@ class ChangeStreamsSpecTest extends FunctionalTestCase
...
@@ -84,7 +96,10 @@ class ChangeStreamsSpecTest extends FunctionalTestCase
$this
->
setContext
(
$context
);
$this
->
setContext
(
$context
);
$this
->
dropDatabasesAndCreateCollection
(
$databaseName
,
$collectionName
);
$this
->
dropDatabasesAndCreateCollection
(
$databaseName
,
$collectionName
);
$this
->
dropDatabasesAndCreateCollection
(
$database2Name
,
$collection2Name
);
if
(
isset
(
$database2Name
,
$collection2Name
))
{
$this
->
dropDatabasesAndCreateCollection
(
$database2Name
,
$collection2Name
);
}
if
(
isset
(
$test
->
failPoint
))
{
if
(
isset
(
$test
->
failPoint
))
{
$this
->
configureFailPoint
(
$test
->
failPoint
);
$this
->
configureFailPoint
(
$test
->
failPoint
);
...
...
tests/SpecTests/change-streams/change-streams-errors.json
View file @
33fc5e82
...
@@ -54,9 +54,7 @@
...
@@ -54,9 +54,7 @@
"cursor"
:
{},
"cursor"
:
{},
"pipeline"
:
[
"pipeline"
:
[
{
{
"$changeStream"
:
{
"$changeStream"
:
{}
"fullDocument"
:
"default"
}
},
},
{
{
"$unsupported"
:
"foo"
"$unsupported"
:
"foo"
...
@@ -104,10 +102,54 @@
...
@@ -104,10 +102,54 @@
],
],
"result"
:
{
"result"
:
{
"error"
:
{
"error"
:
{
"code"
:
280
,
"code"
:
280
"errorLabels"
:
[
}
"NonResumableChangeStreamError"
}
]
},
{
"description"
:
"change stream errors on MaxTimeMSExpired"
,
"minServerVersion"
:
"4.2"
,
"failPoint"
:
{
"configureFailPoint"
:
"failCommand"
,
"mode"
:
{
"times"
:
1
},
"data"
:
{
"failCommands"
:
[
"getMore"
],
"errorCode"
:
50
,
"closeConnection"
:
false
}
},
"target"
:
"collection"
,
"topology"
:
[
"replicaset"
,
"sharded"
],
"changeStreamPipeline"
:
[
{
"$project"
:
{
"_id"
:
0
}
}
],
"changeStreamOptions"
:
{},
"operations"
:
[
{
"database"
:
"change-stream-tests"
,
"collection"
:
"test"
,
"name"
:
"insertOne"
,
"arguments"
:
{
"document"
:
{
"z"
:
3
}
}
}
],
"result"
:
{
"error"
:
{
"code"
:
50
}
}
}
}
}
}
...
...
tests/SpecTests/change-streams/change-streams-errors.yml
deleted
100644 → 0
View file @
9fa37e73
collection_name
:
&collection_name
"
test"
database_name
:
&database_name
"
change-stream-tests"
collection2_name
:
&collection2_name
"
test2"
database2_name
:
&database2_name
"
change-stream-tests-2"
tests
:
-
description
:
The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error
minServerVersion
:
"
3.6.0"
target
:
collection
topology
:
-
single
changeStreamPipeline
:
[]
changeStreamOptions
:
{}
operations
:
[]
expectations
:
[]
result
:
error
:
code
:
40573
-
description
:
Change Stream should error when an invalid aggregation stage is passed in
minServerVersion
:
"
3.6.0"
target
:
collection
topology
:
-
replicaset
changeStreamPipeline
:
-
$unsupported
:
foo
changeStreamOptions
:
{}
operations
:
-
database
:
*database_name
collection
:
*collection_name
name
:
insertOne
arguments
:
document
:
z
:
3
expectations
:
-
command_started_event
:
command
:
aggregate
:
*collection_name
cursor
:
{}
pipeline
:
-
$changeStream
:
fullDocument
:
default
-
$unsupported
:
foo
command_name
:
aggregate
database_name
:
*database_name
result
:
error
:
code
:
40324
-
description
:
Change Stream should error when _id is projected out
minServerVersion
:
"
4.1.11"
target
:
collection
topology
:
-
replicaset
-
sharded
changeStreamPipeline
:
-
$project
:
{
_id
:
0
}
changeStreamOptions
:
{}
operations
:
-
database
:
*database_name
collection
:
*collection_name
name
:
insertOne
arguments
:
document
:
z
:
3
result
:
error
:
code
:
280
errorLabels
:
[
"
NonResumableChangeStreamError"
]
tests/SpecTests/change-streams/change-streams-resume-errorLabels.json
0 → 100644
View file @
33fc5e82
This diff is collapsed.
Click to expand it.
tests/SpecTests/change-streams/change-streams-resume-whitelist.json
0 → 100644
View file @
33fc5e82
This diff is collapsed.
Click to expand it.
tests/SpecTests/change-streams/change-streams.json
View file @
33fc5e82
...
@@ -33,9 +33,7 @@
...
@@ -33,9 +33,7 @@
"cursor"
:
{},
"cursor"
:
{},
"pipeline"
:
[
"pipeline"
:
[
{
{
"$changeStream"
:
{
"$changeStream"
:
{}
"fullDocument"
:
"default"
}
}
}
]
]
},
},
...
@@ -153,9 +151,7 @@
...
@@ -153,9 +151,7 @@
"cursor"
:
{},
"cursor"
:
{},
"pipeline"
:
[
"pipeline"
:
[
{
{
"$changeStream"
:
{
"$changeStream"
:
{}
"fullDocument"
:
"default"
}
}
}
]
]
},
},
...
@@ -226,9 +222,7 @@
...
@@ -226,9 +222,7 @@
"cursor"
:
{},
"cursor"
:
{},
"pipeline"
:
[
"pipeline"
:
[
{
{
"$changeStream"
:
{
"$changeStream"
:
{}
"fullDocument"
:
"default"
}
},
},
{
{
"$match"
:
{
"$match"
:
{
...
@@ -312,9 +306,7 @@
...
@@ -312,9 +306,7 @@
"cursor"
:
{},
"cursor"
:
{},
"pipeline"
:
[
"pipeline"
:
[
{
{
"$changeStream"
:
{
"$changeStream"
:
{}
"fullDocument"
:
"default"
}
}
}
]
]
},
},
...
@@ -404,7 +396,6 @@
...
@@ -404,7 +396,6 @@
"pipeline"
:
[
"pipeline"
:
[
{
{
"$changeStream"
:
{
"$changeStream"
:
{
"fullDocument"
:
"default"
,
"allChangesForCluster"
:
true
"allChangesForCluster"
:
true
}
}
}
}
...
@@ -523,9 +514,7 @@
...
@@ -523,9 +514,7 @@
"cursor"
:
{},
"cursor"
:
{},
"pipeline"
:
[
"pipeline"
:
[
{
{
"$changeStream"
:
{
"$changeStream"
:
{}
"fullDocument"
:
"default"
}
}
}
]
]
},
},
...
@@ -611,9 +600,7 @@
...
@@ -611,9 +600,7 @@
"cursor"
:
{},
"cursor"
:
{},
"pipeline"
:
[
"pipeline"
:
[
{
{
"$changeStream"
:
{
"$changeStream"
:
{}
"fullDocument"
:
"default"
}
}
}
]
]
},
},
...
@@ -665,9 +652,7 @@
...
@@ -665,9 +652,7 @@
"cursor"
:
{},
"cursor"
:
{},
"pipeline"
:
[
"pipeline"
:
[
{
{
"$changeStream"
:
{
"$changeStream"
:
{}
"fullDocument"
:
"default"
}
}
}
]
]
},
},
...
@@ -756,9 +741,7 @@
...
@@ -756,9 +741,7 @@
},
},
"pipeline"
:
[
"pipeline"
:
[
{
{
"$changeStream"
:
{
"$changeStream"
:
{}
"fullDocument"
:
"default"
}
}
}
]
]
},
},
...
...
tests/SpecTests/change-streams/change-streams.yml
deleted
100644 → 0
View file @
9fa37e73
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment