Examples
Real patterns for building multi-language systems with Saikuro.
Table of Contents
- TypeScript Provider, Python Caller
- Python Provider, TypeScript Caller
- Event Streaming Across Languages
- Bidirectional Chat Channel
- Capability-Gated Admin Functions
- Batch Calls for Bulk Operations
- Testing with In-Memory Transport
- Rust Runtime Provider
TypeScript Provider, Python Caller
A TypeScript service exposes user management. Python handles a background job that needs user data.
TypeScript provider:
// services/users/provider.ts
import { Provider } from 'saikuro';
import { db } from './db';

// Everything registered below is exposed under the 'users' namespace.
const provider = new Provider({ namespace: 'users' });

provider.register('getById', async (id: string) => {
  const user = await db.users.findById(id);
  // A thrown error is surfaced to the remote caller as a failed call.
  if (!user) throw new Error(`User ${id} not found`);
  return user;
});

provider.register('list', async (options: { page: number; limit: number }) => {
  const { items, total } = await db.users.paginate(options);
  return { items, total, page: options.page };
});

provider.register('deactivate', async (id: string) => {
  await db.users.update(id, { active: false });
  return { ok: true };
});

// Serve registered functions until the process is stopped.
await provider.serve();
Python caller:
# jobs/cleanup.py
import asyncio
from saikuro import Client


async def deactivate_inactive_users():
    """Page through all users via the 'users' provider and deactivate stale ones."""
    client = Client()
    await client.connect()
    # Get all users (paginated)
    page = 0
    while True:
        result = await client.call('users.list', [{'page': page, 'limit': 100}])
        for user in result['items']:
            # NOTE(review): assumes 'lastSeen' is falsy for never-seen users — confirm schema.
            if not user['lastSeen'] or is_stale(user['lastSeen']):
                await client.call('users.deactivate', [user['id']])
                print(f"Deactivated {user['id']}")
        # A page shorter than the limit is the last page.
        if len(result['items']) < 100:
            break
        page += 1


asyncio.run(deactivate_inactive_users())
Python Provider, TypeScript Caller
A Python data science service exposes model inference. A TypeScript API calls it.
Python provider:
# ml/inference/provider.py
import asyncio
from saikuro import Provider
import numpy as np

provider = Provider(namespace='model')
# Load weights once at startup; both handlers share the instance.
model = load_model('./weights.pt')


@provider.register('predict')
async def predict(features: list[float]) -> dict:
    """Score a single feature vector and attach a class label."""
    arr = np.array(features)
    score = model.predict(arr)
    return {'score': float(score), 'label': classify(score)}


@provider.register('batch_predict')
async def batch_predict(items: list[list[float]]) -> list[dict]:
    """Score many feature vectors in one vectorized pass."""
    arrs = np.array(items)
    scores = model.predict_batch(arrs)
    return [{'score': float(s), 'label': classify(s)} for s in scores]


asyncio.run(provider.serve())
TypeScript caller:
// api/routes/classify.ts
import { Client } from 'saikuro';

const client = new Client();
await client.connect();

// HTTP handler: forwards the request's feature vector to the model provider.
export async function classifyRequest(req, res) {
  const { features } = req.body;
  const result = await client.call('model.predict', [features]);
  res.json({ score: result.score, label: result.label });
}
The TypeScript API doesn’t care that the model is Python. It just calls model.predict and gets a result.
Event Streaming Across Languages
A Rust service produces real-time events. TypeScript and Python consumers subscribe to them.
Rust provider:
use saikuro::{Provider, Result};
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let mut provider = Provider::new("events");

    // Streaming endpoint: each subscriber gets its own stream, filtered
    // server-side by the string the caller passes.
    provider.register_stream("subscribe", |filter: String| async move {
        let stream = create_event_stream(&filter);
        // Wrap each event in Ok; `.map(Ok)` replaces the redundant
        // closure `|e| Ok(e)` (clippy: redundant_closure).
        stream.map(Ok)
    });

    provider.serve().await
}
TypeScript consumer:
const client = new Client();
await client.connect();

