Commit d425493b authored by Jens Segers's avatar Jens Segers

Fix queue support #623

parent b0354f28
<?php namespace Jenssegers\Mongodb;
use Illuminate\Support\ServiceProvider;
use Jenssegers\Eloquent\Model;
use Jenssegers\Mongodb\Queue\MongoConnector;
class MongodbServiceProvider extends ServiceProvider
{
......@@ -20,10 +20,18 @@ class MongodbServiceProvider extends ServiceProvider
*/
public function register()
{
// Add database driver.
$this->app->resolving('db', function ($db) {
$db->extend('mongodb', function ($config) {
return new Connection($config);
});
});
// Add connector for queue support.
$this->app->resolving('queue', function ($queue) {
$queue->addConnector('mongodb', function () {
return new MongoConnector($this->app['db']);
});
});
}
}
<?php namespace Jenssegers\Mongodb\Queue;
use Illuminate\Database\ConnectionResolverInterface;
use Illuminate\Queue\Connectors\ConnectorInterface;
use Illuminate\Support\Arr;
class MongoConnector implements ConnectorInterface
{
/**
* Database connections.
*
* @var \Illuminate\Database\ConnectionResolverInterface
*/
protected $connections;
/**
* Create a new connector instance.
*
* @param \Illuminate\Database\ConnectionResolverInterface $connections
* @return void
*/
public function __construct(ConnectionResolverInterface $connections)
{
$this->connections = $connections;
}
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
return new MongoQueue(
$this->connections->connection(Arr::get($config, 'connection')),
$config['table'],
$config['queue'],
Arr::get($config, 'expire', 60)
);
}
}
<?php namespace Jenssegers\Mongodb\Queue;
use Carbon\Carbon;
use Illuminate\Queue\DatabaseQueue;
class MongoQueue extends DatabaseQueue
{
/**
* Get the next available job for the queue.
*
* @param string|null $queue
* @return \StdClass|null
*/
protected function getNextAvailableJob($queue)
{
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where('reserved', 0)
->where('available_at', '<=', $this->getTime())
->orderBy('id', 'asc')
->first();
if ($job) {
$job = (object) $job;
$job->id = $job->_id;
}
return $job ?: null;
}
/**
* Release the jobs that have been reserved for too long.
*
* @param string $queue
* @return void
*/
protected function releaseJobsThatHaveBeenReservedTooLong($queue)
{
$expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();
$reserved = $this->database->collection($this->table)
->where('queue', $this->getQueue($queue))
->where('reserved', 1)
->where('reserved_at', '<=', $expired)->get();
foreach ($reserved as $job) {
$attempts = $job['attempts'] + 1;
$this->releaseJob($job['_id'], $attempts);
}
}
}
<?php
class QueueTest extends TestCase
{
public function testQueue()
{
$id = Queue::push('test', ['foo' => 'bar'], 'test');
$this->assertNotNull($id);
$job = Queue::pop('test');
$this->assertInstanceOf('Illuminate\Queue\Jobs\DatabaseJob', $job);
}
}
......@@ -29,6 +29,8 @@ class TestCase extends Orchestra\Testbench\TestCase
$config = require 'config/database.php';
$app['config']->set('app.key', 'ZsZewWyUJ5FsKp9lMwv4tYbNlegQilM7');
$app['config']->set('database.default', 'mongodb');
$app['config']->set('database.connections.mysql', $config['connections']['mysql']);
$app['config']->set('database.connections.mongodb', $config['connections']['mongodb']);
......@@ -36,5 +38,13 @@ class TestCase extends Orchestra\Testbench\TestCase
$app['config']->set('auth.model', 'User');
$app['config']->set('auth.providers.users.model', 'User');
$app['config']->set('cache.driver', 'array');
$app['config']->set('queue.default', 'mongodb');
$app['config']->set('queue.connections.mongodb', [
'driver' => 'mongodb',
'table' => 'jobs',
'queue' => 'default',
'expire' => 60,
]);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment