From 61f9cfb009fc957348c166dc9ddd5ae0177a6e46 Mon Sep 17 00:00:00 2001 From: Dan Jones Date: Sat, 20 Aug 2022 22:04:32 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=97=83=EF=B8=8F=20Add=20priority=20to=20q?= =?UTF-8?q?ueue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/Jobs/Concerns/HasPriority.php | 8 ++ app/Jobs/SayHello.php | 7 +- app/Providers/AppServiceProvider.php | 5 +- app/Queue/DatabaseConnector.php | 19 +++ app/Queue/DatabaseQueue.php | 132 ++++++++++++++++++ config/queue.php | 1 + .../2022_02_26_154759_create_jobs_table.php | 1 + 7 files changed, 171 insertions(+), 2 deletions(-) create mode 100644 app/Jobs/Concerns/HasPriority.php create mode 100644 app/Queue/DatabaseConnector.php create mode 100644 app/Queue/DatabaseQueue.php diff --git a/app/Jobs/Concerns/HasPriority.php b/app/Jobs/Concerns/HasPriority.php new file mode 100644 index 0000000..5c8f3ba --- /dev/null +++ b/app/Jobs/Concerns/HasPriority.php @@ -0,0 +1,8 @@ +app[QueueFactoryContract::class]->extend('database', fn () => new DatabaseConnector($this->app['db'])); + } /** diff --git a/app/Queue/DatabaseConnector.php b/app/Queue/DatabaseConnector.php new file mode 100644 index 0000000..5d1bb93 --- /dev/null +++ b/app/Queue/DatabaseConnector.php @@ -0,0 +1,19 @@ +connections->connection($config['connection'] ?? null), + $config['table'], + $config['queue'], + $config['retry_after'] ?? 60, + $config['after_commit'] ?? null + ); + } +} diff --git a/app/Queue/DatabaseQueue.php b/app/Queue/DatabaseQueue.php new file mode 100644 index 0000000..c0685a3 --- /dev/null +++ b/app/Queue/DatabaseQueue.php @@ -0,0 +1,132 @@ +priority ?? null; + } + + if (!($job instanceof HasPriority)) { + return null; + } + + return $job->getPriority(); + } + + public function push($job, $data = '', $queue = null) + { + $this->later(null, $job, $data, $queue); + } + + public function pushRaw($payload, $queue = null, array $options = []) + { + $prio = $options['priority'] ?? null; + + return $this->pushToDatabase($queue, $payload, priority: $prio); + } + + public function release($queue, $job, $delay) + { + $prio = $this->getPriority($job); + + return $this->pushToDatabase( + $queue, + $job->payload, + $delay, + $job->attempts, + $prio + ); + } + + public function later($delay, $job, $data = '', $queue = null) + { + $prio = $this->getPriority($job); + + return $this->enqueueUsing( + $job, + $this->createPayload($job, $this->getQueue($queue), $data), + $queue, + $delay, + function ($payload, $queue, $delay) use ($prio) { + return $this->pushToDatabase($queue, $payload, $delay, priority: $prio); + } + ); + } + + public function bulk($jobs, $data = '', $queue = null) + { + $queue = $this->getQueue($queue); + + $availableAt = $this->availableAt(); + + return $this->database->table($this->table)->insert(collect((array) $jobs)->map( + function ($job) use ($queue, $data, $availableAt) { + $prio = $this->getPriority($job); + + return $this->buildDatabaseRecord( + $queue, + $this->createPayload($job, $this->getQueue($queue), $data), + $availableAt, + priority: $prio + ); + } + )->all()); + } + + protected function getNextAvailableJob($queue) + { + $job = $this + ->database + ->table($this->table) + ->lock($this->getLockForPopping()) + ->where('queue', $this->getQueue($queue)) + ->where(function ($query) { + $this->isAvailable($query); + $this->isReservedButExpired($query); + }) + ->orderBy('priority', 'desc') + ->orderBy('id', 'asc') + ->first(); + + return $job ? new DatabaseJobRecord((object) $job) : null; + } + + protected function pushToDatabase( + $queue, + $payload, + $delay = 0, + $attempts = 0, + int $priority = null + ) { + return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord( + $this->getQueue($queue), + $payload, + $this->availableAt($delay ?? 0), + $attempts, + $priority + )); + } + + protected function buildDatabaseRecord( + $queue, + $payload, + $availableAt, + $attempts = 0, + int $priority = null + ) { + $job = parent::buildDatabaseRecord($queue, $payload, $availableAt, $attempts); + if (!is_null($priority)) { + $job['priority'] = $priority; + } + + return $job; + } +} diff --git a/config/queue.php b/config/queue.php index 024c486..7e6b4c5 100644 --- a/config/queue.php +++ b/config/queue.php @@ -85,6 +85,7 @@ return [ */ 'failed' => [ + 'driver' => 'database-uuids', 'database' => env('DB_CONNECTION', 'sqlite'), 'table' => 'failed_jobs', ], diff --git a/database/migrations/2022_02_26_154759_create_jobs_table.php b/database/migrations/2022_02_26_154759_create_jobs_table.php index a786a89..286124d 100644 --- a/database/migrations/2022_02_26_154759_create_jobs_table.php +++ b/database/migrations/2022_02_26_154759_create_jobs_table.php @@ -18,6 +18,7 @@ return new class extends Migration $table->string('queue')->index(); $table->longText('payload'); $table->unsignedTinyInteger('attempts'); + $table->smallInteger('priority')->default(3); $table->unsignedInteger('reserved_at')->nullable(); $table->unsignedInteger('available_at'); $table->unsignedInteger('created_at');