console.log('Listening for errors...');

// Consume the Rust-provided event stream and forward each event to alerting.
for await (const event of client.stream('events.subscribe', ['error'])) {
  console.log(`[${event.timestamp}] ${event.message}`);
  await alerting.notify(event);
}
Python consumer (same stream, different process):
client = Client()
await client.connect()
async for event in client.stream('events.subscribe', ['error']):
await metrics.increment('errors', tags={'source': event['source']})
await pagerduty.trigger(event)
Both consumers get the same stream. They’re independent, so one slow consumer doesn’t affect the other.
Bidirectional Chat Channel
A Python backend manages chat rooms. TypeScript browser clients connect via WebSocket.
Python provider:
from saikuro import Provider
from collections import defaultdict
import asyncio
provider = Provider(namespace='chat')
rooms: dict[str, list] = defaultdict(list)
@provider.register_channel('join')
async def join_room(args, chan):
room_id = args[0]['roomId']
username = args[0]['username']
rooms[room_id].append(chan)
# Broadcast join notification
await broadcast(room_id, {'type': 'join', 'user': username}, exclude=chan)
try:
async for msg in chan.incoming():
if msg['type'] == 'message':
await broadcast(room_id, {
'type': 'message',
'user': username,
'text': msg['text'],
})
finally:
rooms[room_id].remove(chan)
await broadcast(room_id, {'type': 'leave', 'user': username})
async def broadcast(room_id, msg, exclude=None):
for chan in list(rooms[room_id]):
if chan is not exclude:
try:
await chan.send(msg)
except Exception:
rooms[room_id].remove(chan)
await provider.serve()
TypeScript browser client:
const client = new Client({
  transport: 'websocket',
  url: 'ws://localhost:7700'
});
await client.connect();

// Open a bidirectional channel into the 'general' room.
const chan = await client.channel('chat.join', [{
  roomId: 'general',
  username: 'alice'
}]);

// Send messages
sendButton.onclick = () => {
  chan.send({ type: 'message', text: input.value });
};

// Receive messages
for await (const msg of chan) {
  if (msg.type === 'message') {
    appendMessage(msg.user, msg.text);
  } else if (msg.type === 'join') {
    appendNotice(`${msg.user} joined`);
  }
}
Capability-Gated Admin Functions
Admin functions that require a capability token.
Schema:
{
"version": 1,
"namespaces": {
"admin": {
"functions": {
"purge_queue": {
"args": ["string"],
"returns": "i32",
"visibility": "internal",
"capabilities": ["admin.write"]
},
"get_stats": {
"args": [],
"returns": "Stats",
"visibility": "internal",
"capabilities": ["admin.read"]
}
}
}
}
}
TypeScript provider:
const provider = new Provider({ namespace: 'admin' });

// The handlers stay plain — capability enforcement (admin.write / admin.read)
// happens at the Saikuro boundary, driven by the schema.
provider.register('purge_queue', async (queueName: string): Promise<number> => {
  const count = await queue.purge(queueName);
  return count;
});

provider.register('get_stats', async () => {
  return await metrics.snapshot();
});

await provider.serve();
Authorized caller:
// Attach the capability token at connect time.
const client = new Client({
  capabilities: { token: process.env.ADMIN_TOKEN }
});
await client.connect();

// Works: token grants admin.write
const purged = await client.call('admin.purge_queue', ['dead-letter']);
console.log(`Purged ${purged} messages`);
Unauthorized caller:
const client = new Client(); // No token
await client.connect();

// Throws: CapabilityDenied
await client.call('admin.purge_queue', ['dead-letter']);
Batch Calls for Bulk Operations
Load several resources in one round trip.
const client = new Client();
await client.connect();

