Change Data Capture & Subscriptions
The SochDB v2 server exposes a Change Data Capture (CDC) stream so clients
can react to row-level changes as they are committed. Changes flow out of the
storage engine's write-ahead log into an in-memory ring buffer, and the gRPC
SubscriptionService turns that buffer into a server-side stream of change
events, with table and operation filters and resume-by-sequence support.
This page targets core engine 2.0.3 (the sochdb-grpc-server binary). The
gRPC SubscriptionService is defined in the server's proto package
sochdb.v1. Generated stubs ship with the language SDKs, which are versioned
independently of the engine (Python 0.5.9, Node.js 0.5.3, Go 0.4.5).
The core engine (the Rust workspace, the sochdb crate, and the gRPC
server) is AGPL-3.0-or-later, with commercial licensing available. The
language SDKs (Python, Node.js, Go) are Apache-2.0.
How CDC works
CDC is WAL-derived: every committed write is also recorded as a change event in a log-structured ring buffer held in memory. The buffer has a fixed capacity, and CDC is enabled by default.
| Property | Value |
|---|---|
| Buffer type | In-memory log-structured ring buffer |
| Capacity | 65536 events (CdcConfig::capacity) |
| Enabled by default | Yes (CdcConfig::enabled = true) |
| Sequence numbers | Monotonic, start at 1, independent of WAL LSNs |
| Eviction policy | Oldest event dropped on overflow |
Change events are produced from the commit path: the engine buffers insert/update/delete/schema-change records during a transaction and flushes them to the ring buffer after the transaction commits (records are discarded on abort).
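The commit-path buffering, sequence numbering, and overflow rules above can be modeled in a few lines. The following is a toy Python sketch under stated assumptions, not the engine's Rust implementation: the class name CdcRing, the Overrun exception, and the tuple layout of each event are all hypothetical. It shows that aborted transactions never consume sequence numbers, that committed records get sequences starting at 1, and that reading from an evicted position fails.

```python
from collections import deque


class Overrun(Exception):
    """Hypothetical error: the requested sequence has already been evicted."""

    def __init__(self, requested, oldest):
        super().__init__(f"requested {requested}, oldest available {oldest}")
        self.requested = requested
        self.oldest = oldest


class CdcRing:
    """Toy model of a fixed-capacity CDC ring buffer (illustration only)."""

    def __init__(self, capacity=65536):
        self.events = deque(maxlen=capacity)  # a full deque drops its oldest item
        self.next_sequence = 1                # sequences start at 1
        self.pending = {}                     # txn_id -> records buffered until commit

    def record(self, txn_id, table, op, key, after=b""):
        # Records are held per transaction; nothing is visible yet.
        self.pending.setdefault(txn_id, []).append((table, op, key, after))

    def commit(self, txn_id):
        # Only committed records reach the ring buffer and get a sequence.
        for table, op, key, after in self.pending.pop(txn_id, []):
            self.events.append((self.next_sequence, txn_id, table, op, key, after))
            self.next_sequence += 1

    def abort(self, txn_id):
        self.pending.pop(txn_id, None)  # aborted records are discarded

    def read_from(self, start_sequence):
        if start_sequence == 0:
            return []  # 0 means "from latest": only events appended later
        if not self.events:
            return []
        oldest = self.events[0][0]
        if start_sequence < oldest:
            raise Overrun(start_sequence, oldest)
        return [e for e in self.events if e[0] >= start_sequence]
```

With a capacity of 3, committing five records keeps only sequences 3 through 5, so resuming from 2 raises Overrun while resuming from 4 returns the last two events.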
The change event
Every event carries the same envelope. In the engine this is CdcEvent; over
gRPC it is delivered as a SubscribeEvent.
| Field | Type | Meaning |
|---|---|---|
| sequence | u64 | Monotonic position, starting at 1 |
| timestamp_us | u64 | Microseconds since the Unix epoch |
| txn_id | u64 | Transaction that produced the event |
| table | string | Affected table name |
| key | bytes | Primary key (raw bytes) |
| operation | enum | Insert, Update, Delete, or SchemaChange |
| after_value | bytes | Serialized row image after the change |
| before_value | bytes | Reserved; see the after-image note below |
| ddl | string | DDL text for SchemaChange events |
The four operations map to a tagged enum in the engine:

- Insert { after }
- Update { before, after }
- Delete { before }
- SchemaChange { ddl }
The current implementation emits the after-image only. The before /
before_value field is part of the schema but is always empty today, even
for Update and Delete events. Build consumers that key off the primary
key plus after_value; do not rely on receiving a before-image.
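Because the stream carries only after-images, a consumer that needs the previous state of a row has to remember it. A minimal sketch, assuming events have already been decoded into the hypothetical Change dataclass below (its fields follow the envelope table; the string operation names are an assumption): it keeps a local mirror keyed by (table, key) and returns whatever the mirror held before each change, which serves as a locally reconstructed before-image.

```python
from dataclasses import dataclass


@dataclass
class Change:
    """Hypothetical stand-in for a decoded CDC event."""
    sequence: int
    table: str
    key: bytes
    operation: str            # "Insert", "Update", "Delete", or "SchemaChange"
    after_value: bytes = b""
    ddl: str = ""


def apply_change(mirror, change):
    """Apply one change to a {(table, key): after_value} mirror.

    Keys off the primary key plus after_value only; before_value is never
    read. Returns the row's previous value from the mirror (or None).
    """
    slot = (change.table, change.key)
    previous = mirror.get(slot)  # locally reconstructed "before" image
    if change.operation in ("Insert", "Update"):
        mirror[slot] = change.after_value  # after-image is the full new row
    elif change.operation == "Delete":
        mirror.pop(slot, None)  # the primary key alone identifies the row
    # SchemaChange events carry DDL text; this sketch leaves the mirror alone.
    return previous
```

The design choice here is to treat the mirror as the only source of "before" data, which matches the guidance above: never rely on before_value being populated.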
Overrun: when a subscriber falls behind
The ring buffer drops the oldest events once it is full. If a client asks to
resume from a sequence that has already been evicted, the engine returns an
Overrun error reporting both the requested position and the oldest
sequence still available. Slow-subscriber WAL replay (re-reading evicted
positions from the WAL) is not yet implemented, so a consumer that lags by
more than the buffer capacity will miss events and must re-snapshot.
Keep one subscriber per logical consumer, process events promptly, and persist
the last sequence you handled so you can resume. The buffer holds only the most
recent 65536 events, so how long a consumer can stall before it is evicted
depends on the write rate: the faster the database writes, the shorter that
window.
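A rough way to size that window is capacity divided by event rate. The sketch below assumes a steady write rate, which real workloads rarely have, and the function name is hypothetical. Since events are row-level, a single transaction that touches 500 rows consumes 500 slots, so measure the rate in row changes, not transactions.

```python
def eviction_window_seconds(events_per_second, capacity=65536):
    """Approximate time a stalled consumer has before its next sequence is
    evicted, assuming a constant rate of row-level change events."""
    if events_per_second <= 0:
        return float("inf")  # no writes, so nothing is ever evicted
    return capacity / events_per_second


for rate in (100, 1_000, 10_000):
    window = eviction_window_seconds(rate)
    print(f"{rate:>6} events/s -> about {window:.1f} s before eviction")
```

At 1,000 row changes per second the buffer covers roughly 65.5 seconds; at 10,000 per second it covers under 7 seconds.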
The gRPC SubscriptionService
The subscription API lives in the proto package sochdb.v1 as
SubscriptionService. It exposes four RPCs:
| RPC | Kind | Purpose |
|---|---|---|
| Subscribe | server-streaming | Stream CDC events with optional filters |
| WatchKey | server-streaming | Stream changes for a single key |
| ListSubscriptions | unary | List active subscriptions in a namespace |
| CancelSubscription | unary | Cancel a subscription by id |
```protobuf
service SubscriptionService {
  rpc Subscribe(SubscribeRequest) returns (stream SubscribeEvent);
  rpc WatchKey(WatchKeyRequest) returns (stream WatchKeyEvent);
  rpc ListSubscriptions(ListSubscriptionsRequest) returns (ListSubscriptionsResponse);
  rpc CancelSubscription(CancelSubscriptionRequest) returns (CancelSubscriptionResponse);
}
```
SubscriptionService is registered behind the server's auth interceptor, so when
the server runs with --auth you must present credentials (a Bearer token via
the authorization header, or an x-api-key header) on the streaming call.
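The two credential styles translate directly into gRPC call metadata. A small helper, sketched under the assumption that you pass exactly one credential (the function name is hypothetical; the header names come from this page). gRPC metadata keys must be lowercase, which both header names already are.

```python
def auth_metadata(bearer_token=None, api_key=None):
    """Build call metadata for a server started with --auth.

    Exactly one of bearer_token or api_key must be given.
    """
    if (bearer_token is None) == (api_key is None):
        raise ValueError("provide exactly one of bearer_token or api_key")
    if bearer_token is not None:
        return [("authorization", f"Bearer {bearer_token}")]
    return [("x-api-key", api_key)]
```

With grpcio, the result goes in the metadata argument of the call, for example stub.Subscribe(request, metadata=auth_metadata(api_key=key)).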
SubscribeRequest fields
```protobuf
message SubscribeRequest {
  string namespace = 1;                    // multi-tenant isolation
  repeated string tables = 2;              // filter to these tables
  repeated OperationType operations = 3;   // filter to these operations
  uint64 start_sequence = 4;               // 0 = from latest, >0 = resume
  string where_predicate = 5;              // SQL WHERE (see note below)
  uint32 batch_size = 6;                   // 0 = unbounded; server default 64
}

enum OperationType {
  OPERATION_UNSPECIFIED = 0;
  OPERATION_INSERT = 1;
  OPERATION_UPDATE = 2;
  OPERATION_DELETE = 3;
  OPERATION_SCHEMA_CHANGE = 4;
}
```
| Field | Default behavior | Notes |
|---|---|---|
| namespace | n/a | Tenant/namespace to subscribe within |
| tables | empty = all tables | Server-side table filter |
| operations | empty = all operations | Server-side operation-type filter |
| start_sequence | 0 = from latest | >0 resumes from that position |
| where_predicate | empty | Accepted but not yet enforced |
| batch_size | 0 = unbounded | Server default batch size is 64 |
Table filtering and operation-type filtering are enforced by the server's
streaming loop. The where_predicate field is part of the proto and is
accepted, but the Subscribe handler does not evaluate it yet — SQL
WHERE / row-level filtering is not wired into the stream. Treat
where_predicate as forward-compatible: setting it is harmless but it will not
narrow the stream. Filter on tables and operations, and apply any row-level
predicate in your own consumer for now.
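Until where_predicate is enforced, a row-level filter can run in the consumer. A minimal sketch with hypothetical names: filter_rows takes any iterable of events, a predicate over decoded rows, and a decode function. This page does not specify how after_value is serialized, so JSON decoding is purely an assumption for the demo; substitute whatever matches your tables. Events with an empty after_value (such as deletes) pass through so a consumer can still remove rows it mirrors.

```python
import json
from types import SimpleNamespace


def filter_rows(events, predicate, decode=json.loads):
    """Yield events whose decoded after-image satisfies predicate.

    decode=json.loads is an assumption for this sketch, not the
    documented row format.
    """
    for event in events:
        if not event.after_value:
            yield event  # nothing to test (e.g. a delete); keep it
        elif predicate(decode(event.after_value)):
            yield event


# Demo with stand-in events; real ones would come from stub.Subscribe(...).
demo = [
    SimpleNamespace(sequence=1, after_value=b'{"age": 30}'),
    SimpleNamespace(sequence=2, after_value=b'{"age": 18}'),
    SimpleNamespace(sequence=3, after_value=b""),
]
kept = [e.sequence for e in filter_rows(demo, lambda row: row["age"] > 21)]
print(kept)  # [1, 3]
```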
Resuming with start_sequence
Each event carries a monotonic sequence. To build an at-least-once consumer,
persist the last sequence you successfully processed and pass
start_sequence = last_processed + 1 on reconnect:
- start_sequence = 0: start from the latest event (only new changes).
- start_sequence = N (N greater than 0): resume from position N.
If N has already been evicted from the ring buffer, the stream fails with an
Overrun error; treat that as a signal to re-snapshot your state and resubscribe
from latest. Subscription ids are assigned by the server in the form sub-<n>;
you can list them with ListSubscriptions and stop one with
CancelSubscription (or simply drop the stream).
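The checkpoint-and-resume loop can be sketched as follows, assuming a local file is an acceptable place to keep the checkpoint. The helper names and the file layout are hypothetical; only the start_sequence arithmetic comes from this page. Saving the checkpoint only after an event is fully processed gives at-least-once behavior: a crash between processing and saving replays that event on reconnect.

```python
import os
import tempfile


def save_checkpoint(path, last_sequence):
    """Persist the last processed sequence via write-to-temp-then-rename."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".cdc-checkpoint-")
    with os.fdopen(fd, "w") as f:
        f.write(str(last_sequence))
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, path)  # atomic rename on POSIX within one filesystem


def load_checkpoint(path):
    """Return the last processed sequence, or 0 if none was saved yet."""
    try:
        with open(path) as f:
            return int(f.read().strip() or 0)
    except FileNotFoundError:
        return 0


def resume_start_sequence(path):
    """start_sequence for SubscribeRequest: 0 (from latest) on a fresh
    start, otherwise last_processed + 1."""
    last = load_checkpoint(path)
    return last + 1 if last > 0 else 0
```

On an Overrun, the sketch's caller would re-snapshot, delete the checkpoint file, and resubscribe with start_sequence = 0.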
Client example
The example below subscribes to inserts and updates on two tables in the
default namespace and prints each change as it arrives, tracking the last
sequence so it could resume after a disconnect.
The Python SDK 0.5.9 ships the generated gRPC stubs. Import them from
sochdb.proto and call SubscriptionService directly over a channel:
```python
import grpc
from sochdb.proto import sochdb_pb2, sochdb_pb2_grpc

# Connect to the gRPC server (default port 50051).
channel = grpc.insecure_channel("127.0.0.1:50051")
stub = sochdb_pb2_grpc.SubscriptionServiceStub(channel)

request = sochdb_pb2.SubscribeRequest(
    namespace="default",
    tables=["users", "orders"],  # enforced: only these tables
    operations=[                 # enforced: only these operations
        sochdb_pb2.OPERATION_INSERT,
        sochdb_pb2.OPERATION_UPDATE,
    ],
    start_sequence=0,  # 0 = from latest; use N>0 to resume
    batch_size=64,     # 0 = unbounded
    # where_predicate="age > 21",  # accepted but NOT yet enforced
)

# If the server runs with --auth, attach credentials:
# metadata = [("authorization", "Bearer <jwt-or-api-key>")]
# for event in stub.Subscribe(request, metadata=metadata):

op_names = {
    sochdb_pb2.OPERATION_INSERT: "INSERT",
    sochdb_pb2.OPERATION_UPDATE: "UPDATE",
    sochdb_pb2.OPERATION_DELETE: "DELETE",
    sochdb_pb2.OPERATION_SCHEMA_CHANGE: "SCHEMA_CHANGE",
}

last_sequence = 0
try:
    for event in stub.Subscribe(request):
        print(
            f"#{event.sequence} {op_names.get(event.operation, '?')} "
            f"{event.table} key={event.key!r} "
            f"after={len(event.after_value)} bytes"
        )
        # Record the sequence only after the event is handled, then persist it
        # so you can resume on reconnect with:
        #   SubscribeRequest(start_sequence=last_sequence + 1, ...)
        last_sequence = event.sequence
except grpc.RpcError as err:
    # An Overrun (the requested position was evicted) surfaces here; re-snapshot
    # and resubscribe from latest.
    print(f"stream ended: {err.code()} {err.details()}")
finally:
    channel.close()
```
event.before_value is always empty in 2.0.3 (after-image only). Reconstruct
prior state from your own copy if you need it.
You can exercise the stream from the shell with grpcurl:
```shell
grpcurl -plaintext \
  -d '{
    "namespace": "default",
    "tables": ["users", "orders"],
    "operations": ["OPERATION_INSERT", "OPERATION_UPDATE"],
    "start_sequence": 0,
    "batch_size": 64
  }' \
  127.0.0.1:50051 \
  sochdb.v1.SubscriptionService/Subscribe
```
With --auth enabled, add a header:
```shell
grpcurl -plaintext \
  -H 'authorization: Bearer <jwt-or-api-key>' \
  -d '{ "namespace": "default", "start_sequence": 0 }' \
  127.0.0.1:50051 \
  sochdb.v1.SubscriptionService/Subscribe
```
Watching a single key
When you only care about one row, WatchKey streams WatchKeyEvents for a
single (namespace, table, key) triple rather than the whole table:
```protobuf
message WatchKeyRequest {
  string namespace = 1;
  string table = 2;
  bytes key = 3;
}

message WatchKeyEvent {
  uint64 sequence = 1;
  uint64 timestamp_us = 2;
  OperationType operation = 3;
  bytes after_value = 4;
  bytes before_value = 5;  // empty today (after-image only)
}
```
Listing and cancelling subscriptions
```python
# List active subscriptions in a namespace.
listing = stub.ListSubscriptions(
    sochdb_pb2.ListSubscriptionsRequest(namespace="default")
)
for info in listing.subscriptions:
    print(info.subscription_id, info.tables, "at", info.current_sequence)

# Cancel one explicitly (or just stop iterating / close the channel).
stub.CancelSubscription(
    sochdb_pb2.CancelSubscriptionRequest(subscription_id="sub-1")
)
```
WebSocket subscriptions (not wired in the default binary)
The server's WebSocket gateway (--ws-port, default 8080) defines a
subscribe message type in its JSON protocol, with a payload of
{ tables, operations, start_sequence }. However, in the default
sochdb-grpc-server binary the gateway is constructed without a CDC log
handle, so WebSocket subscribe requests are not connected to CDC.
As a result, WebSocket subscriptions do not deliver change events. Use the
gRPC SubscriptionService for CDC today; the WebSocket gateway is usable for
sql, kv_get, kv_put, kv_delete, and ping.
Operational notes
- Ports. The gRPC SubscriptionService is served on the gRPC port (--port, default 50051), alongside every other gRPC service.
- Auth. With --auth, streaming RPCs require an authorization: Bearer token or an x-api-key header, the same as the rest of the gRPC surface. Without --auth, requests resolve to an anonymous principal.
- Durability. CDC is an in-memory ring buffer, not a durable log. It is meant for near-real-time fan-out, not for use as a system of record. Persist the last sequence you handled and re-snapshot on Overrun.
- Before-images. Not available in 2.0.3 (after-image only).
See also
- Deploying to Production — server binary, flags, ports, and operational defaults.