ndrean/PG-CDC-NATS-BRIDGE
Bridge server Postgres CDC to NATS/JetStream server
A lightweight (15MB), opinionated bridge for streaming PostgreSQL changes to NATS JetStream. Built with Zig for minimal overhead, it includes table bootstrapping for consumer initialization.
⚠️ Status: Early stage, not yet battle-tested in production. Suitable for experimentation and non-critical workloads.
```mermaid
flowchart LR
    PG[Postgres<br/>Log. Repl.] --> Bridge[Bridge Server<br/>5 threads]
    Bridge --> NATS[NATS JetStream]
    subgraph NATS
        CDC[CDC Stream<br/>cdc.table.op]
        INIT[INIT Stream<br/>init.snap.*, <br/>init.meta.*]
        KV[KV Store: <br/>schemas<br/>key=table]
    end
    NATS --> Consumer[Consumer<br/>Local <br/>SQLite/PGLite]
```
We use NATS JetStream to distribute PostgreSQL's logical replication stream because both systems already solve the hard problems: ordering, durability, and idempotency.
The bridge just connects them correctly.
This bridge is an experiment in minimalism: can PostgreSQL CDC be done in a 16MB binary with 7MB of RAM while preserving correctness?
The design makes deliberate trade-offs for simplicity and efficiency.
Two-phase data flow:
1. Bootstrap: the consumer requests a snapshot and replays it from the INIT stream.
2. Live changes: the consumer then follows INSERT/UPDATE/DELETE events on the CDC stream.
Key features:
- Table bootstrapping (on-demand snapshots) for consumer initialization
- MessagePack-encoded payloads with per-event message IDs for deduplication
- WAL-safe acknowledgments (PostgreSQL is ACK'd only after JetStream confirms)
- HTTP telemetry (Prometheus metrics, JSON status, health checks)
Perfect for: Consumers wanting to mirror PostgreSQL tables locally (SQLite, PGLite, etc.) and stay synchronized with real-time changes.
Example: Edge applications, mobile apps, or analytics workers that need a local replica of specific tables without querying the main database.
Current default: MessagePack
Future option: --format json flag for browser compatibility (NATS supports WebSocket connections)
Design choice: One bridge instance = one replication slot = sequential processing
Rationale: a single replication slot guarantees ordered, sequential processing of WAL events and keeps the bridge simple; throughput scales by running more bridge instances.
To scale throughput, run one bridge per publication: the PostgreSQL admin creates each publication and its scope (the pub_name and which tables it covers).
# Bridge 1: 60K events/s for table "users" with publication "my_pub_1"
./bridge --slot slot_1 --pub my_pub_1 --port 9090
# Bridge 2: Another 60K events/s for table "orders" with publication "my_pub_2"
./bridge --slot slot_2 --pub my_pub_2 --port 9091
Trade-off: Multiple processes vs single multi-threaded process. This approach prioritizes operational simplicity.
The system has two separate acknowledgment flows that work independently:
The bridge's challenge: When can we safely tell PostgreSQL to prune WAL?
PostgreSQL WAL → Bridge → NATS JetStream
↑ ↓
└─── ACK after JetStream confirms
How it works:
Why this matters:
Backpressure:
This flow happens outside the scope of the bridge server.
The consumer's challenge: When can JetStream prune delivered messages?
NATS JetStream → Consumer
↑ ↓
└──── Consumer ACKs (or NAKs)
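
With a pull consumer, the ACK/NAK decision is simply the handler's return value. A minimal sketch, assuming the Gnat.Jetstream.PullConsumer behaviour from the gnat library (apply_change/1 is a hypothetical helper):

```elixir
# Sketch only: returning {:ack, state} advances the consumer position;
# {:nack, state} asks JetStream to redeliver later (bounded by max_deliver).
def handle_message(message, state) do
  case apply_change(message) do
    :ok -> {:ack, state}
    {:error, _reason} -> {:nack, state}
  end
end
```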
How it works:
Why this matters:
Backpressure:
Bridge doesn't wait for consumers:
Separation of concerns:
| Component | Responsibility | ACK Target |
|---|---|---|
| Bridge | Get data from PG into NATS reliably | PostgreSQL (LSN) |
| NATS JetStream | Deliver to consumers durably | N/A (handles both flows) |
| Consumer | Process data and track progress | NATS JetStream (consumer position) |
This architecture keeps PostgreSQL WAL lean while allowing consumers to replay/lag independently.
| Stream | Purpose | Retention | Consumer Pattern |
|---|---|---|---|
| CDC | Real-time changes | Short (1 min) | Continuous subscription |
| INIT | Bootstrap snapshots | Long (7 days) | One-time replay |
Why separate?
Flow:
Snapshot requests are published to snapshot.request.{table}.
Why on-demand?
This setup requires superuser privileges (e.g., the postgres user); the bridge itself runs as a restricted bridge_reader user for security.
Add to postgresql.conf or Docker command:
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
max_slot_wal_keep_size = 10GB
wal_sender_timeout = 300s # 5 minutes
-- Run as superuser
CREATE PUBLICATION cdc_pub FOR ALL TABLES;
To filter specific tables:
CREATE PUBLICATION cdc_pub FOR TABLE users, orders;
-- Run as superuser
CREATE USER bridge_reader WITH REPLICATION PASSWORD 'secure_password';
GRANT CONNECT, CREATE ON DATABASE postgres TO bridge_reader;
GRANT USAGE ON SCHEMA public TO bridge_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO bridge_reader;
-- Auto-grant for future tables
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO bridge_reader;
Why two users?
- Admin user (postgres): creates publications (security-sensitive)
- Bridge user (bridge_reader): restricted to SELECT + REPLICATION (least privilege)

See init.sh for a complete setup script.
nats-server -js -m 8222
Or via Docker:
docker run -p 4222:4222 -p 8222:8222 nats:latest -js -m 8222
# CDC stream: Short retention for real-time events
nats stream add CDC \
--subjects='cdc.>' \
--storage=file \
--retention=limits \
--max-age=1m \
--max-msgs=1000000 \
--max-bytes=1G \
--replicas=1
# INIT stream: Long retention for bootstrap data
nats stream add INIT \
--subjects='init.>' \
--storage=file \
--retention=limits \
--max-age=7d \
--max-msgs=10000000 \
--max-bytes=8G \
--replicas=1
nats kv add schemas --history=10 --replicas=1
Example nats-server.conf:
port: 4222
jetstream {
store_dir: "/data"
max_memory_store: 1GB
max_file_store: 10GB
}
accounts {
BRIDGE: {
jetstream: {
max_memory: 1GB
max_file: 10GB
max_streams: 10
max_consumers: 10
}
users: [
{user: "bridge_user", password: "bridge_password"}
]
}
}
See docker-compose.yml for a complete Docker setup with PostgreSQL + NATS.
1. Consumer starts
2. Fetches schemas from NATS KV store
GET kv://schemas/{table_name} → MessagePack schema
3. Checks if local DB needs bootstrap
4. If yes, publishes snapshot request
PUBLISH snapshot.request.{table_name} (empty payload)
5. Bridge snapshot_listener receives request
6. Bridge generates snapshot in 10K row chunks
→ Publishes to init.snap.{table}.{snapshot_id}.{chunk}
7. Consumer receives chunks, reconstructs table
8. Consumer receives metadata on init.meta.{table}
9. Consumer subscribes to CDC stream for real-time updates
The native concurrency of the BEAM makes Elixir a natural candidate for consuming this data consistently.
1. Fetch Schemas
def fetch_schema(table_name) do
case Gnat.Jetstream.API.KV.get_value(:gnat, "schemas", table_name) do
schema_data when is_binary(schema_data) ->
{:ok, schema} = Msgpax.unpack(schema_data)
# schema = %{"table" => "users", "columns" => [...]}
_ ->
{:error, :not_found}
end
end
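
For a local SQLite/PGLite mirror, the fetched schema can drive table creation. A minimal sketch (create_table_ddl/1 and sqlite_type/1 are hypothetical helpers, and the type mapping is illustrative only):

```elixir
# Build a CREATE TABLE statement for a local SQLite mirror from the schema
# stored in the KV bucket (see the schema message format below).
def create_table_ddl(%{"table" => table, "columns" => columns}) do
  cols =
    columns
    |> Enum.sort_by(& &1["position"])
    |> Enum.map_join(", ", fn col ->
      null = if col["is_nullable"], do: "", else: " NOT NULL"
      "\"#{col["name"]}\" #{sqlite_type(col["data_type"])}#{null}"
    end)

  "CREATE TABLE IF NOT EXISTS #{table} (#{cols})"
end

defp sqlite_type("integer"), do: "INTEGER"
defp sqlite_type("bigint"), do: "INTEGER"
defp sqlite_type(type) when type in ["text", "varchar", "character varying"], do: "TEXT"
defp sqlite_type(_other), do: "TEXT"
```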
2. Request Snapshot
def request_snapshot(table_name) do
# Check if INIT stream is empty (needs fresh snapshot)
{:ok, stream_info} = Gnat.Jetstream.API.Stream.info(:gnat, "INIT")
stream_messages = stream_info["state"]["messages"] || 0
if stream_messages == 0 do
# Request snapshot
:ok = Gnat.pub(:gnat, "snapshot.request.#{table_name}", "")
Logger.info("Requested snapshot for #{table_name}")
:ok
end
end
3. Subscribe to INIT Stream
# Create a durable (persistent) consumer
consumer_config = %Gnat.Jetstream.API.Consumer{
durable_name: "my_init_consumer",
stream_name: "INIT",
filter_subject: "init.>",
ack_policy: :explicit,
ack_wait: 60_000_000_000, # 60 seconds in nanoseconds
max_deliver: 3
}
def handle_init_message(message, state) do
{:ok, payload} = Msgpax.unpack(message.body)
case payload do
%{"operation" => "snapshot", "data" => rows} ->
# Insert rows into local DB
insert_bulk_rows(rows)
{:ack, state}
_ ->
{:ack, state}
end
end
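
One way to wire this handler into a durable pull consumer, assuming the Gnat.Jetstream.PullConsumer behaviour from the gnat library (the module name and insert_bulk_rows/1 placeholder are illustrative; see consumer/lib/consumer/ for the real wiring):

```elixir
defmodule Consumer.InitWorker do
  # Sketch: pulls from the durable "my_init_consumer" created with the
  # config above and applies snapshot chunks to the local database.
  use Gnat.Jetstream.PullConsumer

  def start_link(arg), do: Gnat.Jetstream.PullConsumer.start_link(__MODULE__, arg)

  @impl true
  def init(_arg) do
    {:ok, nil, connection_name: :gnat, stream_name: "INIT", consumer_name: "my_init_consumer"}
  end

  @impl true
  def handle_message(message, state) do
    case Msgpax.unpack(message.body) do
      {:ok, %{"operation" => "snapshot", "data" => rows}} ->
        insert_bulk_rows(rows)
        {:ack, state}

      _other ->
        {:ack, state}
    end
  end

  defp insert_bulk_rows(_rows), do: :ok # placeholder: bulk insert into SQLite/PGLite
end
```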
4. Subscribe to CDC Stream
consumer_config = %Gnat.Jetstream.API.Consumer{
durable_name: "my_cdc_consumer",
stream_name: "CDC",
filter_subject: "cdc.users.>", # Or "cdc.>" for all tables
ack_policy: :explicit,
max_batch: 100
}
def handle_cdc_message(message, state) do
{:ok, payload} = Msgpax.unpack(message.body)
case payload["operation"] do
"INSERT" -> insert_row(payload["columns"])
"UPDATE" -> update_row(payload["columns"])
"DELETE" -> delete_row(payload["columns"])
end
{:ack, state}
end
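
Because delivery is at-least-once, the consumer can also keep a local dedup key per event, derived the same way as the NATS message ID. A minimal sketch with hypothetical apply_once/2 and apply_change/1 helpers:

```elixir
# Sketch: skip events whose {lsn}-{table}-{operation} key was already applied,
# so re-deliveries past JetStream's dedup window stay harmless.
def apply_once(payload, %{seen: seen} = state) do
  key = "#{payload["lsn"]}-#{payload["table"]}-#{String.downcase(payload["operation"])}"

  if MapSet.member?(seen, key) do
    state
  else
    apply_change(payload) # insert_row/update_row/delete_row from the handler above
    %{state | seen: MapSet.put(seen, key)}
  end
end
```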
See consumer/lib/consumer/ for a complete Elixir example.
NOTE: The stream names "CDC" and "INIT" are hardcoded.
Run with defaults (all tables in the publication):

./bridge --slot cdc_slot \
  --publication cdc_pub \
  --port 9090

Filter specific tables:

./bridge --slot cdc_slot \
  --publication cdc_pub \
  --table users,orders \
  --port 9090
# PostgreSQL connection
export PG_HOST=localhost
export PG_PORT=5432
export PG_USER=bridge_reader
export PG_PASSWORD=secure_password
export PG_DB=postgres
# NATS connection
export NATS_HOST=localhost
export NATS_BRIDGE_USER=bridge_user
export NATS_BRIDGE_PASSWORD=bridge_password
# Bridge configuration
export BRIDGE_PORT=9090
Options:
--slot <NAME> Replication slot name (default: cdc_slot)
--publication <NAME> Publication name (default: cdc_pub)
--table <NAMES> Comma-separated table filter (default: all tables)
--port <PORT> HTTP telemetry port (default: 9090)
--help, -h Show this help message
# Start full stack (PostgreSQL + NATS + Bridge)
docker compose -f docker-compose.prod.yml up --build
See docker-compose.prod.yml for the complete setup.
Run multiple bridge instances with different replication slots:
# Terminal 1: Users table (60K events/s)
./bridge --stream CDC,INIT --slot slot_1 --table users --port 9090
# Terminal 2: Orders table (60K events/s)
./bridge --stream CDC,INIT --slot slot_2 --table orders --port 9091
# Total: 120K events/s, independent failure domains
Note: Actual payloads use MessagePack binary encoding. JSON examples below show the logical structure for illustration.
Schema message (KV bucket schemas, key {table}): published at bridge startup. Consumers fetch it before requesting snapshots.
{
"table": "users",
"schema": "public.users",
"timestamp": 1765201228,
"columns": [
{
"name": "id",
"position": 1,
"data_type": "integer",
"is_nullable": false,
"column_default": "nextval('users_id_seq'::regclass)"
},
{
"name": "name",
"position": 2,
"data_type": "text",
"is_nullable": false,
"column_default": null
},
{
"name": "email",
"position": 3,
"data_type": "text",
"is_nullable": true,
"column_default": null
}
]
}
Snapshot metadata (init.meta.{table}): published after all chunks. Tells the consumer how many chunks to expect.
{
"snapshot_id": "snap-1765208480",
"lsn": "0/191BFD0",
"timestamp": 1765208480,
"batch_count": 4,
"row_count": 4000,
"table": "users"
}
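
Since batch_count arrives with the metadata, the consumer can verify that every chunk landed before switching to the CDC stream. A small sketch (bootstrap_complete?/2 is a hypothetical helper; seen_chunks is whatever chunk tracking the consumer keeps):

```elixir
# Sketch: compare chunks observed on init.snap.{table}.{snapshot_id}.* against
# the batch_count announced on init.meta.{table}.
def bootstrap_complete?(meta, seen_chunks) do
  %{"table" => table, "snapshot_id" => id, "batch_count" => expected} = meta
  received = seen_chunks |> Map.get({table, id}, MapSet.new()) |> MapSet.size()
  received == expected
end
```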
Snapshot chunks (init.snap.{table}.{snapshot_id}.{chunk}): 10,000 rows per chunk (configurable in config.zig).
{
"table": "users",
"operation": "snapshot",
"snapshot_id": "snap-1765208480",
"chunk": 3,
"lsn": "0/191BFD0",
"data": [
{
"id": "3001",
"name": "User-3001",
"email": "[email protected]",
"created_at": "2025-12-08 13:45:21.719719+00"
},
{
"id": "3002",
"name": "User-3002",
"email": "[email protected]",
"created_at": "2025-12-08 13:45:22.123456+00"
}
// ... up to 10,000 rows
]
}
CDC events (cdc.{table}.{operation}): real-time INSERT/UPDATE/DELETE events.
Subject pattern: cdc.{table}.{operation}
Examples: cdc.users.insert, cdc.users.update, cdc.users.delete

Message ID (for deduplication): {lsn}-{table}-{operation}, e.g. 25cb3c8-users-insert

INSERT event:
{
"table": "users",
"operation": "INSERT",
"relation_id": 16384,
"lsn": "0/25cb3c8",
"columns": [
{"name": "id", "value": 1},
{"name": "name", "value": "Alice"},
{"name": "email", "value": "[email protected]"},
{"name": "created_at", "value": "2025-12-10 10:30:00+00"}
]
}
UPDATE event:
{
"table": "users",
"operation": "UPDATE",
"relation_id": 16384,
"lsn": "0/25cb4d0",
"columns": [
{"name": "id", "value": 1},
{"name": "name", "value": "Alice Smith"},
{"name": "email", "value": "[email protected]"},
{"name": "created_at", "value": "2025-12-10 10:30:00+00"}
]
}
DELETE event:
{
"table": "users",
"operation": "DELETE",
"relation_id": 16384,
"lsn": "0/25cb5e8",
"columns": [
{"name": "id", "value": 1},
{"name": "name", "value": "Alice Smith"},
{"name": "email", "value": "[email protected]"},
{"name": "created_at", "value": "2025-12-10 10:30:00+00"}
]
}
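
Since each event carries its columns as a list of name/value pairs, a consumer will usually flatten them into a row map before building SQL. A one-function sketch (columns_to_row/1 is a hypothetical helper):

```elixir
# Sketch: turn [%{"name" => "id", "value" => 1}, ...] into %{"id" => 1, ...}.
def columns_to_row(columns) do
  Map.new(columns, fn %{"name" => name, "value" => value} -> {name, value} end)
end
```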
The bridge provides telemetry through multiple channels:
```mermaid
flowchart LR
    B[Bridge Server<br/>Telemetry] -->|GET /metrics<br/>Prometheus format| Prometheus
    B -->|stdout<br/>Structured logfmt| Alloy/Loki
    B -->|GET /status<br/>JSON format| HTTP_Client
```
HTTP GET http://localhost:9090/metrics
Returns metrics in Prometheus text format:
bridge_uptime_seconds 331
bridge_wal_messages_received_total 1797
bridge_cdc_events_published_total 288
bridge_last_ack_lsn 25509096
bridge_connected 1
bridge_reconnects_total 0
bridge_nats_reconnects_total 0
bridge_last_processing_time_us 2
bridge_slot_active 1
bridge_wal_lag_bytes 51344
Configure Prometheus to scrape this endpoint:
scrape_configs:
- job_name: 'cdc_bridge'
static_configs:
- targets: ['localhost:9090']
HTTP GET http://localhost:9090/status
Returns bridge status as JSON:
{
"status": "connected",
"uptime_seconds": 331,
"wal_messages_received": 1797,
"cdc_events_published": 288,
"current_lsn": "0/1832ce8",
"is_connected": true,
"reconnect_count": 0,
"nats_reconnect_count": 0,
"last_processing_time_us": 2,
"slot_active": true,
"wal_lag_bytes": 51344,
"wal_lag_mb": 0
}
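
The JSON status lends itself to simple lag monitoring from any HTTP client. A sketch assuming the Req and Jason libraries (the 100 MB threshold is arbitrary):

```elixir
# Sketch: poll /status and warn when WAL lag grows past a chosen threshold.
def check_bridge_lag(url \\ "http://localhost:9090/status") do
  {:ok, %Req.Response{status: 200, body: body}} = Req.request(url: url, decode_body: false)
  %{"wal_lag_mb" => lag_mb} = Jason.decode!(body)

  if lag_mb > 100 do
    IO.puts("warning: bridge WAL lag is #{lag_mb} MB")
  else
    :ok
  end
end
```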
The bridge emits structured metric logs to stdout every 15 seconds:
info(bridge): METRICS uptime=15 wal_messages=2 cdc_events=0 lsn=0/183f680 connected=1 reconnects=0 nats_reconnects=0 lag_bytes=51608 slot_active=1
Configure Grafana Alloy to parse these logs:
loki.source.file "bridge_logs" {
targets = [
{__path__ = "/var/log/bridge/*.log"},
]
forward_to = [loki.process.extract_metrics.receiver]
}
loki.process "extract_metrics" {
stage.regex {
expression = "METRICS uptime=(?P<uptime>\\d+) wal_messages=(?P<wal_msgs>\\d+) cdc_events=(?P<cdc_events>\\d+)"
}
stage.metrics {
metric.counter {
name = "bridge_wal_messages_total"
source = "wal_msgs"
}
metric.counter {
name = "bridge_cdc_events_total"
source = "cdc_events"
}
}
forward_to = [loki.write.default.receiver]
}
HTTP GET http://localhost:9090/health
Returns:
{"status":"ok"}
Status: 200 OK when the bridge HTTP server is running.
Use for Docker health checks, Kubernetes probes, or load balancers.
HTTP POST http://localhost:9090/shutdown
Initiates graceful shutdown:
curl -X POST http://localhost:9090/shutdown
Shutdown sequence:
Get stream info:
curl "http://localhost:9090/streams/info?stream=CDC" | jq
Purge stream messages:
curl -X POST "http://localhost:9090/streams/purge?stream=TEST"
Create/delete streams:
curl -X POST "http://localhost:9090/streams/create?stream=TEST&subjects=test.>"
curl -X POST "http://localhost:9090/streams/delete?stream=TEST"
The guarantee: at-least-once delivery from PostgreSQL into JetStream, with duplicates filtered by message ID.

If bridge crashes: the replication slot retains un-ACK'd WAL, so the bridge resumes from the last confirmed LSN and re-published events are deduplicated by JetStream.

If NATS crashes: the bridge stops ACK'ing and PostgreSQL retains WAL until the bridge catches up (bounded by max_slot_wal_keep_size=10GB).

Message ID pattern: {lsn}-{table}-{operation}, e.g. 25cb3c8-users-insert

NATS JetStream deduplication:
- JetStream drops re-published events whose message ID falls within its deduplication window
PostgreSQL side:
- The replication slot holds WAL until the bridge ACKs an LSN
- max_slot_wal_keep_size=10GB prevents unbounded growth

NATS JetStream side:
- File storage (--storage=file) survives restarts

Consumer side:
Snapshot consistency:
CDC event ordering:
Shutdown sequence:
Guarantees:
- HTTP server thread: serves /metrics, /health, /status, /shutdown
- Snapshot listener thread: subscribes to snapshot.request.>, generates snapshots on-demand

Producer-Consumer pattern:
Why lock-free?
Dual purpose:
How it handles NATS outages:
NATS goes down at T=0
├─ Main thread continues reading WAL → pushes to queue
├─ Flush thread can't publish → queue fills up
├─ Queue fills (32768 slots) → ~546ms buffer at 60K events/s
├─ Queue full → Main thread backs off (sleeps 1ms per attempt)
├─ PostgreSQL WAL starts accumulating (controlled)
│
NATS reconnects at T=1000ms+ (reconnect_wait, queue covers 54% of retry)
├─ Flush thread resumes publishing
├─ Queue drains rapidly (~546ms of buffered events)
└─ Bridge catches up, resumes ACK'ing PostgreSQL
Backpressure cascade:
NATS outage → Queue fills → Main thread slows → PostgreSQL WAL accumulates
↓
(up to max_slot_wal_keep_size=10GB)
Queue size trade-offs:
| Queue Size | Buffer Duration (at 60K events/s) | Memory Impact | Pros | Cons |
|---|---|---|---|---|
| 4096 | ~68ms | ~256KB | Minimal memory | Too small for reconnections |
| 16384 | ~273ms | ~1MB | Good balance | Marginal for multi-second outages |
| 32768 (current) | ~546ms | ~2MB | Covers most outages | None (negligible cost) |
Why 32768 is the sweet spot:
Graceful degradation:
- max_slot_wal_keep_size=10GB absorbs minute-scale outages

PostgreSQL WAL
↓
Main Thread (parse pgoutput)
↓
SPSC Queue (lock-free)
↓
Batch Publisher Thread
↓ (batch: 500 events OR 100ms OR 256KB)
MessagePack Encoding
↓
NATS JetStream (async publish)
↓ (JetStream ACK)
PostgreSQL LSN ACK
Arena allocator:
Ownership transfer:
On startup:
During operation:
On shutdown:
PostgreSQL reconnection:
NATS reconnection:
| | This Bridge | Debezium |
|---|---|---|
| Maturity | ⚠️ Experimental | ✅ Battle-tested (years in production) |
| Footprint | 15MB / 10MB RAM | 500MB+ / 512MB+ RAM |
| Architecture | NATS-native | Kafka-centric |
| Deployment | Single binary | Kafka Connect cluster |
| Throughput | ~60K events/s (single-threaded) | High (multi-threaded) |
| Connectors | PostgreSQL → NATS only | 100+ sources/sinks |
| Enterprise Support | ❌ None | ✅ Available |
When to use Debezium instead:
When to try this bridge:
Benthos (Redpanda):
pgstream (Xata):
This bridge:
This bridge might carve out a niche for NATS-first teams who want turnkey CDC without heavy infrastructure. Or it might not—time will tell. 🤷
If you're betting on mission-critical CDC, use Debezium. If you're exploring NATS and want a lightweight CDC solution, this is worth trying.
# Build nats.c v3.12.0
./build_nats.sh
# Build libpq for PostgreSQL 18.1
./build_libpq.sh
This compiles:
- nats.c → libs/nats-install/
- libpq → libs/libpq-install/

# Start PostgreSQL with logical replication enabled
docker compose up -d postgres
# Start NATS server with JetStream enabled
docker compose up -d nats-server nats-config-gen nats-init
zig build
Output: ./zig-out/bin/bridge
zig build test
All configuration constants are centralized in src/config.zig.
Snapshot configuration:
- Chunk size: 10_000 rows per batch
- Chunk subject: init.snap.{table}.{snapshot_id}.{chunk}
- Metadata subject: init.meta.{table}
- Request subject: snapshot.request.{table}

CDC configuration:
- Batch flush: 500 events OR 100ms OR 256KB (whichever comes first)
- Event subject: cdc.{table}.{operation}
- Message ID: {lsn}-{table}-{operation}

NATS configuration:
- Reconnect attempts: -1 (infinite)
- Timeouts: 2000ms and 10_000ms (10 seconds)
- Flush: every 1 second OR 1MB of data

WAL monitoring:
- Check interval: 30 seconds
- Lag thresholds: 512MB / 1GB

Buffer sizes:
- SPSC queue: 32768 slots (2^15, ~546ms of buffer at 60K events/s)
- Fixed string buffers (e.g. subjects, table names): 128 bytes

See src/config.zig for all tunables.
├── src/
│ ├── bridge.zig # Main application entry point
│ ├── config.zig # Centralized configuration
│ ├── wal_stream.zig # PostgreSQL replication stream
│ ├── pgoutput.zig # pgoutput format parser
│ ├── nats_publisher.zig # NATS JetStream publisher
│ ├── nats_kv.zig # NATS KV store wrapper
│ ├── batch_publisher.zig # Synchronous batch publisher
│ ├── async_batch_publisher.zig # Async batch publisher with SPSC queue
│ ├── spsc_queue.zig # Lock-free single-producer single-consumer queue
│ ├── schema_publisher.zig # Schema publishing to KV store
│ ├── snapshot_listener.zig # Snapshot request listener
│ ├── wal_monitor.zig # WAL lag monitoring
│ ├── http_server.zig # HTTP telemetry server
│ ├── metrics.zig # Metrics tracking
│ └── pg_conn.zig # PostgreSQL connection helpers
├── libs/
│ ├── nats.c/ # Vendored nats.c v3.12.0 source
│ ├── nats-install/ # Built nats.c library
│ └── libpq-install/ # Built libpq library
├── consumer/ # Elixir consumer example
│ └── lib/consumer/
│ ├── application.ex # Consumer app setup
│ ├── cdc_consumer.ex # CDC stream consumer
│ └── init_consumer.ex # INIT stream consumer (bootstrap)
├── build.zig # Zig build configuration
├── build.zig.zon # Package dependencies
├── docker-compose.yml # Base infrastructure setup
├── docker-compose.prod.yml # Production setup with bridge
├── init.sh # PostgreSQL initialization script
└── nats-server.conf.template # NATS server configuration
Managed via build.zig.zon:
Vendored:
Terminal 1 - Start the bridge:
./zig-out/bin/bridge --stream CDC,INIT
Terminal 2 - Generate CDC events:
cd consumer && iex -S mix
# Generate 100 INSERT events
iex> Producer.run_test(100)
# Parallel load test: 100 batches of 10 events each
iex> Stream.interval(500) |> Stream.take(100) |> Task.async_stream(fn _ -> Producer.run_test(10) end) |> Enum.to_list()
# Health check
curl http://localhost:9090/health
# Bridge status
curl http://localhost:9090/status | jq
# Prometheus metrics
curl http://localhost:9090/metrics
# Stream management
curl "http://localhost:9090/streams/info?stream=CDC" | jq
# Graceful shutdown
curl -X POST http://localhost:9090/shutdown
docker exec -it postgres psql -U postgres -c "
SELECT slot_name, active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots
WHERE slot_name = 'cdc_slot';
"
Planned enhancements:
- --format json flag for browser-friendly encoding
- Schema change notifications (publish to NATS on relation_id change)
- Compression options (--compress gzip|lz4|zstd)
- Multiple publication support (--publication pub1,pub2)
- Metrics export to StatsD/InfluxDB
Open questions:
Contributions welcome! This is a learning project as much as a tool. If you find it useful (or find gaps), feedback is valuable.
This project uses:
- PostgreSQL with wal_level=logical
- C built with -std=c11

Kill zombie processes:
ps aux | grep bridge | grep -v grep | awk '{print $2}' | xargs kill