marko/pubsub
Real-time publish/subscribe messaging contracts: type-hint against a stable interface and swap drivers without changing application code.
PubSub defines the PublisherInterface and SubscriberInterface contracts and the value objects they operate on. It has no concrete implementation, so install a driver package to get a working pub/sub system. Your modules type-hint against the interfaces here and stay decoupled from any particular transport.
This package defines contracts only. Install a driver for implementation:
- marko/pubsub-redis: Redis-backed pub/sub
- marko/pubsub-pgsql: PostgreSQL-backed pub/sub
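The decoupling described above can be sketched without installing a driver. The example below is a minimal illustration, not the package's code: `Message` and `PublisherInterface` are local stand-ins that mirror the signatures listed in the API Reference, while `InMemoryPublisher`, `NullPublisher`, and `OrderNotifier` are hypothetical names invented for this example.

```php
<?php

declare(strict_types=1);

// Local stand-ins mirroring the documented contracts. In a real application
// these come from Marko\PubSub and are not redeclared.
final class Message
{
    public function __construct(
        public readonly string $channel,
        public readonly string $payload,
        public readonly ?string $pattern = null,
    ) {}
}

interface PublisherInterface
{
    public function publish(string $channel, Message $message): void;
}

// Hypothetical driver 1: keeps published messages in memory.
final class InMemoryPublisher implements PublisherInterface
{
    /** @var array<string, list<Message>> */
    public array $published = [];

    public function publish(string $channel, Message $message): void
    {
        $this->published[$channel][] = $message;
    }
}

// Hypothetical driver 2: discards every message.
final class NullPublisher implements PublisherInterface
{
    public function publish(string $channel, Message $message): void {}
}

// Application code depends only on the interface, never on a driver class.
final class OrderNotifier
{
    public function __construct(
        private PublisherInterface $publisher,
    ) {}

    public function orderPlaced(int $orderId): void
    {
        $payload = json_encode(['id' => $orderId, 'status' => 'placed'], JSON_THROW_ON_ERROR);

        $this->publisher->publish(
            channel: 'orders',
            message: new Message(channel: 'orders', payload: $payload),
        );
    }
}

$memory = new InMemoryPublisher();
(new OrderNotifier($memory))->orderPlaced(42);
(new OrderNotifier(new NullPublisher()))->orderPlaced(43); // same code, different driver

echo $memory->published['orders'][0]->payload, PHP_EOL; // {"id":42,"status":"placed"}
```

Because `OrderNotifier` only names the interface in its constructor, switching transports is a wiring decision rather than a code change.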
Installation
    composer require marko/pubsub

Install a driver alongside it:
    composer require marko/pubsub-redis
    # or
    composer require marko/pubsub-pgsql

Publishing Messages
Inject PublisherInterface and call publish() with a channel name and a Message:
    use Marko\PubSub\Message;
    use Marko\PubSub\PublisherInterface;

    class OrderService
    {
        public function __construct(
            private PublisherInterface $publisher,
        ) {}

        public function placeOrder(Order $order): void
        {
            // ... persist the order ...

            $this->publisher->publish(
                channel: 'orders',
                message: new Message(
                    channel: 'orders',
                    payload: json_encode(['id' => $order->id, 'status' => 'placed']),
                ),
            );
        }
    }

Subscribing to a Channel
Inject SubscriberInterface, call subscribe(), and iterate the returned Subscription:
    use Marko\PubSub\SubscriberInterface;

    class OrderListener
    {
        public function __construct(
            private SubscriberInterface $subscriber,
        ) {}

        public function listen(): void
        {
            $subscription = $this->subscriber->subscribe('orders');

            foreach ($subscription as $message) {
                $data = json_decode($message->payload, true);
                // handle the message ...
            }
        }
    }

Pattern Subscriptions
Use psubscribe() to subscribe to channels matching a glob pattern. Each received message carries the pattern it matched and the name of the channel it was published on. Note: not all drivers support pattern subscriptions; see the driver documentation.
    use Marko\PubSub\SubscriberInterface;

    $subscription = $this->subscriber->psubscribe('orders.*');

    foreach ($subscription as $message) {
        // $message->pattern === 'orders.*'
        // $message->channel is the matched channel name
    }

Cancelling a Subscription
Call cancel() on the Subscription to unsubscribe:
    use Marko\PubSub\SubscriberInterface;

    $subscription = $this->subscriber->subscribe('orders');

    foreach ($subscription as $message) {
        if ($this->shouldStop($message)) {
            $subscription->cancel();
            break;
        }
    }

API Reference
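Before the individual signatures, a rough sketch of how a subscription could behave may help. This is not any driver's actual implementation: `Message` is a local stand-in for the documented value object, and `ArraySubscription` is a hypothetical class that mirrors the documented `getIterator()` and `cancel()` signatures over a fixed list of messages instead of a live transport.

```php
<?php

declare(strict_types=1);

// Local stand-in for Marko\PubSub\Message, mirroring the documented constructor.
final class Message
{
    public function __construct(
        public readonly string $channel,
        public readonly string $payload,
        public readonly ?string $pattern = null,
    ) {}
}

// Hypothetical subscription backed by an array rather than a real driver.
final class ArraySubscription implements IteratorAggregate
{
    private bool $cancelled = false;

    /** @param list<Message> $messages */
    public function __construct(private array $messages) {}

    public function getIterator(): Generator
    {
        foreach ($this->messages as $message) {
            // Stop producing messages once the consumer has cancelled.
            if ($this->cancelled) {
                return;
            }

            yield $message;
        }
    }

    public function cancel(): void
    {
        $this->cancelled = true;
    }
}

$subscription = new ArraySubscription([
    new Message('orders', '{"id":1}'),
    new Message('orders', '{"id":2}'),
    new Message('orders', '{"id":3}'),
]);

$seen = [];
foreach ($subscription as $message) {
    $seen[] = $message->payload;

    if (count($seen) === 2) {
        $subscription->cancel(); // the generator ends before the third message
    }
}

echo implode(', ', $seen), PHP_EOL; // {"id":1}, {"id":2}
```

A real driver would block on the transport instead of walking an array, but the consumer-facing shape (iterate, then cancel) is the same as in the listings that follow.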
PublisherInterface
    use Marko\PubSub\Message;
    use Marko\PubSub\PublisherInterface;

    public function publish(string $channel, Message $message): void;

SubscriberInterface
    use Marko\PubSub\Subscription;
    use Marko\PubSub\SubscriberInterface;

    public function subscribe(string ...$channels): Subscription;
    public function psubscribe(string ...$patterns): Subscription;

Subscription
Subscription extends IteratorAggregate<int, Message> and yields Message instances:
    use Generator;
    use Marko\PubSub\Message;
    use Marko\PubSub\Subscription;

    public function getIterator(): Generator; // yields Message instances
    public function cancel(): void;

Message
A readonly value object representing a pub/sub message:
    use Marko\PubSub\Message;

    public function __construct(
        public string $channel,
        public string $payload,
        public ?string $pattern = null,
    )

PubSubConfig
Typed access to pub/sub configuration values:
    use Marko\PubSub\PubSubConfig;

    public function driver(): string;
    public function prefix(): string;

Exceptions
| Exception | Description |
|---|---|
| PubSubException | Base exception for all pub/sub errors; extends MarkoException with getContext() and getSuggestion() methods |
PubSubException provides named constructors for common failure scenarios:
    use Marko\PubSub\Exceptions\PubSubException;

    PubSubException::connectionFailed(string $driver, string $reason): self;
    PubSubException::subscriptionFailed(string $channel, string $reason): self;
    PubSubException::publishFailed(string $channel, string $reason): self;
    PubSubException::patternSubscriptionNotSupported(string $driver): self;
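The named constructors above suggest a simple handling pattern: a driver throws a descriptive exception and the caller catches the base type. The sketch below assumes a local stand-in for `PubSubException` that mirrors only the documented `publishFailed()` signature; the message wording is an assumption, the real class extends MarkoException rather than `RuntimeException`, and `publishToUnreachableBroker()` and `tryPublish()` are hypothetical helpers.

```php
<?php

declare(strict_types=1);

// Local stand-in for Marko\PubSub\Exceptions\PubSubException. Only the
// documented publishFailed() signature is mirrored; the message format is an
// assumption made for this example.
class PubSubException extends RuntimeException
{
    public static function publishFailed(string $channel, string $reason): self
    {
        return new self("Failed to publish to channel '{$channel}': {$reason}");
    }
}

// Hypothetical driver call that always fails, to exercise the error path.
function publishToUnreachableBroker(string $channel, string $payload): void
{
    throw PubSubException::publishFailed($channel, 'connection refused');
}

// Hypothetical caller: returns null on success or the error message on failure.
function tryPublish(string $channel, string $payload): ?string
{
    try {
        publishToUnreachableBroker($channel, $payload);

        return null;
    } catch (PubSubException $e) {
        // Catching the base type covers every pub/sub failure scenario.
        return $e->getMessage();
    }
}

$error = tryPublish('orders', '{"id":1}');

echo $error, PHP_EOL; // Failed to publish to channel 'orders': connection refused
```

With the real package, the caught exception would also expose getContext() and getSuggestion(), which are omitted here because their return types are not documented in this section.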