MongoQueue.php 4.06 KB
Newer Older
Jens Segers's avatar
Jens Segers committed
1 2 3
<?php

namespace Jenssegers\Mongodb\Queue;
Jens Segers's avatar
Jens Segers committed
4 5 6

use Carbon\Carbon;
use Illuminate\Queue\DatabaseQueue;
7
use Jenssegers\Mongodb\Connection;
8
use MongoDB\Operation\FindOneAndUpdate;
Jens Segers's avatar
Jens Segers committed
9 10 11

class MongoQueue extends DatabaseQueue
{
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
    /**
     * The expiration time of a job.
     *
     * @var int|null
     */
    protected $retryAfter = 60;

    /**
     * The connection name for the queue.
     *
     * @var string
     */
    protected $connectionName;

    /**
     * @inheritdoc
     */
    public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60)
    {
        parent::__construct($database, $table, $default, $retryAfter);
        $this->retryAfter = $retryAfter;
    }

Jens Segers's avatar
Jens Segers committed
35
    /**
36
     * @inheritdoc
37 38 39 40 41
     */
    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);

Jens Segers's avatar
Jens Segers committed
42
        if (!is_null($this->retryAfter)) {
43 44 45
            $this->releaseJobsThatHaveBeenReservedTooLong($queue);
        }

Fady Khalife's avatar
Fady Khalife committed
46
        if ($job = $this->getNextAvailableJobAndReserve($queue)) {
47 48
            return new MongoJob(
                $this->container, $this, $job, $this->connectionName, $queue
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
            );
        }
    }

    /**
     * Get the next available job for the queue and mark it as reserved.
     *
     * When using multiple daemon queue listeners to process jobs there
     * is a possibility that multiple processes can end up reading the
     * same record before one has flagged it as reserved.
     *
     * This race condition can result in random jobs being run more then
     * once. To solve this we use findOneAndUpdate to lock the next jobs
     * record while flagging it as reserved at the same time.
     *
     * @param  string|null $queue
Jens Segers's avatar
Jens Segers committed
65 66 67
     *
     * @return \StdClass|null
     */
68
    protected function getNextAvailableJobAndReserve($queue)
Jens Segers's avatar
Jens Segers committed
69
    {
Jens Segers's avatar
Jens Segers committed
70
        $job = $this->database->getCollection($this->table)->findOneAndUpdate(
71
            [
72 73 74
                'queue' => $this->getQueue($queue),
                'reserved' => 0,
                'available_at' => ['$lte' => Carbon::now()->getTimestamp()],
75 76 77
            ],
            [
                '$set' => [
78 79
                    'reserved' => 1,
                    'reserved_at' => Carbon::now()->getTimestamp(),
80 81 82
                ],
            ],
            [
83
                'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
84
                'sort' => ['available_at' => 1],
85 86 87
            ]
        );

Fady Khalife's avatar
Fady Khalife committed
88
        if ($job) {
Jens Segers's avatar
Jens Segers committed
89 90 91
            $job->id = $job->_id;
        }

92
        return $job;
Jens Segers's avatar
Jens Segers committed
93 94 95 96 97
    }

    /**
     * Release the jobs that have been reserved for too long.
     *
98
     * @param  string $queue
Jens Segers's avatar
Jens Segers committed
99 100 101 102
     * @return void
     */
    protected function releaseJobsThatHaveBeenReservedTooLong($queue)
    {
103
        $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();
Pooya Parsa's avatar
Pooya Parsa committed
104
        $now = time();
Jens Segers's avatar
Jens Segers committed
105 106

        $reserved = $this->database->collection($this->table)
107
            ->where('queue', $this->getQueue($queue))
Pooya Parsa's avatar
Pooya Parsa committed
108 109 110 111 112 113 114 115 116 117
            ->where(function ($query) use ($expiration, $now) {
                // Check for available jobs
                $query->where(function ($query) use ($now) {
                    $query->whereNull('reserved_at');
                    $query->where('available_at', '<=', $now);
                });

                // Check for jobs that are reserved but have expired
                $query->orWhere('reserved_at', '<=', $expiration);
            })->get();
Jens Segers's avatar
Jens Segers committed
118 119 120 121 122 123

        foreach ($reserved as $job) {
            $attempts = $job['attempts'] + 1;
            $this->releaseJob($job['_id'], $attempts);
        }
    }
124

125 126 127 128
    /**
     * Release the given job ID from reservation.
     *
     * @param  string $id
Jens Segers's avatar
Jens Segers committed
129
     * @param  int $attempts
130 131 132 133 134
     * @return void
     */
    protected function releaseJob($id, $attempts)
    {
        $this->database->table($this->table)->where('_id', $id)->update([
135
            'reserved' => 0,
136
            'reserved_at' => null,
137
            'attempts' => $attempts,
138 139 140 141
        ]);
    }

    /**
142
     * @inheritdoc
143 144 145
     */
    public function deleteReserved($queue, $id)
    {
Jens Segers's avatar
Jens Segers committed
146
        $this->database->collection($this->table)->where('_id', $id)->delete();
147
    }
Jens Segers's avatar
Jens Segers committed
148
}