marko/pubsub-redis

Non-blocking Redis pub/sub for Marko: publish and subscribe over Redis with pattern support, powered by amphp for async I/O.

The package provides RedisPublisher and RedisSubscriber, which implement the PublisherInterface and SubscriberInterface contracts from marko/pubsub. It uses amphp/redis for non-blocking Redis connections, so the subscriber loop waits for messages without blocking the event loop. Pattern subscriptions (glob-style channel matching) are supported.

Installing this package binds PublisherInterface and SubscriberInterface to the Redis driver automatically; no manual wiring is required.

composer require marko/pubsub-redis

This automatically installs marko/pubsub and marko/amphp.

Set environment variables or publish the config file:

PUBSUB_REDIS_HOST=127.0.0.1
PUBSUB_REDIS_PORT=6379
PUBSUB_REDIS_PASSWORD=
PUBSUB_REDIS_DATABASE=0
PUBSUB_DRIVER=redis
PUBSUB_PREFIX=marko:
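
PUBSUB_PREFIX namespaces channel names on the Redis side. The sketch below illustrates the idea, assuming the prefix is simply prepended when publishing and stripped again when a message is received. The helpers prefixChannel() and stripPrefix() are hypothetical names used for illustration, not part of the package:

```php
<?php
declare(strict_types=1);

// Hypothetical helpers illustrating channel prefixing; not the package's API.
function prefixChannel(string $prefix, string $channel): string
{
    return $prefix . $channel;
}

function stripPrefix(string $prefix, string $channel): string
{
    // Only strip when the channel actually starts with the prefix.
    return $prefix !== '' && str_starts_with($channel, $prefix)
        ? substr($channel, strlen($prefix))
        : $channel;
}

// Same value as PUBSUB_PREFIX in the example configuration above.
$prefix = 'marko:';

$redisChannel = prefixChannel($prefix, 'user.42');   // name used on the Redis side
$appChannel = stripPrefix($prefix, $redisChannel);   // name seen by application code
```

Under these assumptions, application code keeps working with short names such as user.42 while several applications can share one Redis instance without channel collisions.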

Inject PublisherInterface wherever you need to publish; the Redis driver is used automatically:

use Marko\PubSub\Message;
use Marko\PubSub\PublisherInterface;

class NotificationService
{
    public function __construct(
        private PublisherInterface $publisher,
    ) {}

    public function notify(int $userId, string $text): void
    {
        $this->publisher->publish(
            channel: "user.$userId",
            message: new Message(
                channel: "user.$userId",
                payload: json_encode(['text' => $text]),
            ),
        );
    }
}
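
Because services depend on PublisherInterface rather than on the Redis driver, they can be exercised in tests without a Redis server. The following is a minimal sketch of a recording test double. The Message class and PublisherInterface declared here are local stand-ins that mirror the shapes used in this document, and InMemoryPublisher is a hypothetical name, not something the package ships:

```php
<?php
declare(strict_types=1);

// Stand-ins for Marko\PubSub\Message and Marko\PubSub\PublisherInterface so the
// sketch runs on its own; in an application, import the real types instead.
final class Message
{
    public function __construct(
        public readonly string $channel,
        public readonly string $payload,
    ) {}
}

interface PublisherInterface
{
    public function publish(string $channel, Message $message): void;
}

// Hypothetical test double: records messages instead of sending them to Redis.
final class InMemoryPublisher implements PublisherInterface
{
    /** @var list<array{channel: string, message: Message}> */
    public array $published = [];

    public function publish(string $channel, Message $message): void
    {
        $this->published[] = ['channel' => $channel, 'message' => $message];
    }
}

$publisher = new InMemoryPublisher();
$publisher->publish(
    channel: 'user.42',
    message: new Message(
        channel: 'user.42',
        // JSON_THROW_ON_ERROR guarantees a string result or an exception.
        payload: json_encode(['text' => 'hello'], JSON_THROW_ON_ERROR),
    ),
);

$recorded = $publisher->published[0];
```

A test can then assert on $publisher->published to check which channels and payloads a service produced.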

Inject SubscriberInterface and iterate the Subscription. Run the subscriber loop via the pubsub:listen command:

use Marko\PubSub\SubscriberInterface;

class NotificationListener
{
    public function __construct(
        private SubscriberInterface $subscriber,
    ) {}

    public function listen(int $userId): void
    {
        $subscription = $this->subscriber->subscribe("user.$userId");

        foreach ($subscription as $message) {
            $data = json_decode($message->payload, true);
            // handle notification ...
        }
    }
}
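
A long-running listener will usually want to survive a malformed payload rather than act on a null result from json_decode(). One possible approach is sketched below. The helper name decodePayload() is hypothetical, and the sketch assumes payloads are JSON objects or arrays, as in the examples above:

```php
<?php
declare(strict_types=1);

// Hypothetical helper: decode a JSON payload into an array, or return null
// when the payload is malformed or is not a JSON object/array.
function decodePayload(string $payload): ?array
{
    try {
        $data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException) {
        return null;
    }

    return is_array($data) ? $data : null;
}

$valid = decodePayload('{"text":"hi"}');   // decoded array
$broken = decodePayload('{not json');      // null: malformed JSON
$scalar = decodePayload('42');             // null: valid JSON, but not an object/array
```

Inside the foreach loop, a null result can be logged and skipped so that one bad message does not end the subscription.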

Start the listener process:

marko pubsub:listen

Use psubscribe() to receive messages from all channels matching a glob pattern:

$subscription = $this->subscriber->psubscribe('user.*');

foreach ($subscription as $message) {
    // $message->pattern === 'user.*'
    // $message->channel is the matched channel, e.g. 'user.42'
    $data = json_decode($message->payload, true);
}
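
To get a feel for which channels a glob pattern covers, the matching can be approximated locally. The sketch below uses PHP's fnmatch() purely as an illustration; Redis performs the real matching on the server, and its glob rules may differ from fnmatch() in edge cases such as character-class negation:

```php
<?php
declare(strict_types=1);

// Approximates Redis glob matching with PHP's fnmatch(). Illustration only:
// with psubscribe(), the matching happens on the Redis server.
$pattern = 'user.*';
$channels = ['user.42', 'user.42.alerts', 'order.7', 'users'];

$matched = array_values(array_filter(
    $channels,
    static fn (string $channel): bool => fnmatch($pattern, $channel),
));
// $matched holds 'user.42' and 'user.42.alerts'; '*' also spans further dots,
// while 'users' fails because the pattern requires a literal '.' after 'user'.
```

Note that the dot has no special meaning in glob patterns, so user.* is not limited to a single segment.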

Combine with marko/sse to stream pub/sub messages to the browser:

use Marko\PubSub\SubscriberInterface;
use Marko\Routing\Route\Get;
use Marko\Sse\SseStream;
use Marko\Sse\StreamingResponse;

// Controller method; assumes SubscriberInterface is injected as $this->subscriber.
#[Get('/users/{userId}/notifications')]
public function stream(int $userId): StreamingResponse
{
    $subscription = $this->subscriber->subscribe("user.$userId");

    $stream = new SseStream(
        subscription: $subscription,
        timeout: 300,
    );

    return new StreamingResponse($stream);
}
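
For orientation, this is roughly what a pub/sub payload looks like once framed as a Server-Sent Event. The sketch only illustrates the standard SSE wire format; formatSseEvent() is a hypothetical helper and says nothing about how SseStream is implemented, which presumably handles the framing itself:

```php
<?php
declare(strict_types=1);

// Hypothetical helper showing the Server-Sent Events wire format.
function formatSseEvent(string $data, ?string $event = null): string
{
    $frame = $event !== null ? "event: $event\n" : '';

    // Each line of the payload gets its own "data:" field.
    foreach (preg_split('/\r\n|\r|\n/', $data) as $line) {
        $frame .= "data: $line\n";
    }

    // A blank line terminates the event.
    return $frame . "\n";
}

$frame = formatSseEvent('{"text":"hi"}', 'notification');
```

In the browser, an EventSource listening for the notification event would receive the JSON string as event.data.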

Override the Redis connection by extending RedisPubSubConnection via a Preference:

use Amp\Redis\RedisClient;
use Marko\PubSub\Redis\RedisPubSubConnection;

class TlsRedisPubSubConnection extends RedisPubSubConnection
{
    protected function createClient(): RedisClient
    {
        return \Amp\Redis\createRedisClient("rediss://{$this->host}:{$this->port}");
    }
}

Register it in your module:

module.php
use Marko\PubSub\Redis\RedisPubSubConnection;
// TlsRedisPubSubConnection must also be imported from your module's namespace.

return [
    'bindings' => [
        RedisPubSubConnection::class => TlsRedisPubSubConnection::class,
    ],
];

RedisPublisher

| Method | Description |
| --- | --- |
| __construct(RedisPubSubConnection $redisPubSubConnection, PubSubConfig $pubSubConfig) | Create a publisher with a Redis connection and pub/sub configuration |
| publish(string $channel, Message $message): void | Publish a message to the given channel |

RedisSubscriber

| Method | Description |
| --- | --- |
| __construct(RedisPubSubConnection $redisPubSubConnection, PubSubConfig $pubSubConfig) | Create a subscriber with a Redis connection and pub/sub configuration |
| subscribe(string ...$channels): Subscription | Subscribe to one or more channels, returning an iterable Subscription |
| psubscribe(string ...$patterns): Subscription | Subscribe to channels matching glob patterns, returning an iterable Subscription |

Subscription (returned by subscribe() and psubscribe())

| Method | Description |
| --- | --- |
| __construct(AmphpRedisSubscription $amphpSubscription, string $prefix, ?string $channel, ?string $pattern) | Wrap an amphp subscription with prefix stripping and message conversion |
| getIterator(): Generator | Yield Message instances; for pattern subscriptions, each includes the pattern and the resolved channel |
| cancel(): void | Unsubscribe and stop iteration |
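
The interplay between getIterator() and cancel() can be sketched without Redis. The class below is a hypothetical stand-in that yields from an in-memory list, not the package's implementation; it only shows the general pattern of a generator-backed iterable that stops once cancel() has been called:

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a subscription: yields queued payloads until cancelled.
final class ArraySubscription implements IteratorAggregate
{
    private bool $cancelled = false;

    /** @param list<string> $payloads */
    public function __construct(private array $payloads) {}

    public function getIterator(): Generator
    {
        foreach ($this->payloads as $payload) {
            // Checked each time the consumer asks for the next item.
            if ($this->cancelled) {
                return;
            }
            yield $payload;
        }
    }

    public function cancel(): void
    {
        $this->cancelled = true;
    }
}

$subscription = new ArraySubscription(['first', 'stop', 'never delivered']);
$seen = [];

foreach ($subscription as $payload) {
    $seen[] = $payload;
    if ($payload === 'stop') {
        $subscription->cancel(); // the loop ends before the next payload is yielded
    }
}
```

The same shape applies to a real listener: calling cancel() from inside the foreach body is a natural way to leave the loop in response to a message.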

RedisPubSubConnection

| Method | Description |
| --- | --- |
| __construct(string $host, int $port, ?string $password, int $database, string $prefix) | Create a connection with host (127.0.0.1), port (6379), optional password, database index (0), and channel prefix (marko:) |
| client(): RedisClient | Get the Redis client instance, lazily connected on first call |
| connector(): RedisConnector | Get the Redis connector instance, lazily created on first call |
| disconnect(): void | Disconnect and release both client and connector instances |
| isConnected(): bool | Check whether a client instance is currently active |
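
The lazy behaviour described for client(), disconnect(), and isConnected() follows a common pattern, sketched below. LazyConnection is a hypothetical class that uses stdClass in place of a real Redis client; it is not the code of RedisPubSubConnection, only an illustration of the lifecycle:

```php
<?php
declare(strict_types=1);

// Hypothetical illustration of lazy connection handling; stdClass stands in
// for the real Redis client.
final class LazyConnection
{
    private ?object $client = null;

    public function client(): object
    {
        // Create the client on first use and reuse it afterwards.
        return $this->client ??= new stdClass();
    }

    public function disconnect(): void
    {
        // Dropping the reference means the next client() call creates a new one.
        $this->client = null;
    }

    public function isConnected(): bool
    {
        return $this->client !== null;
    }
}

$connection = new LazyConnection();
$before = $connection->isConnected();   // nothing created yet
$first = $connection->client();
$second = $connection->client();        // same instance as $first
$during = $connection->isConnected();
$connection->disconnect();
$after = $connection->isConnected();
```

With this pattern, constructing the connection object is cheap, and no network resources are held until something actually publishes or subscribes.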