// Instead of three sequential calls...
// const user = await client.call('users.getById', ['u1']);
// const prefs = await client.call('prefs.getForUser', ['u1']);
// const notifications = await client.call('notifications.getUnread', ['u1']);
// ...one batch:
const [user, prefs, notifications] = await client.batch([
  { target: 'users.getById', args: ['u1'] },
  { target: 'prefs.getForUser', args: ['u1'] },
  { target: 'notifications.getUnread', args: ['u1'] },
]);

// All three results available immediately
renderProfile({ user, prefs, notifications });
Testing with In-Memory Transport
Use InMemoryTransport.pair() for fast, isolated tests with no runtime process.
// math.test.ts
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { Provider, Client, InMemoryTransport } from 'saikuro';

describe('math provider', () => {
  let provider: Provider;
  let client: Client;

  beforeEach(async () => {
    // A fresh transport pair per test keeps tests fully isolated.
    const [pt, ct] = InMemoryTransport.pair();
    provider = new Provider({ namespace: 'math', transport: pt });
    provider.register('add', (a: number, b: number) => a + b);
    provider.register('multiply', (a: number, b: number) => a * b);
    await provider.serve();
    client = new Client({ transport: ct });
    await client.connect();
  });

  afterEach(async () => {
    await client.disconnect();
    await provider.stop();
  });

  it('adds two numbers', async () => {
    expect(await client.call('math.add', [1, 2])).toBe(3);
  });

  it('multiplies two numbers', async () => {
    expect(await client.call('math.multiply', [6, 7])).toBe(42);
  });

  it('handles errors from the provider', async () => {
    // Registration after serve() is allowed; the provider picks it up live.
    provider.register('divide', (a: number, b: number) => {
      if (b === 0) throw new Error('division by zero');
      return a / b;
    });
    await expect(client.call('math.divide', [1, 0]))
      .rejects.toThrow('division by zero');
  });
});
# test_math.py
import pytest
from saikuro import Provider, Client, InMemoryTransport


@pytest.fixture
async def math_client():
    """Wire a provider and client over an in-memory transport pair."""
    provider_t, client_t = InMemoryTransport.pair()
    provider = Provider(namespace='math', transport=provider_t)

    @provider.register('add')
    def add(a: int, b: int) -> int:
        return a + b

    await provider.serve()
    client = Client(transport=client_t)
    await client.connect()
    yield client
    # Teardown: runs after the test body has finished with the client.
    await client.disconnect()
    await provider.stop()


@pytest.mark.asyncio
async def test_add(math_client):
    result = await math_client.call('math.add', [1, 2])
    assert result == 3
Rust Runtime Provider
High-throughput or latency-sensitive work in Rust, called from other languages.
// src/main.rs
use saikuro::{Provider, Result};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<()> {
    let mut provider = Provider::new("index");

    // Build an in-memory search index. Each handler closure needs its own
    // handle: as originally written, `index` was moved into the `search`
    // closure and then captured again by `bulk_insert`, which does not
    // compile. Clone before the first `move` capture instead.
    // NOTE(review): assumes the index handle is cheaply clonable
    // (e.g. Arc-backed) — confirm against build_index().
    let index = build_index();

    let search_index = index.clone();
    provider.register("search", move |query: String| {
        let index = search_index.clone();
        async move {
            let results = index.search(&query);
            Ok(results)
        }
    });

    provider.register("bulk_insert", move |items: Vec<HashMap<String, String>>| {
        let index = index.clone();
        async move {
            let count = index.bulk_insert(items).await?;
            Ok(count)
        }
    });

    println!("index provider ready");
    provider.serve().await
}
Python caller (doing the heavy lifting in Rust):
client = Client()
await client.connect()
# Insert from Python, search from Python, but the index lives in Rust
await client.cast('index.bulk_insert', [documents])
results = await client.call('index.search', ['saikuro cross-language'])
for r in results:
print(r['title'])
The Python code is clean. The performance-critical indexing and search is Rust. Saikuro handles the boundary.