
Python SDK Guide

Version: 0.4.7
Time: 45 minutes
Difficulty: Beginner to Intermediate
Prerequisites: Python 3.9+

Complete guide to SochDB's Python SDK covering dual-mode architecture (embedded FFI + server gRPC), namespaces, collections, vector search, priority queues, and advanced features.


Table of Contents

  1. Installation
  2. Quick Start
  3. Architecture: Dual-Mode
  4. Namespace & Collections
  5. Vector Search
  6. Priority Queue
  7. SQL Database
  8. Key-Value Operations
  9. Path API
  10. Prefix Scanning
  11. Transactions
  12. Graph Overlay
  13. Temporal Graph
  14. Server Mode (gRPC/IPC)
  15. CLI Tools
  16. Advanced Features
  17. Error Handling
  18. Best Practices
  19. Complete Examples

Installation

pip install sochdb

What's New in 0.4.7:

  • ✅ Improved FFI stability and error messages
  • ✅ Better platform detection for native libraries

What's New in 0.4.6:

  • ✅ Temporal Graph API for time-aware relationships
  • ✅ Enhanced Graph Overlay with BFS/DFS traversal

What's New in 0.4.3:

  • ✅ Priority Queue API with ordered-key task entries
  • ✅ Streaming TopK for efficient ORDER BY + LIMIT
  • ✅ Backend-agnostic queue (FFI, gRPC, In-Memory)

What's New in 0.4.1:

  • ✅ Namespace API for multi-tenant isolation
  • ✅ Collection API with auto-dimension vectors
  • ✅ Lock error types (DatabaseLockedError, LockTimeoutError)
  • ✅ Concurrent mode support

What's New in 0.4.0:

  • ✅ Project rename: ToonDB → SochDB
  • ✅ Dual-mode architecture: Embedded (FFI) + Server (gRPC)
  • ✅ VectorIndex class with native HNSW

Import Note: Install with pip install sochdb, import as from sochdb import Database

Pre-built for:

  • Linux (x86_64, aarch64)
  • macOS (Intel, Apple Silicon)
  • Windows (x64)

Quick Start

Embedded Mode

from sochdb import Database

# Open database
with Database.open("./my_database") as db:
    # Put and Get
    db.put(b"user:123", b'{"name":"Alice","age":30}')
    value = db.get(b"user:123")
    print(value.decode())
    # Output: {"name":"Alice","age":30}

Output:

{"name":"Alice","age":30}

Architecture: Dual-Mode

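The SochDB Python SDK supports two deployment modes, embedded (FFI) and server (gRPC). Since v0.4.1, a concurrent option also allows multi-process access to the same database.

Embedded Mode (FFI)

Direct FFI bindings to the Rust libraries. No server required.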

from sochdb import Database

# Direct FFI - no server needed
with Database.open("./mydb") as db:
    db.put(b"key", b"value")
    value = db.get(b"key")

Best for: Local development, notebooks, simple apps, edge deployments.

Server Mode (gRPC) - For Distributed Systems

A thin client that connects to a running sochdb-grpc-server instance.

from sochdb import SochDBClient

# Connect to server
client = SochDBClient("localhost:50051")
client.put_kv("namespace", "key", b"value")
value = client.get_kv("namespace", "key")
client.close()

Best for: Production, multi-language environments, microservices.

Concurrent Mode (v0.4.1+)

Multi-process access to the same database with MVCC.

from sochdb import Database

# Multiple processes can access simultaneously
db = Database.open_concurrent("./shared_db")
print(f"Concurrent mode: {db.is_concurrent}") # True

Namespace & Collections

New in v0.4.1 — Type-safe multi-tenant isolation with vector collections.

Creating Namespaces

from sochdb import Database, NamespaceConfig

with Database.open("./multi_tenant") as db:
    # Create namespace for tenant
    config = NamespaceConfig(
        name="tenant_123",
        display_name="Acme Corp",
        labels={"tier": "enterprise"},
    )
    ns = db.create_namespace(config)

    # Or get existing
    ns = db.namespace("tenant_123")

Creating Collections

from sochdb import CollectionConfig, DistanceMetric

# Create vector collection
config = CollectionConfig(
name="documents",
dimension=384, # None = auto-infer from first vector
metric=DistanceMetric.COSINE,
m=16,
ef_construction=100,
)
collection = ns.create_collection(config)

# Or simpler
collection = ns.create_collection("embeddings", dimension=768)

Vector Operations

# Insert vectors with metadata
collection.insert(
vector=[0.1, 0.2, 0.3, ...], # 384-dim
metadata={"source": "web", "url": "https://..."},
id="doc_001" # Optional, auto-generated if omitted
)

# Batch insert
collection.insert_batch(
vectors=[[0.1, ...], [0.2, ...], [0.3, ...]],
metadatas=[{"type": "a"}, {"type": "b"}, {"type": "c"}],
ids=["doc_1", "doc_2", "doc_3"]
)

Unified Search API

from sochdb import SearchRequest

# Vector search
results = collection.search(
SearchRequest(
vector=query_embedding,
k=10,
filter={"source": "web"},
)
)

for result in results:
    print(f"ID: {result.id}, Score: {result.score:.4f}")

# Keyword search (if hybrid enabled)
results = collection.search(
SearchRequest(
text_query="machine learning",
k=10,
)
)

# Hybrid search (RRF fusion)
results = collection.search(
SearchRequest(
vector=query_embedding,
text_query="ML algorithms",
k=10,
alpha=0.7, # 0.7 vector + 0.3 keyword
)
)
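
The hybrid example above is labelled RRF fusion and takes an alpha weight, but the exact formula is not given in this guide. As a rough illustration only, here is a minimal sketch of weighted reciprocal rank fusion, assuming alpha scales the vector ranking and (1 - alpha) scales the keyword ranking. The function name weighted_rrf and the constant rrf_k=60 are assumptions for this sketch, not part of the sochdb package.

```python
def weighted_rrf(vector_ids, keyword_ids, alpha=0.7, rrf_k=60):
    """Fuse two ranked ID lists with weighted reciprocal rank fusion.

    Hypothetical helper for illustration; not part of the sochdb package.
    """
    scores = {}
    for weight, ranking in ((alpha, vector_ids), (1.0 - alpha, keyword_ids)):
        for rank, doc_id in enumerate(ranking, start=1):
            # Each list contributes weight / (rrf_k + rank) for the documents it contains.
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (rrf_k + rank)
    # Highest fused score first; ties broken by ID so the order is deterministic.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


# "a" ranks well in both lists, so it stays ahead of "c" and "b".
fused = weighted_rrf(["a", "b", "c"], ["c", "a", "d"])
print(fused)
```

Because only ranks are used, the two result lists do not need comparable score scales, which is the usual reason to pick rank fusion over adding raw scores.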

Convenience Methods

# Quick vector search
results = collection.vector_search(query_embedding, k=10)

# Quick keyword search
results = collection.keyword_search("neural networks", k=10)

# Quick hybrid search
results = collection.hybrid_search(query_embedding, "deep learning", k=10)

Native VectorIndex (HNSW)

from sochdb import VectorIndex

# Create index
index = VectorIndex(
dimension=768,
metric="cosine", # cosine, euclidean, dot_product
m=16,
ef_construction=100,
)

# Insert vectors
import numpy as np
embeddings = np.random.randn(10000, 768).astype(np.float32)

for i, vec in enumerate(embeddings):
    index.insert(f"doc_{i}", vec)

# Or batch insert (faster)
ids = [f"doc_{i}" for i in range(len(embeddings))]
index.insert_batch(ids, embeddings) # ~15,000 vec/s with FFI

# Search
query = np.random.randn(768).astype(np.float32)
results = index.search(query, k=10, ef_search=64)

for id, distance in results:
    print(f"{id}: {distance:.4f}")

Bulk Operations (Legacy)

from sochdb.bulk import bulk_build_index, bulk_query_index

# Build HNSW index
stats = bulk_build_index(
embeddings,
output="my_index.hnsw",
m=16,
ef_construction=100,
metric="cosine"
)
print(f"Built {stats.vectors} vectors at {stats.rate:.0f} vec/s")

# Query
results = bulk_query_index(
index="my_index.hnsw",
query=query,
k=10,
ef_search=64
)

Priority Queue

New in v0.4.3 — First-class queue API with ordered-key task entries.

Creating Queues

from sochdb import Database
from sochdb.queue import PriorityQueue, QueueConfig

db = Database.open("./queue_db")

# Create queue with config
config = QueueConfig(
visibility_timeout_ms=30000, # 30s lease
max_attempts=3,
dead_letter_queue="failed_tasks",
)
queue = PriorityQueue.from_database(db, "tasks", config)

Enqueue Tasks

# Enqueue with priority (lower = higher priority)
task_id = queue.enqueue(
priority=1, # High priority
payload=b'{"action": "process_order", "order_id": 123}',
metadata={"source": "api"},
)
print(f"Enqueued task: {task_id}")

# Delayed task
queue.enqueue(
priority=5,
payload=b"delayed task",
delay_ms=60000, # Visible in 1 minute
)

Dequeue and Process

# Dequeue (claims task with lease)
task = queue.dequeue(worker_id="worker-1")

if task:
    try:
        # Process task
        payload = task.payload
        print(f"Processing: {payload}")

        # Acknowledge completion
        queue.ack(task.task_id)
    except Exception as e:
        # Negative ack (retry or dead-letter)
        queue.nack(task.task_id)
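
To make the lease semantics concrete (visibility timeout, max attempts, dead-lettering), here is a small in-memory sketch. It is not SochDB's implementation: the LeaseQueue class, its method signatures, and the rule that an expired lease counts as a failed attempt are all assumptions chosen for illustration.

```python
import heapq
import itertools


class LeaseQueue:
    """In-memory sketch of a priority queue with leases (illustration only)."""

    def __init__(self, visibility_timeout_ms=30000, max_attempts=3):
        self.visibility_timeout_ms = visibility_timeout_ms
        self.max_attempts = max_attempts
        self._heap = []        # entries are (priority, sequence, task_id)
        self._tasks = {}       # task_id -> {"payload", "priority", "attempts"}
        self._leases = {}      # task_id -> lease expiry in ms
        self.dead_letter = []  # task_ids that exhausted their attempts
        self._seq = itertools.count()

    def enqueue(self, priority, payload):
        seq = next(self._seq)
        task_id = f"task-{seq}"
        self._tasks[task_id] = {"payload": payload, "priority": priority, "attempts": 0}
        heapq.heappush(self._heap, (priority, seq, task_id))
        return task_id

    def dequeue(self, now_ms):
        self._reclaim_expired(now_ms)
        if not self._heap:
            return None
        _, _, task_id = heapq.heappop(self._heap)  # lowest number = highest priority
        self._tasks[task_id]["attempts"] += 1
        self._leases[task_id] = now_ms + self.visibility_timeout_ms
        return task_id

    def ack(self, task_id):
        # Completed: drop the lease and the task itself.
        self._leases.pop(task_id, None)
        self._tasks.pop(task_id, None)

    def nack(self, task_id):
        # Failed: retry unless the attempt budget is used up.
        self._leases.pop(task_id, None)
        task = self._tasks[task_id]
        if task["attempts"] >= self.max_attempts:
            self.dead_letter.append(task_id)
        else:
            heapq.heappush(self._heap, (task["priority"], next(self._seq), task_id))

    def _reclaim_expired(self, now_ms):
        # A lease that ran out is treated like a nack.
        for task_id, expiry in list(self._leases.items()):
            if expiry <= now_ms:
                self.nack(task_id)


q = LeaseQueue(visibility_timeout_ms=30000, max_attempts=2)
low = q.enqueue(priority=5, payload=b"low")
high = q.enqueue(priority=1, payload=b"high")
first = q.dequeue(now_ms=0)        # the priority-1 task is claimed first
q.ack(first)
second = q.dequeue(now_ms=0)       # the priority-5 task, attempt 1
retried = q.dequeue(now_ms=30000)  # its lease expired, so it is handed out again
q.nack(retried)                    # attempt 2 of 2 failed, so it is dead-lettered
print(q.dead_letter)
```

Passing the clock in as now_ms keeps the sketch deterministic; a real worker would read the current time instead.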

Queue Statistics

stats = queue.stats()
print(f"Pending: {stats.pending}")
print(f"In-flight: {stats.claimed}")
print(f"Dead-lettered: {stats.dead_lettered}")

Streaming TopK

For efficient ORDER BY + LIMIT queries:

from sochdb.queue import StreamingTopK

# Get top 100 highest priority tasks
top_tasks = queue.top_k(k=100)
for task in top_tasks:
    print(f"Priority {task.priority}: {task.task_id}")

SQL Database

CREATE TABLE

from sochdb import Database

with Database.open("./sql_db") as db:
    # Create table
    db.execute_sql("""
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE,
            age INTEGER
        )
    """)

    # Insert data
    db.execute_sql("""
        INSERT INTO users (id, name, email, age)
        VALUES (1, 'Alice', 'alice@example.com', 30)
    """)

    db.execute_sql("""
        INSERT INTO users (id, name, email, age)
        VALUES (2, 'Bob', 'bob@example.com', 25)
    """)

Output:

Table 'users' created
2 rows inserted

SELECT Queries

# Select all
results = db.execute_sql("SELECT * FROM users")
for row in results:
    print(row)

# Output:
# {'id': 1, 'name': 'Alice', 'email': 'alice@example.com', 'age': 30}
# {'id': 2, 'name': 'Bob', 'email': 'bob@example.com', 'age': 25}

# WHERE clause
results = db.execute_sql("SELECT name, age FROM users WHERE age > 26")
for row in results:
    print(f"{row['name']}: {row['age']} years old")

# Output:
# Alice: 30 years old

JOIN Queries

# Create orders table
db.execute_sql("""
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
user_id INTEGER,
product TEXT,
amount REAL,
FOREIGN KEY (user_id) REFERENCES users(id)
)
""")

# Insert orders
db.execute_sql("INSERT INTO orders VALUES (1, 1, 'Laptop', 999.99)")
db.execute_sql("INSERT INTO orders VALUES (2, 1, 'Mouse', 25.00)")
db.execute_sql("INSERT INTO orders VALUES (3, 2, 'Keyboard', 75.00)")

# JOIN query
results = db.execute_sql("""
SELECT users.name, orders.product, orders.amount
FROM users
JOIN orders ON users.id = orders.user_id
WHERE orders.amount > 50
ORDER BY orders.amount DESC
""")

for row in results:
    print(f"{row['name']} bought {row['product']} for ${row['amount']}")

Output:

Alice bought Laptop for $999.99
Bob bought Keyboard for $75.0

Aggregations

# GROUP BY with aggregations
results = db.execute_sql("""
SELECT users.name, COUNT(*) as order_count, SUM(orders.amount) as total
FROM users
JOIN orders ON users.id = orders.user_id
GROUP BY users.name
ORDER BY total DESC
""")

for row in results:
    print(f"{row['name']}: {row['order_count']} orders, ${row['total']} total")

Output:

Alice: 2 orders, $1024.99 total
Bob: 1 orders, $75.0 total

UPDATE and DELETE

# Update
db.execute_sql("UPDATE users SET age = 31 WHERE name = 'Alice'")

# Delete
db.execute_sql("DELETE FROM users WHERE age < 26")

# Verify
results = db.execute_sql("SELECT name, age FROM users")
for row in results:
    print(row)

# Output:
# {'name': 'Alice', 'age': 31}

Key-Value Operations

Basic Operations

# Put
db.put(b"key", b"value")

# Get
value = db.get(b"key")
if value:
    print(value.decode())
else:
    print("Key not found")

# Delete
db.delete(b"key")

# Output:
# value
# Key not found (after delete)

JSON Data

import json

# Store JSON
user = {"name": "Alice", "email": "alice@example.com", "age": 30}
db.put(b"users/alice", json.dumps(user).encode())

# Retrieve JSON
value = db.get(b"users/alice")
if value:
    user = json.loads(value.decode())
    print(f"Name: {user['name']}, Age: {user['age']}")

# Output:
# Name: Alice, Age: 30

Path API

# Store hierarchical data
db.put_path("users/alice/email", b"alice@example.com")
db.put_path("users/alice/age", b"30")
db.put_path("users/alice/settings/theme", b"dark")

# Retrieve by path
email = db.get_path("users/alice/email")
print(f"Alice's email: {email.decode()}")

# Output:
# Alice's email: alice@example.com

Prefix Scanning

Most efficient way to iterate keys:

# Insert multi-tenant data
db.put(b"tenants/acme/users/1", b'{"name":"Alice"}')
db.put(b"tenants/acme/users/2", b'{"name":"Bob"}')
db.put(b"tenants/acme/orders/1", b'{"total":100}')
db.put(b"tenants/globex/users/1", b'{"name":"Charlie"}')

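# Scan only ACME Corp data (tenant isolation)
# The end key is the prefix with its last byte incremented: "/" + 1 is "0".
# An end of b"tenants/acme;" would also match keys such as b"tenants/acme2/...".
results = list(db.scan(b"tenants/acme/", b"tenants/acme0"))
print(f"ACME Corp has {len(results)} items:")
for key, value in results:
    print(f" {key.decode()}: {value.decode()}")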

Output:

ACME Corp has 3 items:
tenants/acme/orders/1: {"total":100}
tenants/acme/users/1: {"name":"Alice"}
tenants/acme/users/2: {"name":"Bob"}

Why use scan():

  • Fast: O(|prefix|) performance
  • Isolated: Perfect for multi-tenant apps
  • Efficient: Binary-safe iteration
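
scan() takes an explicit start and end key, so a prefix scan needs an end key that sorts just after every key sharing the prefix. A minimal sketch of computing that bound follows, assuming keys compare bytewise and the end key is exclusive (neither is stated in this guide). The helper name prefix_end is hypothetical and not part of the sochdb package.

```python
def prefix_end(prefix: bytes) -> bytes:
    """Return the smallest key that sorts after every key starting with prefix.

    Hypothetical helper for illustration; not part of the sochdb package.
    """
    trimmed = prefix.rstrip(b"\xff")  # trailing 0xFF bytes cannot be incremented
    if not trimmed:
        raise ValueError("prefix has no finite upper bound")
    # Increment the last remaining byte: b"/" becomes b"0", b":" becomes b";".
    return trimmed[:-1] + bytes([trimmed[-1] + 1])


print(prefix_end(b"tenants/acme/"))  # b'tenants/acme0'
print(prefix_end(b"item:"))          # b'item;'

# A neighbouring tenant must fall outside the computed range.
other = b"tenants/acme2/users/1"
in_range = b"tenants/acme/" <= other < prefix_end(b"tenants/acme/")
print(in_range)  # False
```

Deriving the end key this way avoids hand-picking a terminator character, which is easy to get wrong when the prefix ends in "/" rather than ":".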

Transactions

Automatic Transactions

# Context manager handles commit/abort
with db.transaction() as txn:
    txn.put(b"account:1:balance", b"1000")
    txn.put(b"account:2:balance", b"500")
    # Commits on success, aborts on exception

Output:

✅ Transaction committed

Manual Control

txn = db.begin_transaction()
try:
    txn.put(b"key1", b"value1")
    txn.put(b"key2", b"value2")

    # Scan within transaction
    for key, value in txn.scan(b"key", b"key~"):
        print(f"{key.decode()}: {value.decode()}")

    txn.commit()
except Exception as e:
    txn.abort()
    raise

Output:

key1: value1
key2: value2
✅ Transaction committed

Graph Overlay

New in v0.3.3 — Lightweight graph layer for agent memory relationships.

Creating Nodes and Edges

from sochdb import Database

with Database.open("./agent_memory") as db:
    # Create nodes
    db.graph_add_node(
        namespace="agent_001",
        node_id="user_alice",
        node_type="User",
        properties={"name": "Alice", "role": "admin"}
    )

    db.graph_add_node(
        namespace="agent_001",
        node_id="conv_123",
        node_type="Conversation",
        properties={"topic": "Support request"}
    )

    # Create edge
    db.graph_add_edge(
        namespace="agent_001",
        from_id="user_alice",
        edge_type="STARTED",
        to_id="conv_123",
        properties={"timestamp": "2026-01-15T10:30:00Z"}
    )

Traversal

# BFS traversal from a node
results = db.graph_traverse(
namespace="agent_001",
start_node="user_alice",
max_depth=3,
order="bfs" # or "dfs"
)

for node in results:
    print(f"Node: {node['id']} ({node['type']})")
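
To show what a depth-bounded BFS returns, here is a self-contained sketch over a plain adjacency dict. It illustrates the traversal order only and says nothing about how graph_traverse is implemented. The bfs function and the node IDs other than user_alice and conv_123 are made up for this example.

```python
from collections import deque


def bfs(adjacency, start, max_depth):
    """Breadth-first traversal bounded by depth (illustrative sketch)."""
    visited = {start}
    order = [start]
    frontier = deque([(start, 0)])
    while frontier:
        node, depth = frontier.popleft()
        if depth == max_depth:
            continue  # do not expand nodes that sit at the depth limit
        for neighbor in adjacency.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                order.append(neighbor)
                frontier.append((neighbor, depth + 1))
    return order


# Hypothetical graph: conv_456, msg_1 and attachment_1 are invented IDs.
graph = {
    "user_alice": ["conv_123", "conv_456"],
    "conv_123": ["msg_1"],
    "msg_1": ["attachment_1"],
}
reached = bfs(graph, "user_alice", max_depth=2)
print(reached)  # attachment_1 is three hops away, so it is not included
```

Swapping the deque's popleft() for pop() would turn the same loop into a depth-first walk, which is the difference the order parameter selects between.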

Temporal Graph

New in v0.4.6 — Time-aware relationships for historical queries.

Adding Temporal Edges

from sochdb import Database

with Database.open("./temporal_db") as db:
    # Add temporal edge with validity period
    db.add_temporal_edge(
        namespace="org",
        from_id="alice",
        edge_type="WORKS_AT",
        to_id="acme_corp",
        valid_from=1704067200000,   # 2024-01-01 (ms)
        valid_until=1735689600000,  # 2025-01-01 (ms)
        properties={"role": "Engineer"}
    )

    # Add another edge (current)
    db.add_temporal_edge(
        namespace="org",
        from_id="alice",
        edge_type="WORKS_AT",
        to_id="globex_inc",
        valid_from=1735689600000,  # 2025-01-01
        valid_until=0,             # 0 = no end (current)
        properties={"role": "Senior Engineer"}
    )

Querying at a Point in Time

# Query: "Where did Alice work on 2024-06-15?"
timestamp = 1718409600000 # 2024-06-15 in ms

results = db.query_temporal_graph(
namespace="org",
node="alice",
mode="point_in_time",
timestamp=timestamp,
edge_type="WORKS_AT"
)

for edge in results:
    print(f"Worked at: {edge['to_id']} as {edge['properties']['role']}")
    # Output: Worked at: acme_corp as Engineer
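
The millisecond timestamps used in these examples can be derived with the standard library, and the point-in-time rule can be written out explicitly. The sketch below assumes timestamps are UTC and that an edge is valid when valid_from <= t < valid_until, with 0 meaning open-ended. Whether SochDB treats valid_until as exclusive is an assumption here, and both helper names are hypothetical.

```python
from datetime import datetime, timezone


def to_epoch_ms(year, month, day):
    """Midnight UTC on the given date, as milliseconds since the Unix epoch."""
    return int(datetime(year, month, day, tzinfo=timezone.utc).timestamp() * 1000)


def is_valid_at(edge, timestamp_ms):
    """Assumed rule: valid_from <= t < valid_until, where 0 means no end."""
    if timestamp_ms < edge["valid_from"]:
        return False
    return edge["valid_until"] == 0 or timestamp_ms < edge["valid_until"]


edges = [
    {"to_id": "acme_corp",
     "valid_from": to_epoch_ms(2024, 1, 1),
     "valid_until": to_epoch_ms(2025, 1, 1)},
    {"to_id": "globex_inc",
     "valid_from": to_epoch_ms(2025, 1, 1),
     "valid_until": 0},
]

when = to_epoch_ms(2024, 6, 15)
matches = [e["to_id"] for e in edges if is_valid_at(e, when)]
print(when, matches)  # 1718409600000 ['acme_corp']
```

Building timestamps from dates this way avoids hard-coding long integer literals that are easy to mistype.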

Querying a Time Range

# Query: "All jobs Alice had in 2024"
results = db.query_temporal_graph(
namespace="org",
node="alice",
mode="range",
start_time=1704067200000, # 2024-01-01
end_time=1735689599999, # 2024-12-31
edge_type="WORKS_AT"
)

for edge in results:
    print(f"{edge['to_id']}: {edge['valid_from']} - {edge['valid_until']}")

Query Builder

Returns results in TOON format (token-optimized for LLMs):

# Insert structured data
db.put(b"products/laptop", b'{"name":"Laptop","price":999,"stock":5}')
db.put(b"products/mouse", b'{"name":"Mouse","price":25,"stock":20}')

# Query with column selection
results = db.query("products/") \
.select(["name", "price"]) \
.limit(10) \
.to_list()

for key, value in results:
    print(f"{key.decode()}: {value.decode()}")

Output (TOON Format):

products/laptop: result[1]{name,price}:Laptop,999
products/mouse: result[1]{name,price}:Mouse,25

Vector Search

Bulk HNSW Index Building

from sochdb.bulk import bulk_build_index, bulk_query_index
import numpy as np

# Generate embeddings (10K × 768D)
embeddings = np.random.randn(10000, 768).astype(np.float32)

# Build HNSW index at ~1,600 vec/s
stats = bulk_build_index(
embeddings,
output="my_index.hnsw",
m=16,
ef_construction=100,
metric="cosine"
)

print(f"Built {stats.vectors} vectors at {stats.rate:.0f} vec/s")

Output:

Built 10000 vectors at 1598 vec/s
Index size: 45.2 MB

Query HNSW Index

# Single query vector
query = np.random.randn(768).astype(np.float32)

results = bulk_query_index(
index="my_index.hnsw",
query=query,
k=10,
ef_search=64
)

print(f"Top {len(results)} nearest neighbors:")
for i, neighbor in enumerate(results):
    print(f"{i+1}. ID: {neighbor.id}, Distance: {neighbor.distance:.4f}")

Output:

Top 10 nearest neighbors:
1. ID: 3421, Distance: 0.1234
2. ID: 7892, Distance: 0.1456
3. ID: 1205, Distance: 0.1678
...

Performance:

  • Python FFI: ~130 vec/s
  • Bulk API: ~1,600 vec/s (12× faster)

Server Mode (gRPC/IPC)

For distributed systems and multi-process applications.

gRPC Client

from sochdb import SochDBClient

# Connect to gRPC server
client = SochDBClient("localhost:50051")

# Key-Value operations
client.put_kv("my_namespace", "user:123", b'{"name": "Alice"}')
value = client.get_kv("my_namespace", "user:123")
print(value.decode())

# Vector search
results = client.vector_search(
namespace="my_namespace",
collection="documents",
query=[0.1, 0.2, 0.3, ...],
k=10
)

# Graph operations
client.add_graph_node(
namespace="agent",
node_id="user_1",
node_type="User",
properties={"name": "Alice"}
)

client.close()

IPC Client (Unix Socket)

For multi-process applications on the same machine:

# Start IPC server (shell)
sochdb-server --db ./my_database

# Connect from Python
from sochdb import IpcClient

client = IpcClient.connect("./my_database/sochdb.sock")

client.put(b"key", b"value")
value = client.get(b"key")
print(value.decode())

client.close()

IPC Mode (Legacy)

For multi-process applications, SochDB provides a high-performance IPC server with Unix domain socket communication.

Deep Dive: See IPC Server Capabilities for wire protocol details, internals, and architecture.

Quick Start

# Start the IPC server (globally available after pip install)
sochdb-server --db ./my_database

# Check status
sochdb-server status --db ./my_database
# Output: [Server] Running (PID: 12345)

# Connect from Python (or any other process)
from sochdb import IpcClient

client = IpcClient.connect("./my_database/sochdb.sock")

client.put(b"key", b"value")
value = client.get(b"key")
print(value.decode())
# Output: value

sochdb-server Options

Option              Default            Description
--db PATH           ./sochdb_data      Database directory
--socket PATH       <db>/sochdb.sock   Unix socket path
--max-clients N     100                Maximum concurrent connections
--timeout-ms MS     30000              Connection timeout (30s)
--log-level LEVEL   info               trace/debug/info/warn/error

Server Commands

# Start server
sochdb-server --db ./my_database

# Check if running
sochdb-server status --db ./my_database
# Output: [Server] Running (PID: 12345)
# Socket: ./my_database/sochdb.sock
# Database: /absolute/path/to/my_database

# Stop server gracefully
sochdb-server stop --db ./my_database

Production Configuration

# High-traffic production setup
sochdb-server \
--db /var/lib/sochdb/production \
--socket /var/run/sochdb.sock \
--max-clients 500 \
--timeout-ms 60000 \
--log-level info

Wire Protocol

The IPC server uses a binary protocol for high-performance communication. See the Deep Dive for full opcode usage.

Server Statistics

The IPC server tracks real-time metrics accessible via client.stats():

from sochdb import IpcClient

client = IpcClient.connect("./my_database/sochdb.sock")
stats = client.stats()

print(f"Connections: {stats['connections_active']}/{stats['connections_total']}")
print(f"Requests: {stats['requests_success']} success, {stats['requests_error']} errors")
print(f"Throughput: {stats['bytes_received']} bytes in, {stats['bytes_sent']} bytes out")
print(f"Uptime: {stats['uptime_secs']} seconds")
print(f"Active transactions: {stats['active_transactions']}")

CLI Tools

Three CLI tools are available globally after pip install sochdb: sochdb-server (covered in the IPC sections above), sochdb-bulk, and sochdb-grpc-server.

sochdb-bulk

High-performance bulk vector operations (~1,600 vec/s).

Deep Dive: See Bulk Operations Capabilities for benchmarks, file formats, and internals.

# Build HNSW index from embeddings
sochdb-bulk build-index \
--input embeddings.npy \
--output index.hnsw \
--dimension 768 \
--max-connections 16 \
--ef-construction 100 \
--metric cosine

# Query k-nearest neighbors
sochdb-bulk query \
--index index.hnsw \
--query query_vector.raw \
--k 10 \
--ef 64

# Get index metadata
sochdb-bulk info --index index.hnsw
# Output:
# Dimension: 768
# Vectors: 100000
# Max connections: 16

# Convert between formats
sochdb-bulk convert \
--input vectors.npy \
--output vectors.raw \
--to-format raw_f32 \
--dimension 768

sochdb-grpc-server

gRPC server for remote vector search operations.

Deep Dive: See gRPC Server Capabilities for service methods, HNSW configuration, and proto definitions.

# Start gRPC server
sochdb-grpc-server --host 0.0.0.0 --port 50051

# Check status
sochdb-grpc-server status --port 50051

gRPC Service Methods: See gRPC Deep Dive for full method signatures.

Python gRPC Client Example:

import grpc
from sochdb_pb2 import (
CreateIndexRequest, SearchRequest, HnswConfig
)
from sochdb_pb2_grpc import VectorIndexServiceStub

# Connect to gRPC server
channel = grpc.insecure_channel('localhost:50051')
stub = VectorIndexServiceStub(channel)

# Create index
response = stub.CreateIndex(CreateIndexRequest(
name="my_index",
dimension=768,
metric=1, # COSINE
config=HnswConfig(
max_connections=16,
ef_construction=200,
ef_search=50
)
))
print(f"Created: {response.info.name}")

# Search
import numpy as np
query = np.random.randn(768).astype(np.float32)
response = stub.Search(SearchRequest(
index_name="my_index",
query=query.tolist(),
k=10
))
for result in response.results:
    print(f"ID: {result.id}, Distance: {result.distance:.4f}")

Environment Variables

Override bundled binaries with custom paths:

export SOCHDB_SERVER_PATH=/path/to/sochdb-server
export SOCHDB_BULK_PATH=/path/to/sochdb-bulk
export SOCHDB_GRPC_SERVER_PATH=/path/to/sochdb-grpc-server

Advanced Features

TOON Format

Token-Optimized Output Notation - Achieve 40-66% token reduction for LLM context.

from sochdb import Database

# Sample records
records = [
{"id": 1, "name": "Alice", "email": "alice@example.com"},
{"id": 2, "name": "Bob", "email": "bob@example.com"},
]

# Convert to TOON format
toon_str = Database.to_toon("users", records, ["name", "email"])
print(toon_str)
# Output: users[2]{name,email}:Alice,alice@example.com;Bob,bob@example.com

# Parse TOON back to records
table_name, fields, records = Database.from_toon(toon_str)
print(records)
# Output: [{"name": "Alice", "email": "alice@example.com"}, ...]

Token Comparison:

  • JSON (compact): ~165 tokens
  • TOON format: ~70 tokens (about 58% reduction)
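
The output shown above follows a simple layout: table name, row count in brackets, field names in braces, then rows separated by semicolons. A minimal encoder sketch that reproduces that layout is given below. It is not the SDK's implementation: to_toon_sketch is a hypothetical name, and the sketch assumes values contain no commas or semicolons, since escaping rules are not described in this guide.

```python
import json


def to_toon_sketch(table, records, fields):
    """Encode records as name[count]{fields}:row;row (illustration only).

    Assumes values contain no ',' or ';' characters.
    """
    field_list = ",".join(fields)
    header = f"{table}[{len(records)}]{{{field_list}}}:"
    rows = (",".join(str(record[name]) for name in fields) for record in records)
    return header + ";".join(rows)


records = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
]
toon = to_toon_sketch("users", records, ["name", "email"])
print(toon)

# Character counts are only a rough proxy for tokens, but show where the saving comes from:
# field names appear once in the header instead of once per record.
projected = [{"name": r["name"], "email": r["email"]} for r in records]
compact_json = json.dumps(projected, separators=(",", ":"))
print(len(toon), len(compact_json))
```

The saving grows with the number of rows, because the per-record key names and quoting that JSON repeats are paid only once in the header.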

Use Case: RAG with LLMs

from sochdb import Database
from openai import OpenAI  # openai>=1.0; the older openai.ChatCompletion interface was removed

client = OpenAI()  # reads OPENAI_API_KEY from the environment

with Database.open("./knowledge_base") as db:
    # Query relevant documents
    results = db.execute_sql("""
        SELECT title, content
        FROM documents
        WHERE category = 'technical'
        LIMIT 10
    """)

    # Convert to TOON for efficient context
    records = [dict(row) for row in results]
    toon_context = Database.to_toon("documents", records, ["title", "content"])

    # Send to LLM (saves tokens!)
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"Context:\n{toon_context}"},
            {"role": "user", "content": "Summarize the documents"}
        ]
    )

Batched Scanning

1000× fewer FFI calls for large dataset scans.

from sochdb import Database

with Database.open("./my_db") as db:
    # Insert 10K test records
    with db.transaction() as txn:
        for i in range(10000):
            txn.put(f"item:{i:05d}".encode(), f"value:{i}".encode())

    # Regular scan: 10,000 FFI calls
    txn = db.transaction()
    count = sum(1 for _ in txn.scan(b"item:", b"item;"))
    txn.abort()
    print(f"Regular scan: {count} items")

    # Batched scan: 10 FFI calls (1000× fewer!)
    txn = db.transaction()
    count = sum(1 for _ in txn.scan_batched(
        start=b"item:",
        end=b"item;",
        batch_size=1000  # Fetch 1000 results per FFI call
    ))
    txn.abort()
    print(f"Batched scan: {count} items (much faster!)")

Performance:

Dataset      Regular Scan   Batched Scan   Speedup
10K items    15ms           2ms            7.5×
100K items   150ms          12ms           12.5×
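
The call-count arithmetic behind batched scanning can be checked with a stand-in cursor that counts how often it is asked for data. This is only a sketch of the batching pattern: CountingCursor and its next_batch method are invented for the example and do not describe the SDK's FFI layer.

```python
class CountingCursor:
    """Stand-in for a native cursor that counts boundary crossings (hypothetical)."""

    def __init__(self, items):
        self._items = items
        self._pos = 0
        self.calls = 0

    def next_batch(self, batch_size):
        """Return up to batch_size items plus a flag saying whether the range is exhausted."""
        self.calls += 1
        batch = self._items[self._pos:self._pos + batch_size]
        self._pos += len(batch)
        return batch, self._pos >= len(self._items)


def scan_batched(cursor, batch_size):
    """Yield items one at a time while fetching them batch_size at a time."""
    done = False
    while not done:
        batch, done = cursor.next_batch(batch_size)
        yield from batch


items = [f"item:{i:05d}" for i in range(10000)]

per_item = CountingCursor(items)
count_per_item = sum(1 for _ in scan_batched(per_item, batch_size=1))

batched = CountingCursor(items)
count_batched = sum(1 for _ in scan_batched(batched, batch_size=1000))

print(per_item.calls, batched.calls)  # 10000 10
```

The measured speedups in the table are far smaller than the 1000× drop in call count, which suggests that per-item work on the Python side still accounts for much of the scan time once the boundary crossings are amortized.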

Statistics & Monitoring

from sochdb import Database

with Database.open("./my_db") as db:
    # Perform operations
    for i in range(1000):
        db.put(f"key:{i}".encode(), f"value:{i}".encode())

    # Get runtime statistics
    stats = db.stats()

    print(f"Keys: {stats['keys_count']:,}")
    print(f"Bytes written: {stats['bytes_written']:,}")
    print(f"Bytes read: {stats['bytes_read']:,}")
    print(f"Transactions: {stats['transactions_committed']}")

    # Cache metrics
    hits = stats['cache_hits']
    misses = stats['cache_misses']
    hit_rate = (hits / (hits + misses) * 100) if (hits + misses) > 0 else 0
    print(f"Cache hit rate: {hit_rate:.1f}%")

Available Metrics:

  • keys_count - Total keys
  • bytes_written - Cumulative writes
  • bytes_read - Cumulative reads
  • transactions_committed - Successful transactions
  • cache_hits / cache_misses - Cache performance

Manual Checkpoint

Force durability checkpoint to flush data to disk.

from sochdb import Database

with Database.open("./my_db") as db:
    # Bulk import
    print("Importing 10K records...")
    with db.transaction() as txn:
        for i in range(10000):
            txn.put(f"bulk:{i}".encode(), f"data:{i}".encode())

    # Force checkpoint
    lsn = db.checkpoint()
    print(f"Checkpoint complete at LSN {lsn}")
    print("All data is durable on disk!")

When to Use:

  • ✅ Before backups
  • ✅ After bulk imports
  • ✅ Before system shutdown
  • ✅ Periodic durability (every 5 minutes)

Python Plugins

Run Python code as database triggers.

from sochdb.plugins import PythonPlugin, PluginRegistry, TriggerEvent, TriggerAbort

# Define validation plugin
plugin = PythonPlugin(
    name="user_validator",
    code='''
def on_before_insert(row: dict) -> dict:
    """Validate and transform data."""
    # Normalize email
    if "email" in row:
        row["email"] = row["email"].lower().strip()

    # Validate age
    if row.get("age", 0) < 0:
        raise TriggerAbort("Age cannot be negative", code="INVALID_AGE")

    # Add timestamp
    import time
    row["created_at"] = time.time()

    return row
''',
    triggers={"users": ["BEFORE INSERT"]}
)

# Register and use
registry = PluginRegistry()
registry.register(plugin)

# Fire trigger
row = {"name": "Alice", "email": " ALICE@EXAMPLE.COM ", "age": 30}
result = registry.fire("users", TriggerEvent.BEFORE_INSERT, row)
print(result["email"]) # "alice@example.com"
print(result["created_at"]) # 1704182400.0

Available Events:

  • BEFORE_INSERT, AFTER_INSERT
  • BEFORE_UPDATE, AFTER_UPDATE
  • BEFORE_DELETE, AFTER_DELETE

Transaction Advanced

from sochdb import Database

with Database.open("./my_db") as db:
    # Get transaction ID
    txn = db.transaction()
    print(f"Transaction ID: {txn.id}")

    # Perform operations
    txn.put(b"key", b"value")

    # Commit returns LSN (Log Sequence Number)
    lsn = txn.commit()
    print(f"Committed at LSN: {lsn}")

    # Execute SQL within transaction
    txn2 = db.transaction()
    txn2.execute("INSERT INTO users VALUES (1, 'Alice')")
    txn2.put(b"user:1:metadata", b'{"verified": true}')
    txn2.commit()  # Atomic SQL + KV operation

Error Handling

New in v0.4.1 — Comprehensive error types for production applications.

Error Hierarchy

from sochdb.errors import (
SochDBError, # Base error
DatabaseError, # General database errors
TransactionError, # Transaction failures
ConnectionError, # Connection issues
ProtocolError, # Wire protocol errors

# Namespace errors
NamespaceNotFoundError,
NamespaceExistsError,

# Collection errors
CollectionNotFoundError,
CollectionExistsError,
CollectionConfigError,

# Validation errors
ValidationError,
DimensionMismatchError,

# Lock errors (v0.4.1)
LockError,
DatabaseLockedError,
LockTimeoutError,
EpochMismatchError,
SplitBrainError,
)

Handling Lock Errors

from sochdb import Database
from sochdb.errors import DatabaseLockedError, LockTimeoutError

try:
    db = Database.open("./shared_db")
except DatabaseLockedError as e:
    print(f"Database locked by another process: {e}")
    # Retry with concurrent mode
    db = Database.open_concurrent("./shared_db")
except LockTimeoutError as e:
    print(f"Timed out waiting for lock: {e}")
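
When a lock timeout is transient, a caller may prefer to retry a few times before giving up. The sketch below shows one generic way to do that with exponential backoff. It is an assumption about usage rather than an SDK feature: the retry helper is hypothetical, and FakeLockTimeout stands in for LockTimeoutError so the example runs without sochdb installed.

```python
import time


def retry(operation, retry_on, attempts=3, base_delay_s=0.05, sleep=time.sleep):
    """Call operation(), retrying on the given exception types with exponential backoff."""
    for attempt in range(attempts):
        try:
            return operation()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay_s * (2 ** attempt))


class FakeLockTimeout(Exception):
    """Stand-in for sochdb.errors.LockTimeoutError (illustration only)."""


calls = {"n": 0}


def flaky_open():
    # Fails twice, then succeeds, to simulate a lock that is released shortly after.
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeLockTimeout("lock busy")
    return "db-handle"


delays = []  # record the requested sleeps instead of actually sleeping
handle = retry(flaky_open, FakeLockTimeout, attempts=3, sleep=delays.append)
print(handle, delays)
```

In real code the operation would be something like lambda: Database.open("./shared_db") and retry_on would be LockTimeoutError; the attempt count and delays are placeholders to tune for the workload.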

Handling Namespace Errors

from sochdb.errors import NamespaceNotFoundError, NamespaceExistsError

try:
    ns = db.namespace("tenant_999")
except NamespaceNotFoundError:
    # Create if not exists
    ns = db.create_namespace("tenant_999")

try:
    db.create_namespace("tenant_123")
except NamespaceExistsError:
    print("Namespace already exists")

Handling Vector Dimension Errors

from sochdb.errors import DimensionMismatchError

try:
    collection.insert([1.0, 2.0, 3.0])  # 3-dim
except DimensionMismatchError as e:
    print(f"Expected {e.expected} dimensions, got {e.actual}")

Best Practices

1. Use SQL for Structured Data

# ✅ Good: Use SQL for relational data
db.execute_sql("CREATE TABLE users (...)")
db.execute_sql("INSERT INTO users VALUES (...)")
results = db.execute_sql("SELECT * FROM users WHERE age > 25")

2. Use K-V for Unstructured Data

# ✅ Good: Use K-V for documents, blobs, cache
db.put(b"cache:user:123", json.dumps(user).encode())
db.put(b"blob:image:456", image_bytes)

3. Use scan() for Multi-Tenancy

# ✅ Good: Efficient tenant isolation
tenant_id = "acme"
prefix = f"tenants/{tenant_id}/".encode()
end = f"tenants/{tenant_id}0".encode()  # "0" is the byte after "/", so only this tenant matches
data = list(db.scan(prefix, end))

4. Use Transactions

# ✅ Good: Atomic operations
with db.transaction() as txn:
    txn.put(b"key1", b"value1")
    txn.put(b"key2", b"value2")

5. Use Bulk API for Vectors

# ✅ Good: Fast bulk operations
bulk_build_index(embeddings, "index.hnsw")

# ❌ Bad: Slow FFI loop
for i, vec in enumerate(vectors):
    index.insert(f"doc_{i}", vec)  # 12× slower!

6. Use Batched Scanning for Large Datasets

# ✅ Good: Fast batched scan
txn = db.transaction()
for key, value in txn.scan_batched(b"prefix:", b"prefix;", batch_size=1000):
    process(key, value)
txn.abort()

# ❌ Bad: Slow regular scan for large datasets
txn = db.transaction()
for key, value in txn.scan(b"prefix:", b"prefix;"):
    process(key, value)  # 1000× more FFI calls!
txn.abort()

7. Use TOON Format for LLM Context

# ✅ Good: Token-efficient for LLMs
results = db.execute_sql("SELECT * FROM users LIMIT 100")
records = [dict(row) for row in results]
toon_context = Database.to_toon("users", records, ["name", "email"])
# Send to LLM - saves 40-66% tokens!

# ❌ Bad: Wasteful JSON for LLM context
json_context = json.dumps(records) # Uses 2× more tokens

8. Always Use Context Managers

# ✅ Good: Automatic cleanup
with Database.open("./db") as db:
    db.put(b"key", b"value")

# ❌ Bad: Manual cleanup required
db = Database.open("./db")
db.put(b"key", b"value")
db.close()

Complete Examples

Example 1: Multi-Tenant SaaS with SQL + K-V

from sochdb import Database
import json

def main():
    with Database.open("./saas_db") as db:
        # SQL for tenant metadata
        db.execute_sql("""
            CREATE TABLE IF NOT EXISTS tenants (
                id INTEGER PRIMARY KEY,
                name TEXT,
                created_at TEXT
            )
        """)

        db.execute_sql("INSERT INTO tenants VALUES (1, 'ACME Corp', '2026-01-01')")
        db.execute_sql("INSERT INTO tenants VALUES (2, 'Globex Inc', '2026-01-01')")

        # K-V for tenant-specific data
        db.put(b"tenants/1/users/alice", b'{"role":"admin","email":"alice@acme.com"}')
        db.put(b"tenants/1/users/bob", b'{"role":"user","email":"bob@acme.com"}')
        db.put(b"tenants/2/users/charlie", b'{"role":"admin","email":"charlie@globex.com"}')

        # Query SQL
        tenants = db.execute_sql("SELECT * FROM tenants ORDER BY name")

        for tenant in tenants:
            tenant_id = tenant['id']
            tenant_name = tenant['name']

            # Scan tenant-specific K-V data.
            # The end key replaces the trailing "/" with "0" (the next byte), so tenant 1
            # does not also match tenant 10, 11, and so on.
            prefix = f"tenants/{tenant_id}/".encode()
            end = f"tenants/{tenant_id}0".encode()
            users = list(db.scan(prefix, end))

            print(f"\n{tenant_name} ({len(users)} users):")
            for key, value in users:
                user_data = json.loads(value.decode())
                print(f" {key.decode()}: {user_data['email']} ({user_data['role']})")

if __name__ == "__main__":
    main()

Output:

ACME Corp (2 users):
tenants/1/users/alice: alice@acme.com (admin)
tenants/1/users/bob: bob@acme.com (user)

Globex Inc (1 users):
tenants/2/users/charlie: charlie@globex.com (admin)

Example 2: E-commerce with SQL

from sochdb import Database

with Database.open("./ecommerce") as db:
    # Create schema
    db.execute_sql("""
        CREATE TABLE products (
            id INTEGER PRIMARY KEY,
            name TEXT,
            price REAL,
            category TEXT
        )
    """)

    db.execute_sql("""
        CREATE TABLE orders (
            id INTEGER PRIMARY KEY,
            product_id INTEGER,
            quantity INTEGER,
            total REAL
        )
    """)

    # Insert data
    db.execute_sql("INSERT INTO products VALUES (1, 'Laptop', 999.99, 'Electronics')")
    db.execute_sql("INSERT INTO products VALUES (2, 'Mouse', 25.00, 'Electronics')")
    db.execute_sql("INSERT INTO products VALUES (3, 'Desk', 299.99, 'Furniture')")

    db.execute_sql("INSERT INTO orders VALUES (1, 1, 2, 1999.98)")
    db.execute_sql("INSERT INTO orders VALUES (2, 2, 5, 125.00)")

    # Analytics query
    results = db.execute_sql("""
        SELECT
            products.category,
            COUNT(orders.id) as order_count,
            SUM(orders.total) as revenue
        FROM products
        JOIN orders ON products.id = orders.product_id
        GROUP BY products.category
        ORDER BY revenue DESC
    """)

    print("Category Performance:")
    for row in results:
        print(f"{row['category']}: {row['order_count']} orders, ${row['revenue']:.2f}")

Output:

Category Performance:
Electronics: 2 orders, $2124.98
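
Furniture is missing from the output because the inner JOIN drops products that have no orders. The sketch below shows how a LEFT JOIN would keep such categories. It uses Python's standard-library sqlite3 module as a stand-in engine, not SochDB. Whether SochDB's SQL dialect accepts LEFT JOIN and COALESCE is an assumption that should be checked against the SQL Database section before relying on it.

```python
import sqlite3

# Stand-in engine: an in-memory SQLite database, NOT SochDB.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL, category TEXT);
    CREATE TABLE orders (id INTEGER PRIMARY KEY, product_id INTEGER, quantity INTEGER, total REAL);
    INSERT INTO products VALUES (1, 'Laptop', 999.99, 'Electronics');
    INSERT INTO products VALUES (2, 'Mouse', 25.00, 'Electronics');
    INSERT INTO products VALUES (3, 'Desk', 299.99, 'Furniture');
    INSERT INTO orders VALUES (1, 1, 2, 1999.98);
    INSERT INTO orders VALUES (2, 2, 5, 125.00);
""")

# LEFT JOIN keeps every product row; COALESCE turns the NULL sum
# of an order-less category into 0.
rows = conn.execute("""
    SELECT p.category,
           COUNT(o.id) AS order_count,
           COALESCE(SUM(o.total), 0.0) AS revenue
    FROM products AS p
    LEFT JOIN orders AS o ON p.id = o.product_id
    GROUP BY p.category
    ORDER BY revenue DESC
""").fetchall()

for category, order_count, revenue in rows:
    print(f"{category}: {order_count} orders, ${revenue:.2f}")
# Electronics: 2 orders, $2124.98
# Furniture: 0 orders, $0.00
```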

API Reference

Database (Embedded)

| Method | Description |
|---|---|
| `Database.open(path)` | Open/create database |
| `put(key: bytes, value: bytes)` | Store key-value |
| `get(key: bytes) -> bytes \| None` | Retrieve value |
| `delete(key: bytes)` | Delete key |
| `put_path(path: str, value: bytes)` | Store by path |
| `get_path(path: str) -> bytes \| None` | Get by path |
| `delete_path(path: str)` | Delete by path |
| `scan(start: bytes, end: bytes)` | Iterate range |
| `scan_prefix(prefix: bytes)` | Scan keys matching prefix |
| `transaction()` | Begin transaction |
| `execute_sql(query: str)` | Execute SQL ⭐ |
| `execute(query: str)` | Alias for `execute_sql()` |
| `checkpoint() -> int` | Force checkpoint, returns LSN |
| `stats() -> dict` | Get runtime statistics |
| `to_toon(table, records, fields) -> str` | Convert to TOON format (static) |
| `from_toon(toon_str) -> tuple` | Parse TOON format (static) |
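
Because get() returns either bytes or None, callers that store JSON usually need to handle the missing-key case before decoding. A minimal sketch of such a wrapper follows. The helper names get_json and put_json are hypothetical and not part of the SDK; they rely only on the put and get signatures listed in the table and accept any object that exposes them.

```python
import json
from typing import Any

def put_json(db: Any, key: bytes, obj: Any) -> None:
    """Serialize obj as compact UTF-8 JSON and store it under key."""
    db.put(key, json.dumps(obj, separators=(",", ":")).encode("utf-8"))

def get_json(db: Any, key: bytes, default: Any = None) -> Any:
    """Load and decode a JSON value, returning default when the key is absent."""
    raw = db.get(key)
    if raw is None:  # get() signals a missing key with None
        return default
    return json.loads(raw.decode("utf-8"))
```

With an open database this would be called as put_json(db, b"user:123", {"name": "Alice"}) followed by get_json(db, b"user:123").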

Transaction

| Method | Description |
|---|---|
| `id` | Transaction ID (property) |
| `put(key: bytes, value: bytes)` | Put within transaction |
| `get(key: bytes) -> bytes \| None` | Get with snapshot isolation |
| `delete(key: bytes)` | Delete within transaction |
| `scan(start: bytes, end: bytes)` | Scan within transaction |
| `scan_prefix(prefix: bytes)` | Scan keys matching prefix |
| `scan_batched(start, end, batch_size)` | High-performance batched scan |
| `execute(sql: str)` | Execute SQL within transaction |
| `commit() -> int` | Commit, returns LSN |
| `abort()` | Abort/rollback |
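
The commit() and abort() pair above lends itself to a small wrapper that commits when a unit of work succeeds and rolls back when it raises. The sketch below is one way to write it. The function name run_in_transaction is hypothetical, and the sketch assumes that db.transaction() returns the transaction object directly, so it can be driven without a with block.

```python
from typing import Any, Callable

def run_in_transaction(db: Any, work: Callable[[Any], None]) -> int:
    """Run work(txn), then commit; abort and re-raise if work fails.

    Returns whatever commit() returns (the LSN, per the table above).
    """
    txn = db.transaction()  # assumption: returns an object with commit()/abort()
    try:
        work(txn)
    except BaseException:
        # Roll back every write made by work before propagating the error.
        txn.abort()
        raise
    return txn.commit()
```

A caller would pass a function that performs its writes on the transaction, for example lambda txn: txn.put(b"counter", b"1").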

Namespace

| Method | Description |
|---|---|
| `db.create_namespace(config)` | Create new namespace |
| `db.namespace(name)` | Get existing namespace |
| `db.list_namespaces()` | List all namespaces |
| `db.delete_namespace(name)` | Delete namespace |
| `ns.create_collection(config)` | Create collection |
| `ns.collection(name)` | Get collection |
| `ns.list_collections()` | List collections |

Collection

| Method | Description |
|---|---|
| `insert(vector, metadata, id)` | Insert single vector |
| `insert_batch(vectors, metadatas, ids)` | Batch insert |
| `search(request)` | Unified search |
| `vector_search(vector, k)` | Vector similarity search |
| `keyword_search(text, k)` | BM25 keyword search |
| `hybrid_search(vector, text, k)` | RRF fusion search |
| `delete(id)` | Delete by ID |
| `count()` | Vector count |
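
hybrid_search is described as RRF (Reciprocal Rank Fusion), which merges a vector ranking and a keyword ranking by summing 1 / (k + rank) for each result across the lists. The sketch below illustrates the general technique in plain Python. It is not SochDB's implementation: the function name rrf_fuse is hypothetical, and the constant 60 is the value conventionally used for RRF, not a documented SochDB default.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Sequence

def rrf_fuse(
    rankings: Sequence[Sequence[Hashable]],
    k: int = 60,
    top_n: int = 10,
) -> List[Hashable]:
    """Fuse several ranked ID lists with Reciprocal Rank Fusion."""
    scores: Dict[Hashable, float] = defaultdict(float)
    for ranking in rankings:
        # Ranks are 1-based; earlier positions contribute larger scores.
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by the ID's string form.
    ordered = sorted(scores.items(), key=lambda item: (-item[1], str(item[0])))
    return [doc_id for doc_id, _ in ordered[:top_n]]

vector_hits = ["a", "b", "c"]   # IDs ordered by vector similarity
keyword_hits = ["b", "d", "a"]  # IDs ordered by BM25 score
print(rrf_fuse([vector_hits, keyword_hits]))  # ['b', 'a', 'd', 'c']
```

Results that appear in both lists ("a" and "b") outrank those that appear in only one, which is the property that makes rank fusion useful when the two scoring scales are not comparable.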

PriorityQueue

| Method | Description |
|---|---|
| `PriorityQueue.from_database(db, name)` | Create from database |
| `PriorityQueue.from_client(client, name)` | Create from gRPC client |
| `enqueue(priority, payload, ...)` | Add task |
| `dequeue(worker_id)` | Claim task |
| `ack(task_id)` | Acknowledge completion |
| `nack(task_id)` | Negative ack (retry) |
| `stats()` | Queue statistics |
| `top_k(k)` | Get top K tasks |
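
The dequeue, ack, and nack methods together form a claim-process-acknowledge cycle. The sketch below shows one way a worker loop could be built on them. It rests on several assumptions that the table does not confirm: that dequeue() returns None when no task is available, and that a claimed task exposes task_id and payload attributes. The function name drain_queue is hypothetical.

```python
from typing import Any, Callable

def drain_queue(
    queue: Any,
    worker_id: str,
    handler: Callable[[Any], None],
    max_tasks: int = 100,
) -> int:
    """Claim up to max_tasks tasks, ack on success, nack on failure.

    Returns the number of tasks that were processed and acknowledged.
    """
    completed = 0
    for _ in range(max_tasks):
        task = queue.dequeue(worker_id)
        if task is None:  # assumption: None means the queue is empty
            break
        try:
            handler(task.payload)  # assumption: task has a payload attribute
        except Exception:
            # Hand the task back so it can be retried later.
            queue.nack(task.task_id)  # assumption: task has a task_id attribute
            continue
        queue.ack(task.task_id)
        completed += 1
    return completed
```

The max_tasks bound keeps a task that fails repeatedly from spinning the loop forever if nack() makes it immediately available again.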

SochDBClient (gRPC)

| Method | Description |
|---|---|
| `SochDBClient(address)` | Connect to server |
| `put_kv(namespace, key, value)` | Put key-value |
| `get_kv(namespace, key)` | Get value |
| `vector_search(namespace, collection, query, k)` | Vector search |
| `add_graph_node(...)` | Add graph node |
| `add_graph_edge(...)` | Add graph edge |
| `close()` | Close connection |

IpcClient

| Method | Description |
|---|---|
| `IpcClient.connect(path)` | Connect to server |
| `ping() -> float` | Check latency |
| `query(prefix: str)` | Create query builder |
| `scan(prefix: str)` | Scan prefix |

VectorIndex

| Method | Description |
|---|---|
| `VectorIndex(dimension, metric, m, ef_construction)` | Create index |
| `insert(id, vector)` | Insert single vector |
| `insert_batch(ids, vectors)` | Batch insert (~15K vec/s) |
| `search(vector, k, ef_search)` | Search k-NN |
| `delete(id)` | Delete by ID |
| `save(path)` | Save to disk |
| `load(path)` | Load from disk |

Bulk API

| Function | Description |
|---|---|
| `bulk_build_index(...)` | Build HNSW (~1,600 vec/s) |
| `bulk_query_index(...)` | Query k-NN |
| `bulk_info(index)` | Get index metadata |

Configuration

db = Database.open("./my_db", config={
    "create_if_missing": True,
    "wal_enabled": True,
    "sync_mode": "normal",  # "full", "normal", "off"
    "memtable_size_bytes": 64 * 1024 * 1024,
})

Testing

# Run tests
pytest tests/ -v

# Run specific test
pytest tests/test_sql.py -v

# With coverage
pytest --cov=sochdb tests/


Last updated: January 2026 (v0.4.7)