marko/queue-rabbitmq

RabbitMQ queue driver for marko/queue. Jobs are published as persistent AMQP messages through configurable exchanges (direct, fanout, topic, or headers). Delayed jobs use dead-letter exchanges for timed redelivery. Failed jobs are stored in a dedicated RabbitMQ queue for inspection and retry. Requires a running RabbitMQ server and the php-amqplib/php-amqplib package.

Implements QueueInterface and FailedJobRepositoryInterface from marko/queue.

composer require marko/queue-rabbitmq

This automatically installs marko/queue and php-amqplib/php-amqplib.

Register the RabbitMQ queue in your module bindings:

module.php
use Marko\Queue\QueueInterface;
use Marko\Queue\Rabbitmq\RabbitmqQueue;
use Marko\Queue\FailedJobRepositoryInterface;
use Marko\Queue\Rabbitmq\RabbitmqFailedJobRepository;

return [
    'bindings' => [
        QueueInterface::class => RabbitmqQueue::class,
        FailedJobRepositoryInterface::class => RabbitmqFailedJobRepository::class,
    ],
];

RabbitmqConnection manages the AMQP connection. It lazily connects on the first call to channel():

use Marko\Queue\Rabbitmq\RabbitmqConnection;

$rabbitmqConnection = new RabbitmqConnection(
    host: 'localhost',
    port: 5672,
    user: 'guest',
    password: 'guest',
    vhost: '/',
);

TLS is supported via the tlsOptions parameter:

use Marko\Queue\Rabbitmq\RabbitmqConnection;

$rabbitmqConnection = new RabbitmqConnection(
    host: 'rabbitmq.example.com',
    port: 5671,
    user: 'app',
    password: 'secret',
    tlsOptions: [
        'verify_peer' => true,
        'cafile' => '/path/to/ca.pem',
    ],
);

Set up the exchange type and behavior:

use Marko\Queue\Rabbitmq\Exchange\ExchangeConfig;
use Marko\Queue\Rabbitmq\Exchange\ExchangeType;

$exchangeConfig = new ExchangeConfig(
    name: 'marko_jobs',
    type: ExchangeType::Direct,
);

Available exchange types: Direct, Fanout, Topic, Headers.

For Direct and Topic exchanges, the queue name is used as the routing key. Fanout and Headers exchanges use an empty routing key.
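
The routing-key rule above can be sketched as a small function. This is an illustration only: routingKeyFor() is a hypothetical helper, not part of marko/queue-rabbitmq, and the enum is a local stand-in that mirrors the package's ExchangeType so the snippet runs on its own.

```php
<?php

declare(strict_types=1);

// Local stand-in mirroring the package's ExchangeType enum (assumption:
// declared here only so the sketch is self-contained).
enum ExchangeType: string
{
    case Direct = 'direct';
    case Fanout = 'fanout';
    case Topic = 'topic';
    case Headers = 'headers';
}

// Hypothetical helper: picks the routing key for a publish.
// Direct and Topic exchanges route by queue name; Fanout and Headers
// exchanges ignore the routing key, so an empty string is used.
function routingKeyFor(ExchangeType $type, string $queue): string
{
    return match ($type) {
        ExchangeType::Direct, ExchangeType::Topic => $queue,
        ExchangeType::Fanout, ExchangeType::Headers => '',
    };
}

echo routingKeyFor(ExchangeType::Topic, 'emails'), PHP_EOL; // prints "emails"
```

With a Fanout exchange the same call returns an empty string, so every queue bound to the exchange receives the message regardless of its name.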

Use QueueInterface as usual; the RabbitMQ driver handles persistent message publishing and dead-letter routing transparently:

use Marko\Queue\QueueInterface;

class OrderProcessor
{
    public function __construct(
        private readonly QueueInterface $queue,
    ) {}

    // ProcessPayment and SendReceipt stand for your application's own job classes.
    public function dispatch(int $orderId): void
    {
        $this->queue->push(new ProcessPayment($orderId));

        // Delay by 30 seconds using a dead-letter exchange
        $this->queue->later(
            30,
            new SendReceipt($orderId),
        );
    }
}

| Method | Description |
| --- | --- |
| `push(JobInterface $job, ?string $queue = null): string` | Publish a job as a persistent AMQP message, returning the job ID |
| `later(int $delay, JobInterface $job, ?string $queue = null): string` | Publish a delayed job via a dead-letter exchange; `$delay` is in seconds |
| `pop(?string $queue = null): ?JobInterface` | Consume the next message from the queue, or null if empty |
| `size(?string $queue = null): int` | Return the number of messages in the queue |
| `clear(?string $queue = null): int` | Purge all messages from the queue, returning the count removed |
| `delete(string $jobId): bool` | Acknowledge a consumed message by job ID |
| `release(string $jobId, int $delay = 0): bool` | Reject and requeue a message, with an optional delay via a dead-letter exchange |
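
Delayed delivery through a dead-letter exchange is commonly built on a holding queue that has no consumers: messages wait there until their TTL expires, and RabbitMQ then dead-letters them to the real exchange. The package's internal queue names and arguments are not documented here, so the following is only a sketch of that general technique. delayQueueArguments() is a hypothetical helper; the three `x-` keys are standard RabbitMQ queue arguments, and `x-message-ttl` is expressed in milliseconds.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of marko/queue-rabbitmq): builds the
 * arguments for a holding queue whose expired messages are dead-lettered
 * to the target exchange under the given routing key.
 *
 * @return array<string, int|string>
 */
function delayQueueArguments(int $delaySeconds, string $exchange, string $routingKey): array
{
    if ($delaySeconds < 0) {
        throw new InvalidArgumentException('Delay must not be negative.');
    }

    return [
        // RabbitMQ expects the TTL in milliseconds; later() takes seconds.
        'x-message-ttl' => $delaySeconds * 1000,
        // Where expired messages are republished.
        'x-dead-letter-exchange' => $exchange,
        // Routing key used when the expired message is republished.
        'x-dead-letter-routing-key' => $routingKey,
    ];
}

// Assumed names: the 'marko_jobs' exchange and 'default' queue used elsewhere on this page.
$arguments = delayQueueArguments(30, 'marko_jobs', 'default');
```

With php-amqplib, an array like this would typically be wrapped in a `PhpAmqpLib\Wire\AMQPTable` and passed as the arguments of `queue_declare()` when the holding queue is declared.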

Constructor:

use Marko\Queue\Rabbitmq\RabbitmqQueue;
use Marko\Queue\Rabbitmq\RabbitmqConnection;
use Marko\Queue\Rabbitmq\Exchange\ExchangeConfig;

$rabbitmqQueue = new RabbitmqQueue(
    connection: $rabbitmqConnection,
    exchangeConfig: $exchangeConfig,
    defaultQueue: 'default',
);

RabbitmqConnection exposes the following methods:

| Method | Description |
| --- | --- |
| `__construct(string $host, int $port, string $user, string $password, string $vhost, ?array $tlsOptions)` | Create a connection. Defaults: host localhost, port 5672, user guest, password guest, vhost /, no TLS |
| `channel(): AMQPChannel` | Get the AMQP channel, connecting lazily on the first call |
| `disconnect(): void` | Disconnect and release the channel and connection |
| `isConnected(): bool` | Check whether the connection is currently active |
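
The lazy-connect behaviour of channel() follows a familiar pattern: nothing is opened until the first request, and later requests reuse the same handle. The sketch below shows that pattern in isolation. LazyResource and its factory closure are hypothetical and are not the package's RabbitmqConnection; a plain stdClass stands in for the AMQP channel.

```php
<?php

declare(strict_types=1);

// Hypothetical illustration of lazy connection handling.
final class LazyResource
{
    private ?object $resource = null;

    /** @param Closure(): object $factory Opens the underlying resource on demand. */
    public function __construct(private readonly Closure $factory) {}

    // Opens the resource on the first call and reuses it afterwards.
    public function get(): object
    {
        return $this->resource ??= ($this->factory)();
    }

    public function isConnected(): bool
    {
        return $this->resource !== null;
    }

    // Drops the handle; the next get() call opens a fresh one.
    public function disconnect(): void
    {
        $this->resource = null;
    }
}

$opened = 0;
$lazy = new LazyResource(function () use (&$opened): object {
    $opened++; // counts how many times the factory actually ran
    return new stdClass();
});

$first = $lazy->get();
$second = $lazy->get(); // same object; the factory is not called again
```

The practical consequence is that constructing the connection object is cheap and cannot fail on network errors; any connection failure surfaces on the first call that needs the channel.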

RabbitmqFailedJobRepository stores failed jobs in a dedicated failed_jobs RabbitMQ queue as JSON messages with persistent delivery.

| Method | Description |
| --- | --- |
| `store(FailedJob $failedJob): void` | Publish a failed job record to the failed jobs queue |
| `all(): array` | Retrieve all failed jobs without removing them |
| `find(string $id): ?FailedJob` | Find a specific failed job by ID |
| `delete(string $id): bool` | Acknowledge and remove a failed job by ID |
| `clear(): int` | Purge all failed jobs, returning the count removed |
| `count(): int` | Return the number of failed jobs |

ExchangeConfig and ExchangeType are declared in the Marko\Queue\Rabbitmq\Exchange namespace:

namespace Marko\Queue\Rabbitmq\Exchange;

readonly class ExchangeConfig
{
    public function __construct(
        public string $name,
        public ExchangeType $type,
        public bool $durable = true,
        public bool $autoDelete = false,
        /** @var array<string, mixed> */
        public array $arguments = [],
    ) {}
}

enum ExchangeType: string
{
    case Direct = 'direct';
    case Fanout = 'fanout';
    case Topic = 'topic';
    case Headers = 'headers';
}