marko/pubsub-pgsql

Zero-infrastructure pub/sub via PostgreSQL LISTEN/NOTIFY: real-time messaging using the database you already have, no Redis required.

The package provides PgSqlPublisher and PgSqlSubscriber, which implement the PublisherInterface and SubscriberInterface contracts from marko/pubsub. Messages travel through PostgreSQL’s built-in NOTIFY and LISTEN commands over an async connection provided by amphp/postgres, so nothing beyond your existing database is needed.

Installing this package binds PublisherInterface and SubscriberInterface to the PostgreSQL driver automatically.

composer require marko/pubsub-pgsql

This automatically installs marko/pubsub and marko/amphp.

Set environment variables or publish the config file:

PUBSUB_PGSQL_HOST=127.0.0.1
PUBSUB_PGSQL_PORT=5432
PUBSUB_PGSQL_USER=app
PUBSUB_PGSQL_PASSWORD=secret
PUBSUB_PGSQL_DATABASE=app
PUBSUB_DRIVER=pgsql
PUBSUB_PREFIX=marko_
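
To make the mapping from these variables to connection settings concrete, here is a minimal sketch. The helper name pubsubSettingsFromEnv() and the returned array keys are hypothetical and not part of the package; the fallback values are the ones listed for the connection constructor (127.0.0.1, 5432, marko_).

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of marko/pubsub-pgsql): resolve the
 * PostgreSQL pub/sub settings from an environment-style array.
 */
function pubsubSettingsFromEnv(array $env): array
{
    return [
        'host'     => $env['PUBSUB_PGSQL_HOST'] ?? '127.0.0.1',
        // Environment values are strings, so the port is cast explicitly.
        'port'     => (int) ($env['PUBSUB_PGSQL_PORT'] ?? 5432),
        'user'     => $env['PUBSUB_PGSQL_USER'] ?? null,
        'password' => $env['PUBSUB_PGSQL_PASSWORD'] ?? null,
        'database' => $env['PUBSUB_PGSQL_DATABASE'] ?? null,
        'prefix'   => $env['PUBSUB_PREFIX'] ?? 'marko_',
    ];
}

// Only the port is overridden here; everything else falls back.
$settings = pubsubSettingsFromEnv(['PUBSUB_PGSQL_PORT' => '6432']);
```

Unset credentials stay null, which matches the nullable user, password, and database arguments of the connection constructor.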

Inject PublisherInterface; the PostgreSQL driver issues the NOTIFY for you:

use Marko\PubSub\Message;
use Marko\PubSub\PublisherInterface;

class OrderService
{
    public function __construct(
        private PublisherInterface $publisher,
    ) {}

    public function placeOrder(Order $order): void
    {
        // ... persist the order ...

        // The channel is passed both to publish() and to the Message itself.
        $this->publisher->publish(
            channel: 'orders',
            message: new Message(
                channel: 'orders',
                // JSON_THROW_ON_ERROR avoids publishing `false` if encoding fails.
                payload: json_encode(['id' => $order->id, 'status' => 'placed'], JSON_THROW_ON_ERROR),
            ),
        );
    }
}
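
Because the payload ends up in a NOTIFY, PostgreSQL's own limit applies: in the default server configuration a notification payload must be shorter than 8000 bytes. The sketch below shows one way to guard against that before publishing. The helper encodeNotifyPayload() and the constant name are hypothetical; whether the package performs a similar check itself is not stated here.

```php
<?php
declare(strict_types=1);

// PostgreSQL's default configuration requires NOTIFY payloads
// to be shorter than 8000 bytes.
const NOTIFY_PAYLOAD_LIMIT = 8000;

/**
 * Hypothetical helper: JSON-encode data and reject payloads
 * that NOTIFY would refuse.
 */
function encodeNotifyPayload(array $data): string
{
    $json = json_encode($data, JSON_THROW_ON_ERROR);

    // strlen() counts bytes, which is what the server limit is based on.
    if (strlen($json) >= NOTIFY_PAYLOAD_LIMIT) {
        throw new LengthException(sprintf(
            'Payload is %d bytes; NOTIFY accepts fewer than %d.',
            strlen($json),
            NOTIFY_PAYLOAD_LIMIT,
        ));
    }

    return $json;
}

$payload = encodeNotifyPayload(['id' => 42, 'status' => 'placed']);
```

For larger records, a common approach is to publish only an identifier and let subscribers load the full row from the database.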

Inject SubscriberInterface and iterate the Subscription. Run the subscriber loop via the pubsub:listen command:

use Marko\PubSub\SubscriberInterface;

class OrderListener
{
    public function __construct(
        private SubscriberInterface $subscriber,
    ) {}

    public function listen(): void
    {
        $subscription = $this->subscriber->subscribe('orders');

        // Yields a Message each time a notification arrives.
        foreach ($subscription as $message) {
            // Decode to an associative array; throw on malformed JSON.
            $data = json_decode($message->payload, true, flags: JSON_THROW_ON_ERROR);
            // handle order ...
        }
    }
}

Start the listener process:

marko pubsub:listen

Pass multiple channel names to subscribe to all of them in a single call:

$subscription = $this->subscriber->subscribe('orders', 'shipments', 'returns');
foreach ($subscription as $message) {
    // $message->channel tells you which channel delivered the message
}
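
With several channels on one subscription, the loop body usually needs to route each message by its channel. The following is a minimal sketch of such routing. ChannelRouter is a hypothetical class, not something the package ships, and it works on plain strings so it can run on its own.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical router: maps a channel name to a handler and
 * passes it the decoded JSON payload.
 */
final class ChannelRouter
{
    /** @var array<string, callable> */
    private array $handlers = [];

    public function on(string $channel, callable $handler): void
    {
        $this->handlers[$channel] = $handler;
    }

    public function dispatch(string $channel, string $payload): mixed
    {
        $handler = $this->handlers[$channel]
            ?? throw new InvalidArgumentException("No handler for channel '$channel'.");

        return $handler(json_decode($payload, true, 512, JSON_THROW_ON_ERROR));
    }
}

$router = new ChannelRouter();
$router->on('orders', fn (array $data): string => "order {$data['id']}");
$router->on('returns', fn (array $data): string => "return {$data['id']}");

// Inside the foreach loop this would read:
// $router->dispatch($message->channel, $message->payload);
$result = $router->dispatch('orders', '{"id": 7}');
```

Throwing on an unknown channel makes a missing handler visible immediately; logging and skipping is an equally reasonable choice for long-running listeners.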

Combine with marko/sse to push database notifications to the browser:

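use Marko\PubSub\SubscriberInterface;
use Marko\Routing\Route\Get;
use Marko\Sse\SseStream;
use Marko\Sse\StreamingResponse;

// The class name is illustrative; any controller with an injected subscriber works.
class OrderStreamController
{
    public function __construct(
        private SubscriberInterface $subscriber,
    ) {}

    #[Get('/orders/stream')]
    public function stream(): StreamingResponse
    {
        $subscription = $this->subscriber->subscribe('orders');

        $stream = new SseStream(
            subscription: $subscription,
            timeout: 300,
        );

        return new StreamingResponse($stream);
    }
}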
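
For orientation, this is roughly what a server-sent event looks like on the wire. The sketch follows the generic SSE text format (an optional event: line, one data: line per line of payload, and a blank line to end the event). The function formatSseEvent() is hypothetical, and this is not a description of what SseStream emits internally.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: format one event in the SSE text format.
 */
function formatSseEvent(string $data, ?string $event = null): string
{
    $frame = $event !== null ? "event: $event\n" : '';

    // Each line of the payload needs its own "data:" field.
    foreach (preg_split('/\r\n|\r|\n/', $data) as $line) {
        $frame .= "data: $line\n";
    }

    // A blank line terminates the event.
    return $frame . "\n";
}

$frame = formatSseEvent('{"id":7}', 'orders');
```

In the browser, an EventSource connected to /orders/stream receives each such frame as one message event.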

To customize the PostgreSQL connection, extend PgSqlPubSubConnection and register the subclass as a Preference:

use Amp\Postgres\PostgresConfig;
use Marko\PubSub\PgSql\PgSqlPubSubConnection;

class SslPgSqlPubSubConnection extends PgSqlPubSubConnection
{
    protected function createConfig(): PostgresConfig
    {
        // Start from the inherited settings; add SSL or other connection options here.
        return new PostgresConfig(
            host: $this->host,
            port: $this->port,
            user: $this->user,
            password: $this->password,
            database: $this->database,
        );
    }
}

Register it in your module:

module.php
return [
    'bindings' => [
        \Marko\PubSub\PgSql\PgSqlPubSubConnection::class => SslPgSqlPubSubConnection::class,
    ],
];

PgSqlPublisher implements PublisherInterface and sends messages via PostgreSQL NOTIFY.

| Method | Description |
| --- | --- |
| __construct(PgSqlPubSubConnection $pgSqlPubSubConnection, PubSubConfig $pubSubConfig) | Create a publisher with a PostgreSQL connection and pub/sub configuration |
| publish(string $channel, Message $message): void | Publish a message to the given channel by issuing a NOTIFY on the prefixed channel |
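
To illustrate what "a NOTIFY on the prefixed channel" amounts to, here is a sketch that builds an equivalent parameterized query with PostgreSQL's pg_notify() function. The helper buildNotifyQuery() and its return shape are hypothetical; how the package actually issues the notification may differ.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: build the query and parameters for a
 * notification on a prefixed channel.
 */
function buildNotifyQuery(string $prefix, string $channel, string $payload): array
{
    // pg_notify() takes channel and payload as ordinary parameters,
    // so neither has to be quoted into a NOTIFY statement by hand.
    return [
        'sql'    => 'SELECT pg_notify($1, $2)',
        'params' => [$prefix . $channel, $payload],
    ];
}

$query = buildNotifyQuery('marko_', 'orders', '{"id":42}');
```

With the prefix marko_, a publish to orders reaches listeners of marko_orders, which keeps this application's channels separate from other users of the same database.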

PgSqlSubscriber implements SubscriberInterface and listens for messages via PostgreSQL LISTEN.

| Method | Description |
| --- | --- |
| __construct(PgSqlPubSubConnection $pgSqlPubSubConnection, PubSubConfig $pubSubConfig) | Create a subscriber with a PostgreSQL connection and pub/sub configuration |
| subscribe(string ...$channels): Subscription | Subscribe to one or more channels by issuing a LISTEN for each prefixed channel |
| psubscribe(string ...$patterns): Subscription | Always throws PubSubException, because PostgreSQL does not support pattern subscriptions |
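
Since psubscribe() is unavailable, one workaround is to keep a list of known channel names and expand a pattern into explicit channels before calling subscribe(). The sketch below assumes such a list exists in your application; channelsMatching() is a hypothetical helper that uses PHP's fnmatch() for shell-style wildcards.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: select the known channels that match a
 * shell-style wildcard pattern.
 */
function channelsMatching(array $knownChannels, string $pattern): array
{
    return array_values(array_filter(
        $knownChannels,
        static fn (string $channel): bool => fnmatch($pattern, $channel),
    ));
}

$channels = channelsMatching(
    ['orders.created', 'orders.paid', 'shipments.sent'],
    'orders.*',
);

// The result can then be spread into a regular subscription:
// $subscription = $subscriber->subscribe(...$channels);
```

Unlike a true pattern subscription, this only covers channels known at subscribe time; channels introduced later need a new subscription.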

The subscription returned by subscribe() implements Subscription. It wraps one or more PostgreSQL listeners and yields Message instances.

| Method | Description |
| --- | --- |
| __construct(array $listeners, string $prefix) | Create a subscription from an array of PostgresListener instances and the channel prefix |
| getIterator(): Generator | Yield Message instances as notifications arrive, with the prefix stripped from channel names |
| cancel(): void | Unlisten from all channels and stop receiving notifications |
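
Stripping the prefix means subscribers see the same channel names that publishers used. A minimal sketch of that step, assuming a plain string prefix; stripChannelPrefix() is a hypothetical name, not the package's method.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: remove the configured prefix from a
 * PostgreSQL channel name, leaving other names untouched.
 */
function stripChannelPrefix(string $channel, string $prefix): string
{
    return $prefix !== '' && str_starts_with($channel, $prefix)
        ? substr($channel, strlen($prefix))
        : $channel;
}

$logical = stripChannelPrefix('marko_orders', 'marko_');
```

Only a leading occurrence is removed, so a channel such as marko_marko_audit would become marko_audit rather than audit.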

PgSqlPubSubConnection manages the async PostgreSQL connection used for pub/sub and connects lazily on first use.

| Method | Description |
| --- | --- |
| __construct(string $host, int $port, ?string $user, ?string $password, ?string $database, string $prefix) | Create a connection from host (default 127.0.0.1), port (default 5432), optional user, password, and database, and the channel prefix (default marko_) |
| connection(): PostgresConnection | Get the async PostgreSQL connection, which is established on the first call |
| disconnect(): void | Disconnect and release the connection instance |
| isConnected(): bool | Check whether a connection instance is currently active |
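
The lazy-connect behaviour described above follows a common pattern: create the resource on first access, reuse it afterwards, and drop the reference on disconnect. The sketch below shows that pattern in isolation. LazyResource and its method names are hypothetical stand-ins and do not reproduce the package's code.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical illustration of a lazily created, releasable resource.
 */
final class LazyResource
{
    private ?object $instance = null;

    public function __construct(private Closure $factory) {}

    public function get(): object
    {
        // The factory runs only when no instance exists yet.
        return $this->instance ??= ($this->factory)();
    }

    public function release(): void
    {
        $this->instance = null;
    }

    public function isActive(): bool
    {
        return $this->instance !== null;
    }
}

$created = 0;
$lazy = new LazyResource(function () use (&$created): object {
    $created++;
    return new stdClass();
});

$activeBefore = $lazy->isActive(); // nothing has been created yet
$first = $lazy->get();             // factory runs here
$second = $lazy->get();            // same instance is reused
$lazy->release();
```

The practical effect for the connection class is that constructing it is cheap, and no database connection is opened until something publishes or subscribes.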