Commit c94cb808 authored by Jeremy Mikola's avatar Jeremy Mikola

PHPLIB-109: Extract BulkWrite operation class

parent 62397f02
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
namespace MongoDB; namespace MongoDB;
use MongoDB\Driver\BulkWrite;
use MongoDB\Driver\Command; use MongoDB\Driver\Command;
use MongoDB\Driver\Cursor; use MongoDB\Driver\Cursor;
use MongoDB\Driver\Manager; use MongoDB\Driver\Manager;
...@@ -14,6 +13,7 @@ use MongoDB\Exception\UnexpectedTypeException; ...@@ -14,6 +13,7 @@ use MongoDB\Exception\UnexpectedTypeException;
use MongoDB\Model\IndexInfoIterator; use MongoDB\Model\IndexInfoIterator;
use MongoDB\Model\IndexInput; use MongoDB\Model\IndexInput;
use MongoDB\Operation\Aggregate; use MongoDB\Operation\Aggregate;
use MongoDB\Operation\BulkWrite;
use MongoDB\Operation\CreateIndexes; use MongoDB\Operation\CreateIndexes;
use MongoDB\Operation\Count; use MongoDB\Operation\Count;
use MongoDB\Operation\DeleteMany; use MongoDB\Operation\DeleteMany;
...@@ -101,135 +101,23 @@ class Collection ...@@ -101,135 +101,23 @@ class Collection
} }
/** /**
* Adds a full set of write operations into a bulk and executes it * Executes multiple write operations.
* *
* The syntax of the $bulk array is: * @see BulkWrite::__construct() for supported options
* $bulk = [ * @param array[] $operations List of write operations
* [ * @param array $options Command options
* 'METHOD' => [ * @return BulkWriteResult
* $document,
* $extraArgument1,
* $extraArgument2,
* ],
* ],
* [
* 'METHOD' => [
* $document,
* $extraArgument1,
* $extraArgument2,
* ],
* ],
* ]
*
*
* Where METHOD is one of
* - 'insertOne'
* Supports no $extraArgument
* - 'updateMany'
* Requires $extraArgument1, same as $update for Collection::updateMany()
* Optional $extraArgument2, same as $options for Collection::updateMany()
* - 'updateOne'
* Requires $extraArgument1, same as $update for Collection::updateOne()
* Optional $extraArgument2, same as $options for Collection::updateOne()
* - 'replaceOne'
* Requires $extraArgument1, same as $update for Collection::replaceOne()
* Optional $extraArgument2, same as $options for Collection::replaceOne()
* - 'deleteOne'
* Supports no $extraArgument
* - 'deleteMany'
* Supports no $extraArgument
*
* @example Collection-bulkWrite.php Using Collection::bulkWrite()
*
* @see Collection::getBulkOptions() for supported $options
*
* @param array $ops Array of operations
* @param array $options Additional options
* @return WriteResult
*/ */
public function bulkWrite(array $ops, array $options = array()) public function bulkWrite(array $operations, array $options = array())
{ {
$options = array_merge($this->getBulkOptions(), $options); if ( ! isset($options['writeConcern']) && isset($this->wc)) {
$options['writeConcern'] = $this->wc;
$bulk = new BulkWrite($options["ordered"]);
$insertedIds = array();
foreach ($ops as $n => $op) {
foreach ($op as $opname => $args) {
if (!isset($args[0])) {
throw new InvalidArgumentException(sprintf("Missing argument#1 for '%s' (operation#%d)", $opname, $n));
}
switch ($opname) {
case "insertOne":
$insertedId = $bulk->insert($args[0]);
if ($insertedId !== null) {
$insertedIds[$n] = $insertedId;
} else {
$insertedIds[$n] = is_array($args[0]) ? $args[0]['_id'] : $args[0]->_id;
}
break;
case "updateMany":
if (!isset($args[1])) {
throw new InvalidArgumentException(sprintf("Missing argument#2 for '%s' (operation#%d)", $opname, $n));
}
$options = array_merge($this->getWriteOptions(), isset($args[2]) ? $args[2] : array(), array("multi" => true));
$firstKey = key($args[1]);
if (!isset($firstKey[0]) || $firstKey[0] != '$') {
throw new InvalidArgumentException("First key in \$update must be a \$operator");
}
$bulk->update($args[0], $args[1], $options);
break;
case "updateOne":
if (!isset($args[1])) {
throw new InvalidArgumentException(sprintf("Missing argument#2 for '%s' (operation#%d)", $opname, $n));
}
$options = array_merge($this->getWriteOptions(), isset($args[2]) ? $args[2] : array(), array("multi" => false));
$firstKey = key($args[1]);
if (!isset($firstKey[0]) || $firstKey[0] != '$') {
throw new InvalidArgumentException("First key in \$update must be a \$operator");
}
$bulk->update($args[0], $args[1], $options);
break;
case "replaceOne":
if (!isset($args[1])) {
throw new InvalidArgumentException(sprintf("Missing argument#2 for '%s' (operation#%d)", $opname, $n));
}
$options = array_merge($this->getWriteOptions(), isset($args[2]) ? $args[2] : array(), array("multi" => false));
$firstKey = key($args[1]);
if (isset($firstKey[0]) && $firstKey[0] == '$') {
throw new InvalidArgumentException("First key in \$update must NOT be a \$operator");
}
$bulk->update($args[0], $args[1], $options);
break;
case "deleteOne":
$options = array_merge($this->getWriteOptions(), isset($args[1]) ? $args[1] : array(), array("limit" => 1));
$bulk->delete($args[0], $options);
break;
case "deleteMany":
$options = array_merge($this->getWriteOptions(), isset($args[1]) ? $args[1] : array(), array("limit" => 0));
$bulk->delete($args[0], $options);
break;
default:
throw new InvalidArgumentException(sprintf("Unknown operation type called '%s' (operation#%d)", $opname, $n));
}
}
} }
$writeResult = $this->manager->executeBulkWrite($this->ns, $bulk, $this->wc); $operation = new BulkWrite($this->dbname, $this->collname, $operations, $options);
$server = $this->manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
return new BulkWriteResult($writeResult, $insertedIds); return $operation->execute($server);
} }
/** /**
...@@ -498,18 +386,6 @@ class Collection ...@@ -498,18 +386,6 @@ class Collection
return $operation->execute($server); return $operation->execute($server);
} }
/**
* Retrieves all Bulk Write options with their default values.
*
* @return array of available Bulk Write options
*/
public function getBulkOptions()
{
return array(
"ordered" => false,
);
}
/** /**
* Return the collection name. * Return the collection name.
* *
...@@ -541,20 +417,6 @@ class Collection ...@@ -541,20 +417,6 @@ class Collection
return $this->ns; return $this->ns;
} }
/**
* Retrieves all Write options with their default values.
*
* @return array of available Write options
*/
public function getWriteOptions()
{
return array(
"ordered" => false,
"upsert" => false,
"limit" => 1,
);
}
/** /**
* Inserts multiple documents. * Inserts multiple documents.
* *
......
<?php
namespace MongoDB\Operation;
use MongoDB\BulkWriteResult;
use MongoDB\Driver\BulkWrite as Bulk;
use MongoDB\Driver\Server;
use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\InvalidArgumentTypeException;
/**
* Operation for executing multiple write operations.
*
* @api
* @see MongoDB\Collection::bulkWrite()
*/
class BulkWrite implements Executable
{
const DELETE_MANY = 'deleteMany';
const DELETE_ONE = 'deleteOne';
const INSERT_ONE = 'insertOne';
const REPLACE_ONE = 'replaceOne';
const UPDATE_MANY = 'updateMany';
const UPDATE_ONE = 'updateOne';
private $databaseName;
private $collectionName;
private $operations;
private $options;
/**
* Constructs a bulk write operation.
*
* Example array structure for all supported operation types:
*
* [
* [ 'deleteOne' => [ $filter ] ],
* [ 'deleteMany' => [ $filter ] ],
* [ 'insertOne' => [ $document ] ],
* [ 'replaceOne' => [ $filter, $replacement, $options ] ],
* [ 'updateMany' => [ $filter, $update, $options ] ],
* [ 'updateOne' => [ $filter, $update, $options ] ],
* ]
*
* Arguments correspond to the respective Operation classes; however, the
* writeConcern option is specified for the top-level bulk write operation
* instead of each individual operations.
*
* Supported options for replaceOne, updateMany, and updateOne operations:
*
* * upsert (boolean): When true, a new document is created if no document
* matches the query. The default is false.
*
* Supported options for the bulk write operation:
*
* * ordered (boolean): If true, when an insert fails, return without
* performing the remaining writes. If false, when a write fails,
* continue with the remaining writes, if any. The default is true.
*
* * writeConcern (MongoDB\Driver\WriteConcern): Write concern.
*
* @param string $databaseName Database name
* @param string $collectionName Collection name
* @param array[] $operations List of write operations
* @param array $options Command options
* @throws InvalidArgumentException
*/
public function __construct($databaseName, $collectionName, array $operations, array $options = array())
{
if (empty($operations)) {
throw new InvalidArgumentException('$operations is empty');
}
$expectedIndex = 0;
foreach ($operations as $i => $operation) {
if ($i !== $expectedIndex) {
throw new InvalidArgumentException(sprintf('$operations is not a list (unexpected index: "%s")', $i));
}
if ( ! is_array($operation)) {
throw new InvalidArgumentTypeException(sprintf('$operations[%d]', $i), $operation, 'array');
}
if (count($operation) !== 1) {
throw new InvalidArgumentException(sprintf('Expected one element in $operation[%d], actually: %d', $i, count($operation)));
}
$type = key($operation);
$args = current($operation);
if ( ! isset($args[0]) && ! array_key_exists(0, $args)) {
throw new InvalidArgumentException(sprintf('Missing first argument for $operations[%d]["%s"]', $i, $type));
}
if ( ! is_array($args[0]) && ! is_object($args[0])) {
throw new InvalidArgumentTypeException(sprintf('$operations[%d]["%s"][0]', $i, $type), $args[0], 'array or object');
}
switch ($type) {
case self::INSERT_ONE:
break;
case self::DELETE_MANY:
case self::DELETE_ONE:
$operations[$i][$type][1] = array('limit' => ($type === self::DELETE_ONE ? 1 : 0));
break;
case self::REPLACE_ONE:
if ( ! isset($args[1]) && ! array_key_exists(1, $args)) {
throw new InvalidArgumentException(sprintf('Missing second argument for $operations[%d]["%s"]', $i, $type));
}
if ( ! is_array($args[1]) && ! is_object($args[1])) {
throw new InvalidArgumentTypeException(sprintf('$operations[%d]["%s"][1]', $i, $type), $args[1], 'array or object');
}
if (\MongoDB\is_first_key_operator($args[1])) {
throw new InvalidArgumentException(sprintf('First key in $operations[%d]["%s"][1] is an update operator', $i, $type));
}
if ( ! isset($args[2])) {
$args[2] = array();
}
if ( ! is_array($args[2])) {
throw new InvalidArgumentTypeException(sprintf('$operations[%d]["%s"][2]', $i, $type), $args[2], 'array');
}
$args[2]['multi'] = false;
$args[2] += array('upsert' => false);
if ( ! is_bool($args[2]['upsert'])) {
throw new InvalidArgumentTypeException(sprintf('$operations[%d]["%s"][2]["upsert"]', $i, $type), $args[2]['upsert'], 'boolean');
}
$operations[$i][$type][2] = $args[2];
break;
case self::UPDATE_MANY:
case self::UPDATE_ONE:
if ( ! isset($args[1]) && ! array_key_exists(1, $args)) {
throw new InvalidArgumentException(sprintf('Missing second argument for $operations[%d]["%s"]', $i, $type));
}
if ( ! is_array($args[1]) && ! is_object($args[1])) {
throw new InvalidArgumentTypeException(sprintf('$operations[%d]["%s"][1]', $i, $type), $args[1], 'array or object');
}
if ( ! \MongoDB\is_first_key_operator($args[1])) {
throw new InvalidArgumentException(sprintf('First key in $operations[%d]["%s"][1] is not an update operator', $i, $type));
}
if ( ! isset($args[2])) {
$args[2] = array();
}
if ( ! is_array($args[2])) {
throw new InvalidArgumentTypeException(sprintf('$operations[%d]["%s"][2]', $i, $type), $args[2], 'array');
}
$args[2]['multi'] = ($type === self::UPDATE_MANY);
$args[2] += array('upsert' => false);
if ( ! is_bool($args[2]['upsert'])) {
throw new InvalidArgumentTypeException(sprintf('$operations[%d]["%s"][2]["upsert"]', $i, $type), $args[2]['upsert'], 'boolean');
}
$operations[$i][$type][2] = $args[2];
break;
default:
throw new InvalidArgumentException(sprintf('Unknown operation type "%s" in $operations[%d]', $type, $i));
}
$expectedIndex += 1;
}
$options += array(
'ordered' => true,
);
if ( ! is_bool($options['ordered'])) {
throw new InvalidArgumentTypeException('"ordered" option', $options['ordered'], 'boolean');
}
if (array_key_exists('writeConcern', $options) && ! $options['writeConcern'] instanceof WriteConcern) {
throw new InvalidArgumentTypeException('"writeConcern" option', $options['writeConcern'], 'MongoDB\Driver\WriteConcern');
}
$this->databaseName = (string) $databaseName;
$this->collectionName = (string) $collectionName;
$this->operations = $operations;
$this->options = $options;
}
/**
* Execute the operation.
*
* @see Executable::execute()
* @param Server $server
* @return BulkWriteResult
*/
public function execute(Server $server)
{
$bulk = new Bulk($this->options['ordered']);
$insertedIds = array();
foreach ($this->operations as $i => $operation) {
$type = key($operation);
$args = current($operation);
switch ($type) {
case self::DELETE_MANY:
case self::DELETE_ONE:
$bulk->delete($args[0], $args[1]);
break;
case self::INSERT_ONE:
$insertedId = $bulk->insert($args[0]);
if ($insertedId !== null) {
$insertedIds[$i] = $insertedId;
} else {
// TODO: This may be removed if PHPC-382 is implemented
$insertedIds[$i] = is_array($args[0]) ? $args[0]['_id'] : $args[0]->_id;
}
break;
case self::REPLACE_ONE:
case self::UPDATE_MANY:
case self::UPDATE_ONE:
$bulk->update($args[0], $args[1], $args[2]);
}
}
$writeConcern = isset($this->options['writeConcern']) ? $this->options['writeConcern'] : null;
$writeResult = $server->executeBulkWrite($this->databaseName . '.' . $this->collectionName, $bulk, $writeConcern);
return new BulkWriteResult($writeResult, $insertedIds);
}
}
...@@ -4,7 +4,6 @@ namespace MongoDB\Operation; ...@@ -4,7 +4,6 @@ namespace MongoDB\Operation;
use MongoDB\Driver\Command; use MongoDB\Driver\Command;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
use MongoDB\Driver\BulkWrite;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\RuntimeException; use MongoDB\Exception\RuntimeException;
use MongoDB\Exception\UnexpectedTypeException; use MongoDB\Exception\UnexpectedTypeException;
......
...@@ -4,7 +4,7 @@ namespace MongoDB\Operation; ...@@ -4,7 +4,7 @@ namespace MongoDB\Operation;
use MongoDB\Driver\Command; use MongoDB\Driver\Command;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
use MongoDB\Driver\BulkWrite; use MongoDB\Driver\BulkWrite as Bulk;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\RuntimeException; use MongoDB\Exception\RuntimeException;
use MongoDB\Exception\UnexpectedTypeException; use MongoDB\Exception\UnexpectedTypeException;
...@@ -108,7 +108,7 @@ class CreateIndexes implements Executable ...@@ -108,7 +108,7 @@ class CreateIndexes implements Executable
*/ */
private function executeLegacy(Server $server) private function executeLegacy(Server $server)
{ {
$bulk = new BulkWrite(true); $bulk = new Bulk(true);
foreach ($this->indexes as $index) { foreach ($this->indexes as $index) {
$bulk->insert($index); $bulk->insert($index);
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
namespace MongoDB\Operation; namespace MongoDB\Operation;
use MongoDB\DeleteResult; use MongoDB\DeleteResult;
use MongoDB\Driver\BulkWrite; use MongoDB\Driver\BulkWrite as Bulk;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
use MongoDB\Driver\WriteConcern; use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
...@@ -72,7 +72,7 @@ class Delete implements Executable ...@@ -72,7 +72,7 @@ class Delete implements Executable
*/ */
public function execute(Server $server) public function execute(Server $server)
{ {
$bulk = new BulkWrite(); $bulk = new Bulk();
$bulk->delete($this->filter, array('limit' => $this->limit)); $bulk->delete($this->filter, array('limit' => $this->limit));
$writeConcern = isset($this->options['writeConcern']) ? $this->options['writeConcern'] : null; $writeConcern = isset($this->options['writeConcern']) ? $this->options['writeConcern'] : null;
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
namespace MongoDB\Operation; namespace MongoDB\Operation;
use MongoDB\InsertManyResult; use MongoDB\InsertManyResult;
use MongoDB\Driver\BulkWrite; use MongoDB\Driver\BulkWrite as Bulk;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
use MongoDB\Driver\WriteConcern; use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
...@@ -87,7 +87,7 @@ class InsertMany implements Executable ...@@ -87,7 +87,7 @@ class InsertMany implements Executable
*/ */
public function execute(Server $server) public function execute(Server $server)
{ {
$bulk = new BulkWrite($this->options['ordered']); $bulk = new Bulk($this->options['ordered']);
$insertedIds = array(); $insertedIds = array();
foreach ($this->documents as $i => $document) { foreach ($this->documents as $i => $document) {
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
namespace MongoDB\Operation; namespace MongoDB\Operation;
use MongoDB\InsertOneResult; use MongoDB\InsertOneResult;
use MongoDB\Driver\BulkWrite; use MongoDB\Driver\BulkWrite as Bulk;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
use MongoDB\Driver\WriteConcern; use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\InvalidArgumentTypeException; use MongoDB\Exception\InvalidArgumentTypeException;
...@@ -60,7 +60,7 @@ class InsertOne implements Executable ...@@ -60,7 +60,7 @@ class InsertOne implements Executable
*/ */
public function execute(Server $server) public function execute(Server $server)
{ {
$bulk = new BulkWrite(); $bulk = new Bulk();
$insertedId = $bulk->insert($this->document); $insertedId = $bulk->insert($this->document);
if ($insertedId === null) { if ($insertedId === null) {
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
namespace MongoDB\Operation; namespace MongoDB\Operation;
use MongoDB\UpdateResult; use MongoDB\UpdateResult;
use MongoDB\Driver\BulkWrite; use MongoDB\Driver\BulkWrite as Bulk;
use MongoDB\Driver\Server; use MongoDB\Driver\Server;
use MongoDB\Driver\WriteConcern; use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\InvalidArgumentException;
...@@ -100,7 +100,7 @@ class Update implements Executable ...@@ -100,7 +100,7 @@ class Update implements Executable
'upsert' => $this->options['upsert'], 'upsert' => $this->options['upsert'],
); );
$bulk = new BulkWrite(); $bulk = new Bulk();
$bulk->update($this->filter, $this->update, $options); $bulk->update($this->filter, $this->update, $options);
$writeConcern = isset($this->options['writeConcern']) ? $this->options['writeConcern'] : null; $writeConcern = isset($this->options['writeConcern']) ? $this->options['writeConcern'] : null;
......
...@@ -128,7 +128,7 @@ class BulkWriteFunctionalTest extends FunctionalTestCase ...@@ -128,7 +128,7 @@ class BulkWriteFunctionalTest extends FunctionalTestCase
/** /**
* @expectedException MongoDB\Exception\InvalidArgumentException * @expectedException MongoDB\Exception\InvalidArgumentException
* @expectedExceptionMessage Unknown operation type called 'foo' (operation#0) * @expectedExceptionMessage Unknown operation type "foo" in $operations[0]
*/ */
public function testUnknownOperation() public function testUnknownOperation()
{ {
...@@ -139,7 +139,7 @@ class BulkWriteFunctionalTest extends FunctionalTestCase ...@@ -139,7 +139,7 @@ class BulkWriteFunctionalTest extends FunctionalTestCase
/** /**
* @expectedException MongoDB\Exception\InvalidArgumentException * @expectedException MongoDB\Exception\InvalidArgumentException
* @expectedExceptionMessageRegExp /Missing argument#\d+ for '\w+' \(operation#\d+\)/ * @expectedExceptionMessageRegExp /Missing (first|second) argument for \$operations\[\d+\]\["\w+\"]/
* @dataProvider provideOpsWithMissingArguments * @dataProvider provideOpsWithMissingArguments
*/ */
public function testMissingArguments(array $ops) public function testMissingArguments(array $ops)
...@@ -164,7 +164,7 @@ class BulkWriteFunctionalTest extends FunctionalTestCase ...@@ -164,7 +164,7 @@ class BulkWriteFunctionalTest extends FunctionalTestCase
/** /**
* @expectedException MongoDB\Exception\InvalidArgumentException * @expectedException MongoDB\Exception\InvalidArgumentException
* @expectedExceptionMessage First key in $update must be a $operator * @expectedExceptionMessage First key in $operations[0]["updateOne"][1] is not an update operator
*/ */
public function testUpdateOneRequiresUpdateOperators() public function testUpdateOneRequiresUpdateOperators()
{ {
...@@ -175,7 +175,7 @@ class BulkWriteFunctionalTest extends FunctionalTestCase ...@@ -175,7 +175,7 @@ class BulkWriteFunctionalTest extends FunctionalTestCase
/** /**
* @expectedException MongoDB\Exception\InvalidArgumentException * @expectedException MongoDB\Exception\InvalidArgumentException
* @expectedExceptionMessage First key in $update must be a $operator * @expectedExceptionMessage First key in $operations[0]["updateMany"][1] is not an update operator
*/ */
public function testUpdateManyRequiresUpdateOperators() public function testUpdateManyRequiresUpdateOperators()
{ {
...@@ -186,7 +186,7 @@ class BulkWriteFunctionalTest extends FunctionalTestCase ...@@ -186,7 +186,7 @@ class BulkWriteFunctionalTest extends FunctionalTestCase
/** /**
* @expectedException MongoDB\Exception\InvalidArgumentException * @expectedException MongoDB\Exception\InvalidArgumentException
* @expectedExceptionMessage First key in $update must NOT be a $operator * @expectedExceptionMessage First key in $operations[0]["replaceOne"][1] is an update operator
*/ */
public function testReplaceOneRequiresReplacementDocument() public function testReplaceOneRequiresReplacementDocument()
{ {
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment