marko/queue-database

Database queue driver: stores and processes jobs in SQL tables with transaction-safe polling and failed job persistence. Jobs are stored in a jobs table and polled by the worker process. The driver uses row-level locking (via transactions when available) to prevent two workers from processing the same job. Failed jobs are persisted to a failed_jobs table for later inspection and retry. The package includes migrations for both tables.

Implements QueueInterface from marko/queue and requires marko/database for the database connection.

composer require marko/queue-database

Register the database queue in your module bindings:

module.php
use Marko\Queue\QueueInterface;
use Marko\Queue\Database\DatabaseQueue;
use Marko\Queue\FailedJobRepositoryInterface;
use Marko\Queue\Database\DatabaseFailedJobRepository;

return [
    'bindings' => [
        QueueInterface::class => DatabaseQueue::class,
        FailedJobRepositoryInterface::class => DatabaseFailedJobRepository::class,
    ],
];

Run the included migrations to create the required tables:

marko migrate

This creates:

  • jobs: stores pending and reserved jobs
  • failed_jobs: stores jobs that exceeded max attempts

Use QueueInterface as usual; the database driver handles persistence:

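use Marko\Queue\QueueInterface;

// OrderService is an illustrative class name; ProcessOrder and SendFollowUp
// are job classes defined by your application.
class OrderService
{
    public function __construct(
        private readonly QueueInterface $queue,
    ) {}

    public function enqueue(int $orderId): void
    {
        // Queue for immediate processing
        $this->queue->push(new ProcessOrder($orderId));

        // Delay by 5 minutes (the delay is given in seconds)
        $this->queue->later(
            300,
            new SendFollowUp($orderId),
        );
    }
}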

Process jobs with the worker:

marko queue:work
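
To make the retry and failure flow more concrete, here is a minimal sketch of what a worker loop does conceptually: take a job, run it, and either drop it, put it back, or record it as failed once it runs out of attempts. It deliberately does not use the marko classes. The runWorker function, the array shape of a job, and the exact attempt threshold are assumptions made for illustration only.

```php
<?php
declare(strict_types=1);

// Illustrative stand-in for a worker loop (hypothetical, not the package's worker).
// Each entry in $pending is ['id' => string, 'attempts' => int, 'run' => callable].
function runWorker(array $pending, int $maxAttempts): array
{
    $failed = [];

    while ($pending !== []) {
        // Take the next job, analogous to pop()
        $job = array_shift($pending);
        $job['attempts']++;

        try {
            // On success the job is simply dropped, analogous to delete()
            ($job['run'])();
        } catch (Throwable $e) {
            if ($job['attempts'] >= $maxAttempts) {
                // Out of attempts: record it, analogous to a failed_jobs row
                $failed[] = ['id' => $job['id'], 'error' => $e->getMessage()];
            } else {
                // Put it back for another try, analogous to release()
                $pending[] = $job;
            }
        }
    }

    return $failed;
}

$calls = 0;
$failed = runWorker([
    ['id' => 'a', 'attempts' => 0, 'run' => function () use (&$calls): void { $calls++; }],
    ['id' => 'b', 'attempts' => 0, 'run' => function (): void { throw new RuntimeException('boom'); }],
], 3);
```

In this run, job a succeeds on its first attempt, while job b is retried until its third attempt fails and then ends up in $failed.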

DatabaseQueue implements QueueInterface. It accepts a ConnectionInterface connection, an optional table name (defaults to jobs), and an optional default queue name (defaults to default).

| Method | Description |
|---|---|
| push(JobInterface $job, ?string $queue = null): string | Insert a job for immediate processing. Returns the job ID. |
| later(int $delay, JobInterface $job, ?string $queue = null): string | Insert a job with a delay in seconds. Returns the job ID. |
| pop(?string $queue = null): ?JobInterface | Retrieve and reserve the next available job, or null if empty. Uses transactions when the connection supports TransactionInterface. |
| size(?string $queue = null): int | Count pending (unreserved, available) jobs. |
| clear(?string $queue = null): int | Delete all jobs in a queue. Returns the number of deleted rows. |
| delete(string $jobId): bool | Delete a specific job by ID. |
| release(string $jobId, int $delay = 0): bool | Release a reserved job back to the queue with an optional delay. |
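
As a rough illustration of how a transactional reserve such as pop() can avoid handing the same job to two workers, the sketch below uses plain PDO against an in-memory SQLite database. The schema, column names, and helper functions (createJobsTable, pushJob, reserveNextJob, pendingCount) are assumptions made for this example; they are not the package's actual migration or internals.

```php
<?php
declare(strict_types=1);

// Hypothetical schema for illustration; the package's real migration may differ.
function createJobsTable(PDO $pdo): void
{
    $pdo->exec(
        'CREATE TABLE jobs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            queue TEXT NOT NULL,
            payload TEXT NOT NULL,
            available_at INTEGER NOT NULL,
            reserved_at INTEGER NULL
        )'
    );
}

// Insert a job that becomes available after $delay seconds (0 means immediately).
function pushJob(PDO $pdo, string $payload, int $delay = 0, string $queue = 'default'): string
{
    $stmt = $pdo->prepare('INSERT INTO jobs (queue, payload, available_at) VALUES (?, ?, ?)');
    $stmt->execute([$queue, $payload, time() + $delay]);

    return (string) $pdo->lastInsertId();
}

// Reserve the next available job inside a transaction, or return null if there is none.
function reserveNextJob(PDO $pdo, string $queue = 'default'): ?array
{
    $pdo->beginTransaction();
    try {
        $select = $pdo->prepare(
            'SELECT id, payload FROM jobs
             WHERE queue = ? AND reserved_at IS NULL AND available_at <= ?
             ORDER BY id LIMIT 1'
        );
        $select->execute([$queue, time()]);
        $job = $select->fetch(PDO::FETCH_ASSOC);
        $select->closeCursor();

        if ($job === false) {
            $pdo->commit();
            return null;
        }

        // The reserved_at IS NULL guard makes this claim a no-op if another worker got there first.
        $update = $pdo->prepare('UPDATE jobs SET reserved_at = ? WHERE id = ? AND reserved_at IS NULL');
        $update->execute([time(), $job['id']]);

        if ($update->rowCount() !== 1) {
            $pdo->rollBack();
            return null;
        }

        $pdo->commit();
        return $job;
    } catch (Throwable $e) {
        if ($pdo->inTransaction()) {
            $pdo->rollBack();
        }
        throw $e;
    }
}

// Count jobs that are unreserved and already available, mirroring the size() description.
function pendingCount(PDO $pdo, string $queue = 'default'): int
{
    $stmt = $pdo->prepare(
        'SELECT COUNT(*) FROM jobs WHERE queue = ? AND reserved_at IS NULL AND available_at <= ?'
    );
    $stmt->execute([$queue, time()]);

    return (int) $stmt->fetchColumn();
}

$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
createJobsTable($pdo);
pushJob($pdo, 'process-order');        // available immediately
pushJob($pdo, 'send-follow-up', 300);  // available in 5 minutes
$first = reserveNextJob($pdo);         // claims the immediate job
$second = reserveNextJob($pdo);        // null: the remaining job is still delayed
```

On databases that support it, a real driver would more likely lock the selected row (for example with SELECT ... FOR UPDATE) rather than rely only on the guarded UPDATE; SQLite has no such clause, so the sketch uses the guard alone.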

DatabaseFailedJobRepository implements FailedJobRepositoryInterface. It accepts a ConnectionInterface connection.

| Method | Description |
|---|---|
| store(FailedJob $failedJob): void | Persist a failed job record. |
| all(): array | Retrieve all failed jobs, most recent first. |
| find(string $id): ?FailedJob | Find a failed job by ID, or null if not found. |
| delete(string $id): bool | Delete a single failed job record. |
| clear(): int | Delete all failed job records. Returns the number of deleted rows. |
| count(): int | Count total failed jobs. |