🗃️ Add priority to queue

This commit is contained in:
Dan Jones 2022-08-20 22:04:32 -05:00
commit 61f9cfb009
7 changed files with 171 additions and 2 deletions

View file

@ -0,0 +1,8 @@
<?php
namespace App\Jobs\Concerns;
interface HasPriority
{
public function getPriority(): ?int;
}

View file

@ -9,7 +9,7 @@ use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels; use Illuminate\Queue\SerializesModels;
class SayHello implements ShouldQueue class SayHello implements ShouldQueue, Concerns\HasPriority
{ {
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
@ -33,4 +33,9 @@ class SayHello implements ShouldQueue
echo 'Hello, from ', __CLASS__, PHP_EOL; echo 'Hello, from ', __CLASS__, PHP_EOL;
passthru ('sleep 10; echo Hello, from echo in ' . escapeshellarg(__CLASS__)); passthru ('sleep 10; echo Hello, from echo in ' . escapeshellarg(__CLASS__));
} }
public function getPriority(): ?int
{
return 1;
}
} }

View file

@ -4,6 +4,8 @@ namespace App\Providers;
use App\Services\ProcessInput; use App\Services\ProcessInput;
use Illuminate\Support\ServiceProvider; use Illuminate\Support\ServiceProvider;
use Illuminate\Contracts\Queue\Factory as QueueFactoryContract;
use App\Queue\DatabaseConnector;
class AppServiceProvider extends ServiceProvider class AppServiceProvider extends ServiceProvider
{ {
@ -14,7 +16,8 @@ class AppServiceProvider extends ServiceProvider
*/ */
public function boot() public function boot()
{ {
// $this->app[QueueFactoryContract::class]->extend('database', fn () => new DatabaseConnector($this->app['db']));
} }
/** /**

View file

@ -0,0 +1,19 @@
<?php
namespace App\Queue;
use Illuminate\Queue\Connectors\DatabaseConnector as BaseConnector;
class DatabaseConnector extends BaseConnector
{
public function connect(array $config)
{
return new DatabaseQueue(
$this->connections->connection($config['connection'] ?? null),
$config['table'],
$config['queue'],
$config['retry_after'] ?? 60,
$config['after_commit'] ?? null
);
}
}

132
app/Queue/DatabaseQueue.php Normal file
View file

@ -0,0 +1,132 @@
<?php
namespace App\Queue;
use Illuminate\Queue\DatabaseQueue as BaseQ;
use Illuminate\Queue\Jobs\DatabaseJobRecord;
use App\Jobs\Concerns\HasPriority;
class DatabaseQueue extends BaseQ
{
protected function getPriority($job): ?int
{
if ($job instanceof DatabaseRecord) {
return $job->priority ?? null;
}
if (!($job instanceof HasPriority)) {
return null;
}
return $job->getPriority();
}
public function push($job, $data = '', $queue = null)
{
$this->later(null, $job, $data, $queue);
}
public function pushRaw($payload, $queue = null, array $options = [])
{
$prio = $options['priority'] ?? null;
return $this->pushToDatabase($queue, $payload, priority: $prio);
}
public function release($queue, $job, $delay)
{
$prio = $this->getPriority($job);
return $this->pushToDatabase(
$queue,
$job->payload,
$delay,
$job->attempts,
$prio
);
}
public function later($delay, $job, $data = '', $queue = null)
{
$prio = $this->getPriority($job);
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
$delay,
function ($payload, $queue, $delay) use ($prio) {
return $this->pushToDatabase($queue, $payload, $delay, priority: $prio);
}
);
}
public function bulk($jobs, $data = '', $queue = null)
{
$queue = $this->getQueue($queue);
$availableAt = $this->availableAt();
return $this->database->table($this->table)->insert(collect((array) $jobs)->map(
function ($job) use ($queue, $data, $availableAt) {
$prio = $this->getPriority($job);
return $this->buildDatabaseRecord(
$queue,
$this->createPayload($job, $this->getQueue($queue), $data),
$availableAt,
priority: $prio
);
}
)->all());
}
protected function getNextAvailableJob($queue)
{
$job = $this
->database
->table($this->table)
->lock($this->getLockForPopping())
->where('queue', $this->getQueue($queue))
->where(function ($query) {
$this->isAvailable($query);
$this->isReservedButExpired($query);
})
->orderBy('priority', 'desc')
->orderBy('id', 'asc')
->first();
return $job ? new DatabaseJobRecord((object) $job) : null;
}
protected function pushToDatabase(
$queue,
$payload,
$delay = 0,
$attempts = 0,
int $priority = null
) {
return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord(
$this->getQueue($queue),
$payload,
$this->availableAt($delay ?? 0),
$attempts,
$priority
));
}
protected function buildDatabaseRecord(
$queue,
$payload,
$availableAt,
$attempts = 0,
int $priority = null
) {
$job = parent::buildDatabaseRecord($queue, $payload, $availableAt, $attempts);
if (!is_null($priority)) {
$job['priority'] = $priority;
}
return $job;
}
}

View file

@ -85,6 +85,7 @@ return [
*/ */
'failed' => [ 'failed' => [
'driver' => 'database-uuids',
'database' => env('DB_CONNECTION', 'sqlite'), 'database' => env('DB_CONNECTION', 'sqlite'),
'table' => 'failed_jobs', 'table' => 'failed_jobs',
], ],

View file

@ -18,6 +18,7 @@ return new class extends Migration
$table->string('queue')->index(); $table->string('queue')->index();
$table->longText('payload'); $table->longText('payload');
$table->unsignedTinyInteger('attempts'); $table->unsignedTinyInteger('attempts');
$table->smallInteger('priority')->default(3);
$table->unsignedInteger('reserved_at')->nullable(); $table->unsignedInteger('reserved_at')->nullable();
$table->unsignedInteger('available_at'); $table->unsignedInteger('available_at');
$table->unsignedInteger('created_at'); $table->unsignedInteger('created_at');