Change Data Capture & Subscriptions

The SochDB v2 server exposes a Change Data Capture (CDC) stream so clients can react to row-level changes as they are committed. Changes flow out of the storage engine's write-ahead log into an in-memory ring buffer, and the gRPC SubscriptionService turns that buffer into a server-side stream of change events, with filtering by table and operation and resume-by-sequence support.

Versions

This page targets core engine 2.0.3 (the sochdb-grpc-server binary). The gRPC SubscriptionService is defined in the server's proto package sochdb.v1, and generated stubs ship with the language SDKs. The SDKs are versioned independently of the engine: Python 0.5.9, Node.js 0.5.3, Go 0.4.5.

License

The core engine (the Rust workspace, the sochdb crate, and the gRPC server) is AGPL-3.0-or-later, with commercial licensing available. The language SDKs (Python, Node.js, Go) are Apache-2.0.


How CDC works

CDC is WAL-derived: every committed write is also recorded as a change event in a log-structured ring buffer held in memory. The ring buffer has a fixed capacity, and the server starts it enabled by default.

| Property           | Value                                          |
|--------------------|------------------------------------------------|
| Buffer type        | In-memory log-structured ring buffer           |
| Capacity           | 65536 events (CdcConfig::capacity)             |
| Enabled by default | Yes (CdcConfig::enabled = true)                |
| Sequence numbers   | Monotonic, start at 1, independent of WAL LSNs |
| Eviction policy    | Oldest event dropped on overflow               |

Change events are produced from the commit path: the engine buffers insert/update/delete/schema-change records during a transaction and flushes them to the ring buffer after the transaction commits (records are discarded on abort).
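As a rough illustration of the behaviour described above, the toy model below buffers records per transaction, flushes them on commit, discards them on abort, and evicts the oldest event when the ring is full. It is a sketch only: ToyCdcLog, Event, and every method name are hypothetical and do not mirror the engine's Rust types.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    sequence: int
    txn_id: int
    table: str
    key: bytes
    operation: str
    after_value: bytes


class ToyCdcLog:
    """Hypothetical stand-in for the CDC ring buffer (not the engine's API)."""

    def __init__(self, capacity: int = 65536) -> None:
        self._events = deque(maxlen=capacity)  # deque drops the oldest entry on overflow
        self._next_sequence = 1                # sequences start at 1
        self._pending = {}                     # txn_id -> records buffered in that txn

    def record(self, txn_id, table, key, operation, after_value=b""):
        """Buffer a change inside an open transaction; nothing is published yet."""
        self._pending.setdefault(txn_id, []).append((table, key, operation, after_value))

    def commit(self, txn_id):
        """Flush the transaction's records to the ring, assigning sequences in order."""
        for table, key, operation, after_value in self._pending.pop(txn_id, []):
            self._events.append(
                Event(self._next_sequence, txn_id, table, key, operation, after_value)
            )
            self._next_sequence += 1

    def abort(self, txn_id):
        """Discard everything the transaction buffered."""
        self._pending.pop(txn_id, None)

    def oldest_sequence(self):
        """Sequence of the oldest retained event, or None if the ring is empty."""
        return self._events[0].sequence if self._events else None

    def snapshot(self):
        """Copy of the retained events, oldest first."""
        return list(self._events)
```

With capacity=2, committing a transaction that recorded three changes leaves only sequences 2 and 3 in the ring, which is the situation a lagging subscriber runs into at a much larger scale.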

The change event

Every event carries the same envelope. In the engine this is CdcEvent; over gRPC it is delivered as a SubscribeEvent.

| Field        | Type   | Meaning                                  |
|--------------|--------|------------------------------------------|
| sequence     | u64    | Monotonic position, starting at 1        |
| timestamp_us | u64    | Microseconds since the Unix epoch        |
| txn_id       | u64    | Transaction that produced the event      |
| table        | string | Affected table name                      |
| key          | bytes  | Primary key (raw bytes)                  |
| operation    | enum   | Insert, Update, Delete, or SchemaChange  |
| after_value  | bytes  | Serialized row image after the change    |
| before_value | bytes  | Reserved; see the after-image note below |
| ddl          | string | DDL text for SchemaChange events         |
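Because timestamp_us is an integer count of microseconds, converting it through a float can lose precision. A small helper along these lines keeps the conversion exact; event_time is a hypothetical name, not part of any SDK.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def event_time(timestamp_us: int) -> datetime:
    """Convert microseconds since the Unix epoch to an aware UTC datetime."""
    # timedelta does integer microsecond arithmetic, so nothing is rounded.
    return EPOCH + timedelta(microseconds=timestamp_us)
```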

The four operations map to a tagged enum in the engine:

  • Insert { after }
  • Update { before, after }
  • Delete { before }
  • SchemaChange { ddl }

After-image only

The current implementation emits the after-image only. The before / before_value field is part of the schema but is always empty today, even for Update and Delete events. Build consumers that key off the primary key plus after_value; do not rely on receiving a before-image.
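One way to follow that advice is to keep a local copy keyed by (table, key) and overwrite it with each after-image. The sketch below does that and returns the previously stored value, which is the closest thing to a before-image a consumer can get today. apply_event and the string operation names are illustrative assumptions; over gRPC the operation arrives as an OperationType enum value.

```python
def apply_event(view, table, key, operation, after_value):
    """Apply one change to `view`, a dict keyed by (table, key).

    Returns the value that was stored before the change (None if absent),
    so the caller can derive its own before-image.
    """
    slot = (table, key)
    previous = view.get(slot)
    if operation in ("Insert", "Update"):
        view[slot] = after_value      # the after-image replaces the local copy
    elif operation == "Delete":
        view.pop(slot, None)          # only the key is needed for a delete
    # SchemaChange carries DDL text rather than row data, so the view is untouched.
    return previous
```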

Overrun: when a subscriber falls behind

The ring buffer drops the oldest events once it is full. If a client asks to resume from a sequence that has already been evicted, the engine returns an Overrun error reporting both the requested position and the oldest sequence still available. Slow-subscriber WAL replay (re-reading evicted positions from the WAL) is not yet implemented, so a consumer that lags by more than the buffer capacity will miss events and must re-snapshot.
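The check behind an overrun can be pictured as a comparison between the requested position and the oldest retained sequence. The sketch below models it over a plain list; the Overrun class and read_from function are hypothetical and only mimic the two values the engine is described as reporting.

```python
class Overrun(Exception):
    """Hypothetical error carrying the requested and oldest-available positions."""

    def __init__(self, requested, oldest):
        super().__init__(
            f"requested sequence {requested} was evicted; oldest available is {oldest}"
        )
        self.requested = requested
        self.oldest = oldest


def read_from(events, start_sequence):
    """Return retained events at or after start_sequence.

    `events` is a list of (sequence, payload) tuples in ascending order,
    standing in for whatever the ring buffer still holds.
    """
    if not events:
        return []
    oldest = events[0][0]
    if start_sequence < oldest:
        # The requested position has been dropped; there is nothing to replay it from.
        raise Overrun(start_sequence, oldest)
    return [event for event in events if event[0] >= start_sequence]
```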

Avoid overruns

Keep one subscriber per logical consumer, process events promptly, and persist the last sequence you handled so you can resume. The 65536-event capacity is generous for steady-state load, but a consumer that stalls for a long time while the database keeps writing can still be evicted.
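Persisting the last handled sequence can be as simple as a small file, provided the write is atomic so a crash never leaves a half-written checkpoint. The helpers below are a minimal sketch under that assumption; save_checkpoint, load_checkpoint, and the file layout are hypothetical and not part of any SochDB SDK.

```python
import os
import tempfile


def save_checkpoint(path, sequence):
    """Atomically write `sequence` to `path` (write to a temp file, then rename)."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".cdc-checkpoint-")
    try:
        with os.fdopen(fd, "w") as handle:
            handle.write(str(sequence))
            handle.flush()
            os.fsync(handle.fileno())   # make sure the bytes reach the disk
        os.replace(tmp_path, path)      # atomic rename over the old checkpoint
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


def load_checkpoint(path):
    """Return the stored sequence, or 0 when no checkpoint exists yet."""
    try:
        with open(path) as handle:
            return int(handle.read().strip())
    except FileNotFoundError:
        return 0
```

Returning 0 for a missing checkpoint lines up with the start_sequence convention, where 0 means "from latest".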


The gRPC SubscriptionService

The subscription API lives in the proto package sochdb.v1 as SubscriptionService. It exposes four RPCs:

| RPC                | Kind             | Purpose                                  |
|--------------------|------------------|------------------------------------------|
| Subscribe          | server-streaming | Stream CDC events with optional filters  |
| WatchKey           | server-streaming | Stream changes for a single key          |
| ListSubscriptions  | unary            | List active subscriptions in a namespace |
| CancelSubscription | unary            | Cancel a subscription by id              |

sochdb.proto (sochdb.v1)
service SubscriptionService {
  rpc Subscribe(SubscribeRequest) returns (stream SubscribeEvent);
  rpc WatchKey(WatchKeyRequest) returns (stream WatchKeyEvent);
  rpc ListSubscriptions(ListSubscriptionsRequest) returns (ListSubscriptionsResponse);
  rpc CancelSubscription(CancelSubscriptionRequest) returns (CancelSubscriptionResponse);
}

SubscriptionService is registered behind the server's auth interceptor, so when the server runs with --auth you must present credentials (a Bearer token via the authorization header, or an x-api-key header) on the streaming call.
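In Python gRPC, call metadata is a sequence of (key, value) tuples passed through the metadata= argument of the call. A small helper such as the one below can build it for either credential style; auth_metadata is a hypothetical name, and rejecting both credentials at once is a choice made for this sketch, not documented server behaviour.

```python
def auth_metadata(bearer_token=None, api_key=None):
    """Build gRPC call metadata for a server started with --auth."""
    if bearer_token and api_key:
        raise ValueError("pass either bearer_token or api_key, not both")
    if bearer_token:
        return [("authorization", f"Bearer {bearer_token}")]
    if api_key:
        return [("x-api-key", api_key)]
    return []  # no credentials: suitable only when the server runs without --auth


# Usage with the generated stub (not executed here):
#   for event in stub.Subscribe(request, metadata=auth_metadata(api_key="...")):
#       ...
```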

SubscribeRequest fields

SubscribeRequest
message SubscribeRequest {
  string namespace = 1;                   // multi-tenant isolation
  repeated string tables = 2;             // filter to these tables
  repeated OperationType operations = 3;  // filter to these operations
  uint64 start_sequence = 4;              // 0 = from latest, >0 = resume
  string where_predicate = 5;             // SQL WHERE (see note below)
  uint32 batch_size = 6;                  // 0 = unbounded; server default 64
}

enum OperationType {
  OPERATION_UNSPECIFIED = 0;
  OPERATION_INSERT = 1;
  OPERATION_UPDATE = 2;
  OPERATION_DELETE = 3;
  OPERATION_SCHEMA_CHANGE = 4;
}

| Field           | Default behavior       | Notes                               |
|-----------------|------------------------|-------------------------------------|
| namespace       |                        | Tenant/namespace to subscribe within |
| tables          | empty = all tables     | Server-side table filter            |
| operations      | empty = all operations | Server-side operation-type filter   |
| start_sequence  | 0 = from latest        | >0 resumes from that position       |
| where_predicate | empty                  | Accepted but not yet enforced       |
| batch_size      | 0 = unbounded          | Server default batch size is 64     |

Filtering: what is enforced today

Table filtering and operation-type filtering are enforced by the server's streaming loop. The where_predicate field is part of the proto and is accepted, but the Subscribe handler does not evaluate it yet — SQL WHERE / row-level filtering is not wired into the stream. Treat where_predicate as forward-compatible: setting it is harmless but it will not narrow the stream. Filter on tables and operations, and apply any row-level predicate in your own consumer for now.
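Until the server evaluates where_predicate, a row-level filter can sit between the stream and the consumer. The generator below is a sketch of that idea: filter_events is a hypothetical helper, and the decode callable is supplied by the caller because this page does not specify how after_value is serialized.

```python
def filter_events(events, predicate, decode):
    """Yield only the events whose decoded after-image satisfies `predicate`.

    `decode` turns after_value bytes into a row object; the encoding is an
    assumption the caller must supply. Events with an empty after_value
    (for example deletes or schema changes) are passed through untouched
    so they are not dropped silently.
    """
    for event in events:
        if not event.after_value:
            yield event
            continue
        if predicate(decode(event.after_value)):
            yield event


# Usage, assuming rows happened to be JSON-encoded (an assumption, not a documented format):
#   import json
#   for event in filter_events(stub.Subscribe(request), lambda row: row["age"] > 21, json.loads):
#       ...
```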

Resuming with start_sequence

Each event carries a monotonic sequence. To build an at-least-once consumer, persist the last sequence you successfully processed and pass start_sequence = last_processed + 1 on reconnect:

  • start_sequence = 0 — start from the latest event (only new changes).
  • start_sequence = N (N greater than 0) — resume from position N.

If N has already been evicted from the ring buffer, the stream fails with an Overrun error; treat that as a signal to re-snapshot your state and resubscribe from latest.

Subscription ids are assigned by the server in the form sub-<n>; you can list them with ListSubscriptions and stop one with CancelSubscription (or simply drop the stream).
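The resume rules can be combined into a small reconnect loop. The sketch below is transport-agnostic so it can be read without the SDK: every parameter is a caller-supplied callable, run_consumer is a hypothetical name, and how an Overrun is recognised on the wire (is_overrun) is left to the caller because this page does not state the gRPC status it maps to.

```python
def run_consumer(open_stream, handle, load_checkpoint, save_checkpoint,
                 resnapshot, is_overrun, stream_error=Exception, max_attempts=5):
    """Consume events at-least-once, resuming from the last persisted sequence.

    open_stream(start_sequence) returns an iterable of events with a .sequence
    attribute. Pass stream_error=grpc.RpcError in real use so that errors
    raised by `handle` itself are not swallowed and retried.
    """
    last = load_checkpoint()
    for _ in range(max_attempts):
        # 0 means "from latest"; otherwise resume just past the last handled event.
        start_sequence = last + 1 if last > 0 else 0
        try:
            for event in open_stream(start_sequence):
                handle(event)
                last = event.sequence
                save_checkpoint(last)   # persist only after the event is handled
            return last                 # the stream ended cleanly
        except stream_error as err:
            if is_overrun(err):
                resnapshot()            # rebuild state from the database
                last = 0                # then resubscribe from latest
                save_checkpoint(last)
            # Any other stream error: loop and retry from last + 1.
    return last
```

Saving the checkpoint after handle returns means an event may be delivered again after a crash, but is never skipped, which is the at-least-once behaviour described above.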


Client example

The example below subscribes to inserts and updates on two tables in the default namespace and prints each change as it arrives, tracking the last sequence so it could resume after a disconnect.

The Python SDK 0.5.9 ships the generated gRPC stubs. Import them from sochdb.proto and call SubscriptionService directly over a channel:

import grpc
from sochdb.proto import sochdb_pb2, sochdb_pb2_grpc

# Connect to the gRPC server (default port 50051).
channel = grpc.insecure_channel("127.0.0.1:50051")
stub = sochdb_pb2_grpc.SubscriptionServiceStub(channel)

request = sochdb_pb2.SubscribeRequest(
    namespace="default",
    tables=["users", "orders"],  # enforced: only these tables
    operations=[                 # enforced: only these operations
        sochdb_pb2.OPERATION_INSERT,
        sochdb_pb2.OPERATION_UPDATE,
    ],
    start_sequence=0,            # 0 = from latest; use N>0 to resume
    batch_size=64,               # 0 = unbounded
    # where_predicate="age > 21",  # accepted but NOT yet enforced
)

# If the server runs with --auth, attach credentials:
#   metadata = [("authorization", "Bearer <jwt-or-api-key>")]
#   for event in stub.Subscribe(request, metadata=metadata):

last_sequence = 0
op_names = {
    sochdb_pb2.OPERATION_INSERT: "INSERT",
    sochdb_pb2.OPERATION_UPDATE: "UPDATE",
    sochdb_pb2.OPERATION_DELETE: "DELETE",
    sochdb_pb2.OPERATION_SCHEMA_CHANGE: "SCHEMA_CHANGE",
}

try:
    for event in stub.Subscribe(request):
        last_sequence = event.sequence
        print(
            f"#{event.sequence} {op_names.get(event.operation, '?')} "
            f"{event.table} key={event.key!r} "
            f"after={len(event.after_value)} bytes"
        )
        # Persist last_sequence here so you can resume from it on reconnect:
        #   SubscribeRequest(start_sequence=last_sequence + 1, ...)
except grpc.RpcError as err:
    # An Overrun (the requested position was evicted) surfaces here; re-snapshot
    # and resubscribe from latest.
    print(f"stream ended: {err.code()} {err.details()}")

before_value is empty

event.before_value is always empty in 2.0.3 (after-image only). Reconstruct prior state from your own copy if you need it.

Watching a single key

When you only care about one row, WatchKey streams WatchKeyEvents for a single (namespace, table, key) triple rather than the whole table:

WatchKey
message WatchKeyRequest {
  string namespace = 1;
  string table = 2;
  bytes key = 3;
}

message WatchKeyEvent {
  uint64 sequence = 1;
  uint64 timestamp_us = 2;
  OperationType operation = 3;
  bytes after_value = 4;
  bytes before_value = 5;  // empty today (after-image only)
}
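A client call for WatchKey might look like the sketch below. It takes the generated stub and pb2 module as arguments so that it does not assume an import path; the WatchKey method and WatchKeyRequest fields come from the proto above, while watch_key itself is a hypothetical helper.

```python
def watch_key(stub, pb2, namespace, table, key, metadata=None):
    """Yield (sequence, operation, after_value) for changes to a single key.

    `stub` is a SubscriptionServiceStub and `pb2` the generated message module.
    Pass `metadata` (a list of header tuples) when the server runs with --auth.
    """
    request = pb2.WatchKeyRequest(namespace=namespace, table=table, key=key)
    for event in stub.WatchKey(request, metadata=metadata):
        # before_value is omitted on purpose: it is always empty today.
        yield event.sequence, event.operation, event.after_value


# Usage with the Python SDK stubs (not executed here):
#   for seq, op, after in watch_key(stub, sochdb_pb2, "default", "users", b"user-42"):
#       print(seq, op, len(after))
```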

Listing and cancelling subscriptions

# List active subscriptions in a namespace.
listing = stub.ListSubscriptions(
    sochdb_pb2.ListSubscriptionsRequest(namespace="default")
)
for info in listing.subscriptions:
    print(info.subscription_id, info.tables, "at", info.current_sequence)

# Cancel one explicitly (or just stop iterating / close the channel).
stub.CancelSubscription(
    sochdb_pb2.CancelSubscriptionRequest(subscription_id="sub-1")
)

WebSocket subscriptions (not wired in the default binary)

The server's WebSocket gateway (--ws-port, default 8080) defines a subscribe message type in its JSON protocol, with a payload of { tables, operations, start_sequence }. However, in the default sochdb-grpc-server binary the gateway is constructed without a CDC log handle, so WebSocket subscribe requests are not connected to CDC.

WebSocket CDC is not enabled by default

The subscribe message exists in the WebSocket protocol, but the default server binary passes no CDC log to the gateway, so WebSocket subscriptions do not deliver change events. Use the gRPC SubscriptionService for CDC today; the WebSocket gateway is usable for sql, kv_get, kv_put, kv_delete, and ping.


Operational notes

  • Ports. The gRPC SubscriptionService is served on the gRPC port (--port, default 50051), alongside every other gRPC service.
  • Auth. With --auth, streaming RPCs require an authorization: Bearer token or an x-api-key header, same as the rest of the gRPC surface. Without --auth, requests resolve to an anonymous principal.
  • Durability. CDC is an in-memory ring buffer, not a durable log. Use it for near-real-time fan-out, not as a system of record. Persist the last sequence you handled and re-snapshot on Overrun.
  • Before-images. Not available in 2.0.3 (after-image only).

See also