Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
M
mongo-php-library
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sinan
mongo-php-library
Commits
91ecd2c8
Unverified
Commit
91ecd2c8
authored
Aug 02, 2019
by
Andreas Braun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
PHPLIB-461: Fix ChangeStream tests on sharded clusters
parent
5c635438
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
127 additions
and
66 deletions
+127
-66
DocumentationExamplesTest.php
tests/DocumentationExamplesTest.php
+4
-0
FunctionalTestCase.php
tests/FunctionalTestCase.php
+0
-3
WatchFunctionalTest.php
tests/Operation/WatchFunctionalTest.php
+123
-63
No files found.
tests/DocumentationExamplesTest.php
View file @
91ecd2c8
...
...
@@ -935,6 +935,10 @@ class DocumentationExamplesTest extends FunctionalTestCase
{
$this
->
skipIfChangeStreamIsNotSupported
();
if
(
$this
->
isShardedCluster
())
{
$this
->
markTestSkipped
(
'Test does not apply on sharded clusters: need more than a single getMore call on the change stream.'
);
}
$db
=
new
Database
(
$this
->
manager
,
$this
->
getDatabaseName
());
$db
->
dropCollection
(
'inventory'
);
$db
->
createCollection
(
'inventory'
);
...
...
tests/FunctionalTestCase.php
View file @
91ecd2c8
...
...
@@ -336,9 +336,6 @@ abstract class FunctionalTestCase extends TestCase
if
(
!
$this
->
isShardedClusterUsingReplicasets
())
{
$this
->
markTestSkipped
(
'$changeStream is only supported with replicasets'
);
}
// Temporarily skip tests because of an issue with change streams in the driver
$this
->
markTestSkipped
(
'$changeStreams currently don\'t on replica sets'
);
break
;
case
Server
::
TYPE_RS_PRIMARY
:
...
...
tests/Operation/WatchFunctionalTest.php
View file @
91ecd2c8
...
...
@@ -3,6 +3,7 @@
namespace
MongoDB\Tests\Operation
;
use
Closure
;
use
Iterator
;
use
MongoDB\BSON\TimestampInterface
;
use
MongoDB\ChangeStream
;
use
MongoDB\Driver\Cursor
;
...
...
@@ -19,6 +20,7 @@ use MongoDB\Operation\DatabaseCommand;
use
MongoDB\Operation\InsertOne
;
use
MongoDB\Operation\Watch
;
use
MongoDB\Tests\CommandObserver
;
use
PHPUnit\Framework\ExpectationFailedException
;
use
ReflectionClass
;
use
stdClass
;
use
Symfony\Bridge\PhpUnit\SetUpTearDownTrait
;
...
...
@@ -71,8 +73,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'x'
=>
1
]);
$this
->
insertDocument
([
'x'
=>
2
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
assertSameDocument
(
$changeStream
->
current
()
->
_id
,
$changeStream
->
getResumeToken
());
$changeStream
->
next
();
...
...
@@ -81,8 +82,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'x'
=>
3
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
assertSameDocument
(
$changeStream
->
current
()
->
_id
,
$changeStream
->
getResumeToken
());
}
...
...
@@ -120,23 +120,21 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'x'
=>
1
]);
$this
->
insertDocument
([
'x'
=>
2
]);
$
events
=
[]
;
$
lastEvent
=
null
;
(
new
CommandObserver
())
->
observe
(
function
()
use
(
$changeStream
)
{
$
changeStream
->
next
(
);
$
this
->
advanceCursorUntilValid
(
$changeStream
);
},
function
(
array
$event
)
use
(
&
$
events
)
{
$
events
[]
=
$event
;
function
(
array
$event
)
use
(
&
$
lastEvent
)
{
$
lastEvent
=
$event
;
}
);
$this
->
assert
Count
(
1
,
$events
);
$this
->
assertSame
(
'getMore'
,
$
events
[
0
]
[
'started'
]
->
getCommandName
());
$postBatchResumeToken
=
$this
->
getPostBatchResumeTokenFromReply
(
$
events
[
0
]
[
'succeeded'
]
->
getReply
());
$this
->
assert
NotNull
(
$lastEvent
);
$this
->
assertSame
(
'getMore'
,
$
lastEvent
[
'started'
]
->
getCommandName
());
$postBatchResumeToken
=
$this
->
getPostBatchResumeTokenFromReply
(
$
lastEvent
[
'succeeded'
]
->
getReply
());
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
assertSameDocument
(
$changeStream
->
current
()
->
_id
,
$changeStream
->
getResumeToken
());
$changeStream
->
next
();
...
...
@@ -157,8 +155,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'_id'
=>
1
,
'x'
=>
'foo'
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$expectedResult
=
[
'_id'
=>
$changeStream
->
current
()
->
_id
,
...
...
@@ -174,8 +171,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'_id'
=>
2
,
'x'
=>
'bar'
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$expectedResult
=
[
'_id'
=>
$changeStream
->
current
()
->
_id
,
...
...
@@ -381,6 +377,8 @@ class WatchFunctionalTest extends FunctionalTestCase
public
function
testRewindMultipleTimesWithResults
()
{
$this
->
skipIfIsShardedCluster
(
'Cursor needs to be advanced multiple times and can\'t be rewound afterwards.'
);
$operation
=
new
Watch
(
$this
->
manager
,
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
[],
$this
->
defaultOptions
);
$changeStream
=
$operation
->
execute
(
$this
->
getPrimaryServer
());
...
...
@@ -473,8 +471,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'_id'
=>
1
,
'x'
=>
'foo'
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$expectedResult
=
[
'_id'
=>
$changeStream
->
current
()
->
_id
,
...
...
@@ -493,8 +490,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'_id'
=>
2
,
'x'
=>
'bar'
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$expectedResult
=
[
'_id'
=>
$changeStream
->
current
()
->
_id
,
...
...
@@ -509,6 +505,8 @@ class WatchFunctionalTest extends FunctionalTestCase
public
function
testResumeMultipleTimesInSuccession
()
{
$this
->
skipIfIsShardedCluster
(
'getMore may return empty response before periodicNoopIntervalSecs on sharded clusters.'
);
$operation
=
new
Watch
(
$this
->
manager
,
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
[],
$this
->
defaultOptions
);
$changeStream
=
$operation
->
execute
(
$this
->
getPrimaryServer
());
...
...
@@ -642,8 +640,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'_id'
=>
1
,
'x'
=>
'foo'
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
assertSame
(
0
,
$changeStream
->
key
());
$changeStream
->
next
();
...
...
@@ -662,8 +659,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'_id'
=>
2
,
'x'
=>
'bar'
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
assertSame
(
1
,
$changeStream
->
key
());
}
...
...
@@ -679,8 +675,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream
->
rewind
();
$this
->
assertFalse
(
$changeStream
->
valid
());
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$expectedResult
=
[
'_id'
=>
$changeStream
->
current
()
->
_id
,
...
...
@@ -750,9 +745,9 @@ class WatchFunctionalTest extends FunctionalTestCase
public
function
provideNonResumableErrorCodes
()
{
return
[
[
136
],
// CappedPositionLost
[
237
],
// CursorKilled
[
11601
],
// Interrupted
'CappedPositionLost'
=>
[
136
],
'CursorKilled'
=>
[
237
],
'Interrupted'
=>
[
11601
],
];
}
...
...
@@ -781,7 +776,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
expectException
(
ResumeTokenException
::
class
);
$this
->
expectExceptionMessage
(
'Resume token not found in change document'
);
$
changeStream
->
next
(
);
$
this
->
advanceCursorUntilValid
(
$changeStream
);
}
/**
...
...
@@ -804,7 +799,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'x'
=>
1
]);
$this
->
expectException
(
ServerException
::
class
);
$
changeStream
->
next
(
);
$
this
->
advanceCursorUntilValid
(
$changeStream
);
}
/**
...
...
@@ -832,7 +827,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
expectException
(
ResumeTokenException
::
class
);
$this
->
expectExceptionMessage
(
'Expected resume token to have type "array or object" but found "string"'
);
$
changeStream
->
next
(
);
$
this
->
advanceCursorUntilValid
(
$changeStream
);
}
/**
...
...
@@ -855,7 +850,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'x'
=>
1
]);
$this
->
expectException
(
ServerException
::
class
);
$
changeStream
->
next
(
);
$
this
->
advanceCursorUntilValid
(
$changeStream
);
}
public
function
testMaxAwaitTimeMS
()
...
...
@@ -901,12 +896,24 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'_id'
=>
1
]);
/* Advancing the change stream again will issue a getMore, but the
* server should not block since a document has been inserted. */
* server should not block since a document has been inserted.
* For sharded clusters, we have to repeat the getMore iteration until
* the cursor is valid since the first getMore commands after an insert
* may not return any data. Only the time of the last getMore command is
* taken. */
$attempts
=
$this
->
isShardedCluster
()
?
5
:
1
;
for
(
$i
=
0
;
$i
<
$attempts
;
$i
++
)
{
$startTime
=
microtime
(
true
);
$changeStream
->
next
();
$duration
=
microtime
(
true
)
-
$startTime
;
$this
->
assertLessThan
(
$pivot
,
$duration
);
if
(
$changeStream
->
valid
())
{
break
;
}
}
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
assertLessThan
(
$pivot
,
$duration
);
}
public
function
testRewindExtractsResumeTokenAndNextResumes
()
...
...
@@ -925,17 +932,25 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream
->
rewind
();
$this
->
assertFalse
(
$changeStream
->
valid
());
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$resumeToken
=
$changeStream
->
current
()
->
_id
;
$options
=
[
'resumeAfter'
=>
$resumeToken
]
+
$this
->
defaultOptions
;
$operation
=
new
Watch
(
$this
->
manager
,
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
[],
$options
);
$changeStream
=
$operation
->
execute
(
$this
->
getPrimaryServer
());
$this
->
assertSame
(
$resumeToken
,
$changeStream
->
getResumeToken
());
$this
->
assertSame
Document
(
$resumeToken
,
$changeStream
->
getResumeToken
());
$changeStream
->
rewind
();
if
(
$this
->
isShardedCluster
())
{
/* aggregate on a sharded cluster may not return any data in the
* initial batch until periodicNoopIntervalSecs has passed. Thus,
* advance the change stream until we've received data. */
$this
->
advanceCursorUntilValid
(
$changeStream
);
}
else
{
$this
->
assertTrue
(
$changeStream
->
valid
());
}
$this
->
assertSame
(
0
,
$changeStream
->
key
());
$expectedResult
=
[
'_id'
=>
$changeStream
->
current
()
->
_id
,
...
...
@@ -948,8 +963,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
killChangeStreamCursor
(
$changeStream
);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
assertSame
(
1
,
$changeStream
->
key
());
$expectedResult
=
[
...
...
@@ -973,18 +987,25 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'_id'
=>
1
,
'x'
=>
'foo'
]);
$this
->
insertDocument
([
'_id'
=>
2
,
'x'
=>
'bar'
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$resumeToken
=
$changeStream
->
current
()
->
_id
;
$options
=
$this
->
defaultOptions
+
[
'resumeAfter'
=>
$resumeToken
];
$operation
=
new
Watch
(
$this
->
manager
,
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
[],
$options
);
$changeStream
=
$operation
->
execute
(
$this
->
getPrimaryServer
());
$this
->
assertSame
(
$resumeToken
,
$changeStream
->
getResumeToken
());
$this
->
assertSame
Document
(
$resumeToken
,
$changeStream
->
getResumeToken
());
$changeStream
->
rewind
();
if
(
$this
->
isShardedCluster
())
{
/* aggregate on a sharded cluster may not return any data in the
* initial batch until periodicNoopIntervalSecs has passed. Thus,
* advance the change stream until we've received data. */
$this
->
advanceCursorUntilValid
(
$changeStream
);
}
else
{
$this
->
assertTrue
(
$changeStream
->
valid
());
}
$expectedResult
=
[
'_id'
=>
$changeStream
->
current
()
->
_id
,
...
...
@@ -1012,18 +1033,25 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'_id'
=>
1
,
'x'
=>
'foo'
]);
$this
->
insertDocument
([
'_id'
=>
2
,
'x'
=>
'bar'
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$resumeToken
=
$changeStream
->
current
()
->
_id
;
$options
=
$this
->
defaultOptions
+
[
'startAfter'
=>
$resumeToken
];
$operation
=
new
Watch
(
$this
->
manager
,
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
[],
$options
);
$changeStream
=
$operation
->
execute
(
$this
->
getPrimaryServer
());
$this
->
assertSame
(
$resumeToken
,
$changeStream
->
getResumeToken
());
$this
->
assertSame
Document
(
$resumeToken
,
$changeStream
->
getResumeToken
());
$changeStream
->
rewind
();
if
(
$this
->
isShardedCluster
())
{
/* aggregate on a sharded cluster may not return any data in the
* initial batch until periodicNoopIntervalSecs has passed. Thus,
* advance the change stream until we've received data. */
$this
->
advanceCursorUntilValid
(
$changeStream
);
}
else
{
$this
->
assertTrue
(
$changeStream
->
valid
());
}
$expectedResult
=
[
'_id'
=>
$changeStream
->
current
()
->
_id
,
...
...
@@ -1049,8 +1077,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'_id'
=>
1
,
'x'
=>
'foo'
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
assertMatchesDocument
(
$expectedChangeDocument
,
$changeStream
->
current
());
}
...
...
@@ -1099,7 +1126,7 @@ class WatchFunctionalTest extends FunctionalTestCase
/* Note: we intentionally do not start iteration with rewind() to ensure
* that next() behaves identically when called without rewind(). */
$
changeStream
->
next
(
);
$
this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
assertSame
(
0
,
$changeStream
->
key
());
...
...
@@ -1124,7 +1151,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
assertNull
(
$changeStream
->
key
());
try
{
$
changeStream
->
next
(
);
$
this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
fail
(
'Exception for missing resume token was not thrown'
);
}
catch
(
ResumeTokenException
$e
)
{
/* On server versions < 4.1.8, a client-side error is thrown. */
...
...
@@ -1219,7 +1246,7 @@ class WatchFunctionalTest extends FunctionalTestCase
// Invalidate the cursor to verify that resumeCallable is unset when the cursor is exhausted.
$this
->
dropCollection
();
$
changeStream
->
next
(
);
$
this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
assertNull
(
$rp
->
getValue
(
$changeStream
));
}
...
...
@@ -1341,6 +1368,10 @@ class WatchFunctionalTest extends FunctionalTestCase
*/
public
function
testOriginalReadPreferenceIsPreservedOnResume
()
{
if
(
$this
->
isShardedCluster
())
{
$this
->
markTestSkipped
(
'Test does not apply to sharded clusters'
);
}
$readPreference
=
new
ReadPreference
(
'secondary'
);
$options
=
[
'readPreference'
=>
$readPreference
]
+
$this
->
defaultOptions
;
$operation
=
new
Watch
(
$this
->
manager
,
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
[],
$options
);
...
...
@@ -1421,6 +1452,8 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
markTestSkipped
(
'Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1'
);
}
$this
->
skipIfIsShardedCluster
(
'Resume token behaviour can\'t be reliably tested on sharded clusters.'
);
$operation
=
new
Watch
(
$this
->
manager
,
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
[],
$this
->
defaultOptions
);
$lastOpTime
=
null
;
...
...
@@ -1438,7 +1471,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'x'
=>
1
]);
$
changeStream
->
next
(
);
$
this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
assertTrue
(
$changeStream
->
valid
());
$resumeToken
=
$changeStream
->
getResumeToken
();
...
...
@@ -1483,7 +1516,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'x'
=>
1
]);
$
changeStream
->
next
(
);
$
this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
assertTrue
(
$changeStream
->
valid
());
$resumeToken
=
$changeStream
->
getResumeToken
();
...
...
@@ -1530,8 +1563,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
insertDocument
([
'x'
=>
1
]);
$changeStream
->
next
();
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
advanceCursorUntilValid
(
$changeStream
);
$resumeToken
=
$changeStream
->
getResumeToken
();
$options
=
[
'startAfter'
=>
$resumeToken
]
+
$this
->
defaultOptions
;
...
...
@@ -1540,7 +1572,7 @@ class WatchFunctionalTest extends FunctionalTestCase
$changeStream
->
rewind
();
$this
->
insertDocument
([
'x'
=>
2
]);
$
changeStream
->
next
(
);
$
this
->
advanceCursorUntilValid
(
$changeStream
);
$this
->
assertTrue
(
$changeStream
->
valid
());
$this
->
killChangeStreamCursor
(
$changeStream
);
...
...
@@ -1621,4 +1653,32 @@ class WatchFunctionalTest extends FunctionalTestCase
$operation
=
new
DatabaseCommand
(
$this
->
getDatabaseName
(),
$command
);
$operation
->
execute
(
$this
->
getPrimaryServer
());
}
private
function
advanceCursorUntilValid
(
Iterator
$iterator
,
$limitOnShardedClusters
=
5
)
{
if
(
!
$this
->
isShardedCluster
())
{
$iterator
->
next
();
$this
->
assertTrue
(
$iterator
->
valid
());
return
;
}
for
(
$i
=
0
;
$i
<
$limitOnShardedClusters
;
$i
++
)
{
$iterator
->
next
();
if
(
$iterator
->
valid
())
{
return
;
}
}
throw
new
ExpectationFailedException
(
sprintf
(
'Expected cursor to return an element but none was found after %d attempts.'
,
$limitOnShardedClusters
));
}
private
function
skipIfIsShardedCluster
(
$message
)
{
if
(
!
$this
->
isShardedCluster
())
{
return
;
}
$this
->
markTestSkipped
(
sprintf
(
'Test does not apply on sharded clusters: %s'
,
$message
));
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment