Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
M
mongo-php-library
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sinan
mongo-php-library
Commits
5fd00fbe
Commit
5fd00fbe
authored
Apr 16, 2018
by
Katherine Walker
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
PHPLIB-342: Change streams should use the same session when resuming
parent
0331b37b
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
105 additions
and
0 deletions
+105
-0
ChangeStream.php
src/ChangeStream.php
+13
-0
Watch.php
src/Operation/Watch.php
+6
-0
WatchFunctionalTest.php
tests/Operation/WatchFunctionalTest.php
+86
-0
No files found.
src/ChangeStream.php
View file @
5fd00fbe
...
@@ -101,6 +101,15 @@ class ChangeStream implements Iterator
...
@@ -101,6 +101,15 @@ class ChangeStream implements Iterator
$this
->
hasAdvanced
=
true
;
$this
->
hasAdvanced
=
true
;
$this
->
resumeToken
=
$this
->
extractResumeToken
(
$this
->
csIt
->
current
());
$this
->
resumeToken
=
$this
->
extractResumeToken
(
$this
->
csIt
->
current
());
}
}
/* If the cursorId is 0, the server has invalidated the cursor so we
* will never perform another getMore. This means that we cannot
* resume and we can therefore unset the resumeCallable, which will
* free any reference to Watch. This will also free the only
* reference to an implicit session, since any such reference
* belongs to Watch. */
if
((
string
)
$this
->
getCursorId
()
===
'0'
)
{
$this
->
resumeCallable
=
null
;
}
}
catch
(
RuntimeException
$e
)
{
}
catch
(
RuntimeException
$e
)
{
if
(
strpos
(
$e
->
getMessage
(),
"not master"
)
!==
false
)
{
if
(
strpos
(
$e
->
getMessage
(),
"not master"
)
!==
false
)
{
$resumable
=
true
;
$resumable
=
true
;
...
@@ -130,6 +139,10 @@ class ChangeStream implements Iterator
...
@@ -130,6 +139,10 @@ class ChangeStream implements Iterator
$this
->
hasAdvanced
=
true
;
$this
->
hasAdvanced
=
true
;
$this
->
resumeToken
=
$this
->
extractResumeToken
(
$this
->
csIt
->
current
());
$this
->
resumeToken
=
$this
->
extractResumeToken
(
$this
->
csIt
->
current
());
}
}
// As with next(), free the callable once we know it will never be used.
if
((
string
)
$this
->
getCursorId
()
===
'0'
)
{
$this
->
resumeCallable
=
null
;
}
}
catch
(
RuntimeException
$e
)
{
}
catch
(
RuntimeException
$e
)
{
if
(
strpos
(
$e
->
getMessage
(),
"not master"
)
!==
false
)
{
if
(
strpos
(
$e
->
getMessage
(),
"not master"
)
!==
false
)
{
$resumable
=
true
;
$resumable
=
true
;
...
...
src/Operation/Watch.php
View file @
5fd00fbe
...
@@ -110,6 +110,12 @@ class Watch implements Executable
...
@@ -110,6 +110,12 @@ class Watch implements Executable
}
}
}
}
if
(
!
isset
(
$options
[
'session'
]))
{
try
{
$options
[
'session'
]
=
$manager
->
startSession
();
}
catch
(
DriverRuntimeException
$e
)
{}
}
$this
->
databaseName
=
(
string
)
$databaseName
;
$this
->
databaseName
=
(
string
)
$databaseName
;
$this
->
collectionName
=
(
string
)
$collectionName
;
$this
->
collectionName
=
(
string
)
$collectionName
;
$this
->
pipeline
=
$pipeline
;
$this
->
pipeline
=
$pipeline
;
...
...
tests/Operation/WatchFunctionalTest.php
View file @
5fd00fbe
...
@@ -8,7 +8,9 @@ use MongoDB\Driver\ReadPreference;
...
@@ -8,7 +8,9 @@ use MongoDB\Driver\ReadPreference;
use
MongoDB\Driver\Server
;
use
MongoDB\Driver\Server
;
use
MongoDB\Driver\Exception\ConnectionTimeoutException
;
use
MongoDB\Driver\Exception\ConnectionTimeoutException
;
use
MongoDB\Exception\ResumeTokenException
;
use
MongoDB\Exception\ResumeTokenException
;
use
MongoDB\Operation\CreateCollection
;
use
MongoDB\Operation\DatabaseCommand
;
use
MongoDB\Operation\DatabaseCommand
;
use
MongoDB\Operation\DropCollection
;
use
MongoDB\Operation\InsertOne
;
use
MongoDB\Operation\InsertOne
;
use
MongoDB\Operation\Watch
;
use
MongoDB\Operation\Watch
;
use
MongoDB\Tests\CommandObserver
;
use
MongoDB\Tests\CommandObserver
;
...
@@ -588,6 +590,90 @@ class WatchFunctionalTest extends FunctionalTestCase
...
@@ -588,6 +590,90 @@ class WatchFunctionalTest extends FunctionalTestCase
$this
->
assertSame
(
2
,
$changeStream
->
key
());
$this
->
assertSame
(
2
,
$changeStream
->
key
());
}
}
public
function
testSessionPersistsAfterResume
()
{
$operation
=
new
Watch
(
$this
->
manager
,
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
[],
$this
->
defaultOptions
);
$changeStream
=
null
;
$originalSession
=
null
;
$sessionAfterResume
=
[];
$commands
=
[];
/* We want to ensure that the lsid of the initial aggregate matches the
* lsid of any aggregates after the change stream resumes. After
* PHPC-1152 is complete, we will ensure that the lsid of the initial
* aggregate matches the lsid of any subsequent aggregates and getMores.
*/
(
new
CommandObserver
)
->
observe
(
function
()
use
(
$operation
,
&
$changeStream
)
{
$changeStream
=
$operation
->
execute
(
$this
->
getPrimaryServer
());
},
function
(
$changeStream
)
use
(
&
$originalSession
)
{
if
(
isset
(
$changeStream
->
aggregate
))
{
$originalSession
=
bin2hex
((
string
)
$changeStream
->
lsid
->
id
);
}
}
);
$changeStream
->
rewind
();
$this
->
killChangeStreamCursor
(
$changeStream
);
(
new
CommandObserver
)
->
observe
(
function
()
use
(
&
$changeStream
)
{
$changeStream
->
next
();
},
function
(
$changeStream
)
use
(
&
$sessionAfterResume
,
&
$commands
)
{
$commands
[]
=
key
((
array
)
$changeStream
);
$sessionAfterResume
[]
=
bin2hex
((
string
)
$changeStream
->
lsid
->
id
);
}
);
$expectedCommands
=
[
/* We expect a getMore to be issued because we are calling next(). */
'getMore'
,
/* Since we have killed the cursor, ChangeStream will resume by
* issuing a new aggregate commmand. */
'aggregate'
,
/* When ChangeStream resumes, it overwrites its original cursor with
* the new cursor resulting from the last aggregate command. This
* removes the last reference to the old cursor, which causes the
* driver to kill it (via mongoc_cursor_destroy()). */
'killCursors'
,
/* Finally, ChangeStream will rewind the new cursor as the last step
* of the resume process. This results in one last getMore. */
'getMore'
,
];
$this
->
assertSame
(
$expectedCommands
,
$commands
);
foreach
(
$sessionAfterResume
as
$session
)
{
$this
->
assertEquals
(
$session
,
$originalSession
);
}
}
public
function
testSessionFreed
()
{
$operation
=
new
CreateCollection
(
$this
->
getDatabaseName
(),
$this
->
getCollectionName
());
$operation
->
execute
(
$this
->
getPrimaryServer
());
$operation
=
new
Watch
(
$this
->
manager
,
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
[],
$this
->
defaultOptions
);
$changeStream
=
$operation
->
execute
(
$this
->
getPrimaryServer
());
$rc
=
new
ReflectionClass
(
$changeStream
);
$rp
=
$rc
->
getProperty
(
'resumeCallable'
);
$rp
->
setAccessible
(
true
);
$this
->
assertNotNull
(
$rp
->
getValue
(
$changeStream
));
// Invalidate the cursor to verify that resumeCallable is unset when the cursor is exhausted.
$operation
=
new
DropCollection
(
$this
->
getDatabaseName
(),
$this
->
getCollectionName
());
$operation
->
execute
(
$this
->
getPrimaryServer
());
$changeStream
->
next
();
$this
->
assertNull
(
$rp
->
getValue
(
$changeStream
));
}
private
function
insertDocument
(
$document
)
private
function
insertDocument
(
$document
)
{
{
$insertOne
=
new
InsertOne
(
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
$document
);
$insertOne
=
new
InsertOne
(
$this
->
getDatabaseName
(),
$this
->
getCollectionName
(),
$document
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment