Commit cd355cb0 authored by Jens Segers's avatar Jens Segers Committed by GitHub

Merge pull request #786 from Bodom78/master

Fixes #743 for version 3.x
parents b1953fd3 99fad179
......@@ -2,31 +2,75 @@
use Carbon\Carbon;
use Illuminate\Queue\DatabaseQueue;
use Illuminate\Queue\Jobs\DatabaseJob;
use MongoDB\Operation\FindOneAndUpdate;
use DB;
class MongoQueue extends DatabaseQueue
{
/**
* Get the next available job for the queue.
* Pop the next job off of the queue.
*
* @param string $queue
*
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
if (!is_null($this->expire)) {
$this->releaseJobsThatHaveBeenReservedTooLong($queue);
}
if ($job = $this->getNextAvailableJobAndReserve($queue)) {
return new DatabaseJob(
$this->container, $this, $job, $queue
);
}
}
/**
* Get the next available job for the queue and mark it as reserved.
*
* When using multiple daemon queue listeners to process jobs there
* is a possibility that multiple processes can end up reading the
* same record before one has flagged it as reserved.
*
* This race condition can result in random jobs being run more then
* once. To solve this we use findOneAndUpdate to lock the next jobs
* record while flagging it as reserved at the same time.
*
* @param string|null $queue
*
* @param string|null $queue
* @return \StdClass|null
*/
protected function getNextAvailableJob($queue)
protected function getNextAvailableJobAndReserve($queue)
{
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where('reserved', 0)
->where('available_at', '<=', $this->getTime())
->orderBy('id', 'asc')
->first();
$job = DB::getCollection($this->table)->findOneAndUpdate(
[
'queue' => $this->getQueue($queue),
'reserved' => 0,
'available_at' => ['$lte' => $this->getTime()],
],
[
'$set' => [
'reserved' => 1,
'reserved_at' => $this->getTime(),
],
],
[
'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
'sort' => ['available_at' => 1],
]
);
if ($job) {
$job = (object) $job;
$job->id = $job->_id;
}
return $job ?: null;
return $job;
}
/**
......@@ -40,16 +84,16 @@ class MongoQueue extends DatabaseQueue
$expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();
$reserved = $this->database->collection($this->table)
->where('queue', $this->getQueue($queue))
->where('reserved', 1)
->where('reserved_at', '<=', $expired)->get();
->where('queue', $this->getQueue($queue))
->where('reserved', 1)
->where('reserved_at', '<=', $expired)->get();
foreach ($reserved as $job) {
$attempts = $job['attempts'] + 1;
$this->releaseJob($job['_id'], $attempts);
}
}
/**
* Release the given job ID from reservation.
*
......@@ -66,19 +110,6 @@ class MongoQueue extends DatabaseQueue
]);
}
/**
* Mark the given job ID as reserved.
*
* @param string $id
* @return void
*/
protected function markJobAsReserved($id)
{
$this->database->collection($this->table)->where('_id', $id)->update([
'reserved' => 1, 'reserved_at' => $this->getTime(),
]);
}
/**
* Delete a reserved job from the queue.
*
......
......@@ -2,12 +2,46 @@
class QueueTest extends TestCase
{
public function testQueue()
public function setUp()
{
$id = Queue::push('test', ['foo' => 'bar'], 'test');
parent::setUp();
// Always start with a clean slate
Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->truncate();
Queue::getDatabase()->table(Config::get('queue.failed.table'))->truncate();
}
public function testQueueJobLifeCycle()
{
$id = Queue::push('test', ['action' => 'QueueJobLifeCycle'], 'test');
$this->assertNotNull($id);
// Get and reserve the test job (next available)
$job = Queue::pop('test');
$this->assertInstanceOf('Illuminate\Queue\Jobs\DatabaseJob', $job);
$this->assertEquals(1, $job->getDatabaseJob()->reserved);
$this->assertEquals(json_encode(['job' => 'test', 'data' => ['action' => 'QueueJobLifeCycle']]), $job->getRawBody());
// Remove reserved job
$job->delete();
$this->assertEquals(0, Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->count());
}
public function testQueueJobExpired()
{
$id = Queue::push('test', ['action' => 'QueueJobExpired'], 'test');
$this->assertNotNull($id);
// Expire the test job
$expiry = \Carbon\Carbon::now()->subSeconds(Config::get('queue.connections.database.expire'))->getTimestamp();
Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->where('_id', $id)->update(['reserved' => 1, 'reserved_at' => $expiry]);
// Expect an attempted older job in the queue
$job = Queue::pop('test');
$this->assertEquals(2, $job->getDatabaseJob()->attempts);
$this->assertGreaterThan($expiry, $job->getDatabaseJob()->reserved_at);
$job->delete();
$this->assertEquals(0, Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->count());
}
}
<?php
return [
'default' => 'database',
'connections' => [
'database' => [
'driver' => 'mongodb',
'table' => 'jobs',
'queue' => 'default',
'expire' => 60,
],
],
'failed' => [
'database' => 'mongodb',
'table' => 'failed_jobs',
],
];
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment