WebSocket Streaming Protocol

Reference for the Cordum real-time event stream over WebSocket. The gateway exposes two WebSocket endpoints for live updates: a global stream and a per-job stream.

For the REST API reference, see api-reference.md. For the SDK client, see sdk-reference.md.


Endpoints

| Endpoint | Auth | Description |
| --- | --- | --- |
| GET /api/v1/stream | API key (subprotocol) + admin role | Global event stream: all jobs, heartbeats, audit events |
| GET /api/v1/jobs/{id}/stream | API key (subprotocol) + tenant match | Per-job event stream: events for a specific job only |

1. Connection

URL

Derive the WebSocket URL from the gateway HTTP base URL:

| HTTP Base | WebSocket URL |
| --- | --- |
| http://localhost:8081 | ws://localhost:8081/api/v1/stream |
| https://cordum.example.com | wss://cordum.example.com/api/v1/stream |
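
The mapping in the table above can be sketched as a small helper. This is a minimal sketch, not part of any Cordum SDK: the name toStreamUrl is hypothetical, and it assumes the gateway is served from the root of the base URL.

```javascript
// Hypothetical helper: derive the global stream URL from the gateway HTTP base URL.
function toStreamUrl(httpBase) {
  const url = new URL(httpBase);
  let scheme;
  if (url.protocol === "http:") scheme = "ws:";
  else if (url.protocol === "https:") scheme = "wss:";
  else throw new Error(`unsupported scheme: ${url.protocol}`);
  // url.host keeps a non-default port such as :8081.
  return `${scheme}//${url.host}/api/v1/stream`;
}
```

Rejecting unknown schemes, rather than defaulting to ws, keeps an https deployment from being downgraded to an unencrypted connection by a typo.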

Authentication

Authentication is performed via the WebSocket subprotocol header. This avoids sending credentials as query parameters (which appear in server logs).

Format: cordum-api-key.<base64url-encoded-api-key>

The API key is base64url-encoded (RFC 4648 without padding):

Original key: my-secret-api-key-1234
Base64url: bXktc2VjcmV0LWFwaS1rZXktMTIzNA
Subprotocol: cordum-api-key.bXktc2VjcmV0LWFwaS1rZXktMTIzNA

Important: Strip the = padding and use the base64url alphabet (- and _ instead of + and /), because = is not a valid character in a subprotocol name under the WebSocket RFC (RFC 6455).

The gateway echoes back the matched subprotocol in the Sec-WebSocket-Protocol response header.
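
As a worked check of the encoding rules above, the following sketch builds the subprotocol string and reproduces the sample value. The function name subprotocolFor is hypothetical, and encoding the key as UTF-8 before base64 is an assumption that only matters for keys containing non-ASCII characters.

```javascript
// Hypothetical helper: build the Sec-WebSocket-Protocol value for an API key.
function subprotocolFor(apiKey) {
  // Assumption: keys are treated as UTF-8 bytes. Encoding first also keeps
  // btoa from throwing on characters outside Latin-1.
  const bytes = new TextEncoder().encode(apiKey);
  let binary = "";
  for (const b of bytes) binary += String.fromCharCode(b);
  const encoded = btoa(binary)
    .replace(/\+/g, "-")  // base64url alphabet
    .replace(/\//g, "_")
    .replace(/=+$/, "");  // strip padding
  return `cordum-api-key.${encoded}`;
}

console.log(subprotocolFor("my-secret-api-key-1234"));
// cordum-api-key.bXktc2VjcmV0LWFwaS1rZXktMTIzNA
```

After the handshake, a client can compare the connection's negotiated protocol (ws.protocol in browsers and in the Node ws package) with this string to confirm that the gateway echoed it.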

Authorization

  • Global stream (/api/v1/stream): Requires admin role.
  • Per-job stream (/api/v1/jobs/{id}/stream): Requires tenant access to the job's tenant.

Tenant Isolation

Each WebSocket client is associated with a tenant from the authenticated request context. Events are filtered server-side:

  • Events with a matching tenant field are delivered
  • Events without a tenant field are dropped for non-cross-tenant clients
  • Cross-tenant clients (admin) receive all events
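
The three filtering rules can be read as a single predicate. The sketch below is illustrative only: the gateway's actual filter is not shown in this document, and the client and event object shapes (tenant, crossTenant) are hypothetical.

```javascript
// Hypothetical shapes: client = { tenant, crossTenant }, event = { tenant? }.
function shouldDeliver(client, event) {
  if (client.crossTenant) return true;    // cross-tenant (admin) clients receive everything
  if (!event.tenant) return false;        // untagged events are dropped for everyone else
  return event.tenant === client.tenant;  // otherwise the tenants must match
}
```

Note the consequence of the second rule: a regular client never sees an event that carries no tenant field, even if the event would otherwise be harmless.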

2. Message Format

All messages are JSON-encoded BusPacket protobuf messages serialized with protojson. Each message represents a single bus event.

Wire Format (protojson)

{
  "traceId": "abc-123",
  "senderId": "cordum-scheduler",
  "createdAt": {
    "seconds": "1707840000",
    "nanos": 0
  },
  "jobResult": {
    "jobId": "job-xyz",
    "status": "JOB_STATUS_SUCCEEDED",
    "workerId": "worker-1",
    "executionMs": "1250",
    "resultPtr": "res:job:job-xyz"
  }
}
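
In the sample above, seconds and executionMs are JSON strings, not numbers, because the proto3 JSON mapping encodes 64-bit integers as strings. A client therefore needs to convert them before doing arithmetic. The helper names below are hypothetical, and the sketch assumes the values fit in a JavaScript safe integer.

```javascript
// Hypothetical helpers for the string-encoded 64-bit fields in the sample packet.
function createdAtToDate(createdAt) {
  // seconds arrives as a string; nanos is a regular number and may be omitted when zero.
  const ms = Number(createdAt.seconds) * 1000 + Math.floor((createdAt.nanos || 0) / 1e6);
  return new Date(ms);
}

function executionMillis(jobResult) {
  return Number(jobResult.executionMs);
}

console.log(createdAtToDate({ seconds: "1707840000", nanos: 0 }).toISOString());
// 2024-02-13T16:00:00.000Z
```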

Payload Variants

Each BusPacket contains exactly one payload field:

| Field | Proto Type | Description |
| --- | --- | --- |
| jobRequest | JobRequest | Job submitted to the bus |
| jobResult | JobResult | Job completed (succeeded, failed, cancelled) |
| jobProgress | JobProgress | Job progress update (percent, message) |
| jobCancel | JobCancel | Job cancellation signal |
| heartbeat | Heartbeat | Worker heartbeat with pool, active jobs, capacity |
| alert | Alert | System alert |
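
Because exactly one payload field is set, a client can dispatch on whichever field is present. A minimal sketch follows; payloadKind is a hypothetical name, and returning null for anything other than exactly one payload is a choice made for this example.

```javascript
// Payload field names from the table above.
const PAYLOAD_FIELDS = ["jobRequest", "jobResult", "jobProgress", "jobCancel", "heartbeat", "alert"];

// Hypothetical helper: return the name of the single payload field, or null
// if the packet carries none (or, unexpectedly, more than one).
function payloadKind(packet) {
  const present = PAYLOAD_FIELDS.filter((f) => packet[f] !== undefined && packet[f] !== null);
  return present.length === 1 ? present[0] : null;
}
```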

Common Fields

| Field | Type | Description |
| --- | --- | --- |
| traceId | string | Trace correlation ID |
| senderId | string | ID of the sender (scheduler, worker, etc.) |
| createdAt | Timestamp | Event creation time ({seconds, nanos}) |
| protocolVersion | string | CAP protocol version |
| signature | bytes | Optional ECDSA packet signature |

3. Event Types

The dashboard normalizes BusPackets into StreamEvent objects. Here are the event types and their payloads:

Job Events

| Event Type | Source Field | Payload Fields |
| --- | --- | --- |
| job.submit | jobRequest | jobId, topic, tenantId, labels |
| job.result | jobResult | jobId, status, errorCode, errorMessage, executionMs, workerId |
| job.result.succeeded | jobResult | Same as job.result (status-specific) |
| job.result.failed | jobResult | Same as job.result (status-specific) |
| job.result.cancelled | jobResult | Same as job.result (status-specific) |
| job.progress | jobProgress | jobId, percent, message, status |
| job.cancel | jobCancel | jobId, reason |

Worker Events

| Event Type | Source Field | Payload Fields |
| --- | --- | --- |
| worker.heartbeat | heartbeat | workerId, pool, activeJobs, maxParallelJobs |

System Events

| Event Type | Source Field | Payload Fields |
| --- | --- | --- |
| system.alert | alert | Varies by alert type |
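
The tables above can be approximated by a function that maps a packet to its event type. This is a sketch of the mapping, not the dashboard's normalizer, which is not shown here. The enum names JOB_STATUS_FAILED and JOB_STATUS_CANCELLED are assumptions extrapolated from JOB_STATUS_SUCCEEDED, and returning only the most specific type is a simplification.

```javascript
// Assumed status enum names; only JOB_STATUS_SUCCEEDED appears in this document.
const RESULT_SUFFIX = new Map([
  ["JOB_STATUS_SUCCEEDED", "succeeded"],
  ["JOB_STATUS_FAILED", "failed"],
  ["JOB_STATUS_CANCELLED", "cancelled"],
]);

// Hypothetical helper: map a BusPacket to the most specific event type.
function streamEventType(packet) {
  if (packet.jobRequest) return "job.submit";
  if (packet.jobResult) {
    const suffix = RESULT_SUFFIX.get(packet.jobResult.status);
    return suffix ? `job.result.${suffix}` : "job.result";
  }
  if (packet.jobProgress) return "job.progress";
  if (packet.jobCancel) return "job.cancel";
  if (packet.heartbeat) return "worker.heartbeat";
  if (packet.alert) return "system.alert";
  return null; // no recognized payload
}
```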

Audit Events

The gateway subscribes to sys.audit.> NATS subjects. Audit events arrive as BusPackets and are forwarded to WebSocket clients as-is.


4. Bus Subscriptions

The gateway subscribes to these NATS subjects and forwards matching packets to WebSocket clients:

| NATS Subject | Events |
| --- | --- |
| sys.heartbeat | Worker heartbeats |
| sys.job.> | All job lifecycle events (submit, result, progress, cancel) |
| sys.audit.> | Audit trail events |
| sys.job.dlq | Dead-letter queue entries (also persisted to DLQ store) |
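
For readers unfamiliar with NATS wildcards: > matches one or more trailing tokens and * matches exactly one token. The sketch below implements those matching rules for illustration. It is a simplified stand-in, not the NATS client's own matcher, and subjectMatches is a hypothetical name.

```javascript
// Simplified NATS subject matching: "*" matches one token, ">" matches one or
// more trailing tokens and is only valid as the last token of the pattern.
function subjectMatches(pattern, subject) {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") return i === p.length - 1 && s.length > i;
    if (i >= s.length) return false;
    if (p[i] !== "*" && p[i] !== s[i]) return false;
  }
  return p.length === s.length;
}

console.log(subjectMatches("sys.job.>", "sys.job.dlq")); // true
console.log(subjectMatches("sys.job.>", "sys.job"));     // false
```

As the first call shows, sys.job.dlq also falls under the sys.job.> wildcard.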

5. Per-Job Streaming

Connect to /api/v1/jobs/{id}/stream to receive only events for a specific job:

ws://localhost:8081/api/v1/jobs/job-abc123/stream

Server-side filtering:

  • Only events matching the specified jobId are delivered
  • Tenant access is verified against the job's tenant before the upgrade
  • Returns 404 if the job does not exist
  • Returns 403 if the caller's tenant does not match
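
A per-job URL can be built as in the sketch below. jobStreamUrl is a hypothetical helper. Percent-encoding the job ID is a defensive assumption for IDs that contain reserved characters, and whether the gateway accepts such IDs is not stated here.

```javascript
// Hypothetical helper: build the per-job stream URL from a ws:// or wss:// base.
function jobStreamUrl(wsBase, jobId) {
  const base = wsBase.replace(/\/+$/, ""); // tolerate a trailing slash on the base
  return `${base}/api/v1/jobs/${encodeURIComponent(jobId)}/stream`;
}

console.log(jobStreamUrl("ws://localhost:8081", "job-abc123"));
// ws://localhost:8081/api/v1/jobs/job-abc123/stream
```

Because the 404 and 403 responses are returned before the upgrade, a browser client sees them only as a failed connection. The Node ws package exposes the HTTP response through its unexpected-response event.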

6. Reconnection Strategy

The gateway sends WebSocket ping frames every 30 seconds by default and expects the client to process control frames and reply with pong frames. Clients should still implement reconnection with exponential backoff to recover from process restarts, network partitions, credential revocation, and connections that stay unavailable after keepalive retries.

Server Keepalive and Revalidation

  • The gateway sends a ping every 30s by default (GATEWAY_WS_PING_INTERVAL)
  • The server extends the read deadline when it receives a pong and treats missing pongs as a dead connection (GATEWAY_WS_PONG_TIMEOUT)
  • Long-lived WebSocket credentials are revalidated every 120s
  • Transient auth backend failures (for example network timeouts) are retried before the connection is dropped
  • The HTTP server idle timeout defaults to 120s (GATEWAY_HTTP_IDLE_TIMEOUT) so quiet upgraded connections are not closed before the keepalive loop runs

Client requirement: keep a read loop running. In Gorilla WebSocket, ping/pong handlers only run while the application is reading from the connection. Browsers answer ping frames automatically, so browser clients need no extra code for keepalive.

Recommended client backoff parameters:

| Parameter | Value |
| --- | --- |
| Initial backoff | 1 second |
| Maximum backoff | 30 seconds |
| Backoff factor | 2x |
| Reset on success | Yes (reset to the initial value in onopen) |

Connection Lifecycle

connect() → onopen → receiving messages...
                ↓ (connection drops)
onclose → wait(backoff) → connect()
          backoff *= 2 (capped at max)
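
The backoff schedule in this lifecycle (1 second initial, factor 2, 30 second cap, reset on open) can be sketched as a small state holder. createBackoff is a hypothetical name, and the defaults mirror the values recommended in this section.

```javascript
// Hypothetical helper: exponential backoff with a cap and an explicit reset.
function createBackoff(initialMs = 1000, maxMs = 30000, factor = 2) {
  let current = initialMs;
  return {
    // Return the delay to wait now, then grow the next one (capped at maxMs).
    next() {
      const delay = current;
      current = Math.min(current * factor, maxMs);
      return delay;
    },
    // Call from onopen so the next disconnect starts from the initial delay.
    reset() {
      current = initialMs;
    },
  };
}

const backoff = createBackoff();
console.log([1, 2, 3, 4, 5, 6, 7].map(() => backoff.next()));
// [ 1000, 2000, 4000, 8000, 16000, 30000, 30000 ]
```

Adding random jitter to each delay is a common refinement when many clients reconnect at once. It is omitted here to match the documented parameters.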

Connection Identification

Every WebSocket connection is assigned a unique conn_id — a 16-character hex string generated from crypto/rand. This ID appears in all lifecycle log entries and allows operators to trace a single connection across connect, revalidation, and disconnect events.

Lifecycle Logging

The gateway emits structured slog.Info logs at connection boundaries:

Connect:

level=INFO msg="ws connected" conn_id=a1b2c3d4e5f67890 remote=10.0.1.5:52340 tenant=default user_agent=Go-http-client/1.1

Disconnect:

level=INFO msg="ws disconnected" conn_id=a1b2c3d4e5f67890 remote=10.0.1.5:52340 tenant=default duration=482s reason=client_close

Disconnect reasons:

| Reason | Meaning |
| --- | --- |
| client_close | Client closed the connection normally |
| ping_timeout | Client failed to respond to ping within the pong timeout |
| revalidation_revoked | Credential revalidation determined the API key is no longer valid |
| slow_client | Client send buffer was full (100 events queued) |
| shutdown | Gateway is shutting down |
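
When tracing a connection by conn_id, it can help to split these key=value log lines into fields. The following is a minimal sketch that assumes the logs are in the text format shown above. parseLogfmt is a hypothetical helper, and it does not unescape sequences inside quoted values.

```javascript
// Hypothetical helper: parse a key=value log line into an object.
// Quoted values may contain spaces; bare values end at the next whitespace.
function parseLogfmt(line) {
  const fields = {};
  const re = /([^\s=]+)=(?:"((?:[^"\\]|\\.)*)"|(\S*))/g;
  let m;
  while ((m = re.exec(line)) !== null) {
    fields[m[1]] = m[2] !== undefined ? m[2] : m[3];
  }
  return fields;
}

const entry = parseLogfmt(
  'level=INFO msg="ws disconnected" conn_id=a1b2c3d4e5f67890 remote=10.0.1.5:52340 tenant=default duration=482s reason=client_close'
);
console.log(entry.conn_id, entry.reason); // a1b2c3d4e5f67890 client_close
```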

Prometheus Metrics

The gateway exports 9 WebSocket metrics on the /metrics endpoint (default port 9092):

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| cordum_gateway_ws_clients_active | Gauge | | Current active WebSocket connections |
| cordum_gateway_ws_connection_duration_seconds | Histogram | | Connection lifetime (buckets: 1s to 4h) |
| cordum_gateway_ws_pings_sent_total | Counter | | Ping frames sent to clients |
| cordum_gateway_ws_pongs_received_total | Counter | | Pong frames received from clients |
| cordum_gateway_ws_pong_timeouts_total | Counter | | Connections closed after missing pong |
| cordum_gateway_ws_packets_dropped_total | Counter | | Bus packets dropped due to marshal failure |
| cordum_gateway_ws_slow_client_evictions_total | CounterVec | reason | Clients evicted (buffer full) |
| cordum_gateway_ws_revalidation_total | CounterVec | outcome | Credential revalidation outcomes (ok, revoked, error) |
| cordum_gateway_ws_reconnections_total | Counter | | Client reconnections within the reconnect window |

Slow Client Eviction

The server buffers up to 100 events per client (make(chan wsEvent, 100)). If a client falls behind and the buffer is full, the server closes the connection. The client should reconnect.
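
The eviction rule can be modeled as a bounded queue that refuses new events once full. The sketch below only illustrates the behavior described above. The gateway itself uses a buffered Go channel, and the ClientBuffer class and its method names are hypothetical.

```javascript
// Illustrative model of a per-client buffer with slow-client eviction.
class ClientBuffer {
  constructor(capacity = 100) {
    this.capacity = capacity;
    this.queue = [];
    this.evicted = false;
  }

  // Try to enqueue an event. Returns false once the client is evicted
  // because the buffer was already full when an event arrived.
  offer(event) {
    if (this.evicted) return false;
    if (this.queue.length >= this.capacity) {
      this.evicted = true; // the server would close the connection here
      this.queue.length = 0;
      return false;
    }
    this.queue.push(event);
    return true;
  }

  // Dequeue the oldest event (what the client's writer would send next).
  take() {
    return this.queue.shift();
  }
}
```

A client that drains its socket promptly keeps the queue short. One that stalls for 100 events is disconnected on the next event and has to reconnect.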

Missed Events

There is no replay or catch-up mechanism. When reconnecting, poll the REST API to get the current state of any resources you were tracking.


7. Client Examples

Browser (JavaScript)

const apiKey = "your-api-key";

// Base64url encode without padding (btoa handles ASCII/Latin-1 keys only)
const encoded = btoa(apiKey)
  .replace(/\+/g, "-")
  .replace(/\//g, "_")
  .replace(/=+$/, "");
const subprotocol = `cordum-api-key.${encoded}`;

const INITIAL_BACKOFF_MS = 1000;
const MAX_BACKOFF_MS = 30000;
let backoffMs = INITIAL_BACKOFF_MS;

function connect() {
  const ws = new WebSocket("ws://localhost:8081/api/v1/stream", [subprotocol]);

  ws.onopen = () => {
    console.log("Connected");
    backoffMs = INITIAL_BACKOFF_MS; // reset backoff after a successful connect
  };

  ws.onmessage = (event) => {
    const packet = JSON.parse(event.data);

    if (packet.jobResult) {
      // "JOB_STATUS_SUCCEEDED" -> "succeeded"
      const status = packet.jobResult.status.replace(/^.*_/, "").toLowerCase();
      console.log(`Job ${packet.jobResult.jobId}: ${status}`);
    }
    if (packet.heartbeat) {
      console.log(`Worker ${packet.heartbeat.workerId}: ${packet.heartbeat.activeJobs} active`);
    }
  };

  ws.onclose = () => {
    console.log(`Disconnected, reconnecting in ${backoffMs} ms...`);
    setTimeout(connect, backoffMs);
    backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS); // exponential backoff, capped
  };
}

connect();

Node.js

import WebSocket from "ws";

const apiKey = process.env.CORDUM_API_KEY;
if (!apiKey) throw new Error("CORDUM_API_KEY is not set");

// Node's "base64url" encoding already omits padding
const encoded = Buffer.from(apiKey).toString("base64url");
const subprotocol = `cordum-api-key.${encoded}`;

const ws = new WebSocket("ws://localhost:8081/api/v1/stream", [subprotocol]);

ws.on("open", () => console.log("Connected"));

ws.on("message", (data) => {
  const packet = JSON.parse(data.toString());
  if (packet.jobResult) {
    console.log(`Job ${packet.jobResult.jobId}: ${packet.jobResult.status}`);
  }
});

// Without an "error" listener, a failed handshake crashes the process
ws.on("error", (err) => console.error("WebSocket error:", err.message));

ws.on("close", () => console.log("Disconnected"));

wscat (Testing)

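# Install wscat
npm install -g wscat

# Connect (API key as subprotocol)
# printf avoids a trailing newline; tr -d '=\n' strips padding and any
# line wrapping that base64 adds for long keys
KEY=$(printf '%s' "$CORDUM_API_KEY" | base64 | tr '+/' '-_' | tr -d '=\n')
wscat -c "ws://localhost:8081/api/v1/stream" \
  -s "cordum-api-key.$KEY"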

Go

package main

import (
	"encoding/base64"
	"fmt"
	"log"
	"os"

	"github.com/gorilla/websocket"
)

func main() {
	apiKey := os.Getenv("CORDUM_API_KEY")
	// RawURLEncoding is base64url without padding
	encoded := base64.RawURLEncoding.EncodeToString([]byte(apiKey))
	subprotocol := "cordum-api-key." + encoded

	dialer := websocket.Dialer{
		Subprotocols: []string{subprotocol},
	}
	conn, _, err := dialer.Dial("ws://localhost:8081/api/v1/stream", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Keep reading: Gorilla processes ping/pong control frames only during reads
	for {
		_, message, err := conn.ReadMessage()
		if err != nil {
			log.Println("read error:", err)
			break
		}
		fmt.Println(string(message))
	}
}

8. Dashboard Integration

The Cordum dashboard uses two hooks for WebSocket integration:

useEventStream

  • Manages the single WebSocket connection to /api/v1/stream
  • Authenticates via the cordum-api-key.<base64url> subprotocol
  • Auto-reconnects with exponential backoff (1s to 30s)
  • Converts raw BusPacket protojson to normalized StreamEvent objects
  • Dispatches events to:
    • React Query cache invalidation — events matching job.*, workflow.*, approval.*, worker.*, dlq.*, policy.*, run.*, pack.*, safety.*, audit.* invalidate their respective query keys
    • Zustand event store — all events buffered for the live activity feed
    • Safety decision store: safety.* events pushed to a dedicated buffer

useRunStream

  • Subscribes to the Zustand event store (not a separate WebSocket)
  • Filters events by run ID for a specific workflow run
  • Optimistically patches React Query cached run data for instant UI updates
  • Handles: step status changes, job result mapping to steps, run-level status changes

Cache Invalidation Map

| Event Prefix | Query Keys Invalidated |
| --- | --- |
| job.* | ["jobs"] |
| workflow.* | ["workflows"] |
| approval.* | ["approvals"], ["approvals", "nav"] |
| worker.* | ["workers"] |
| dlq.* | ["dlq"], ["dlq", "nav"] |
| policy.* | ["policy-bundles"], ["policy-rules"] |
| run.* | ["workflow-runs"], ["runs"] |
| pack.* | ["packs"] |
| safety.* | ["safety"] |
| audit.* | ["audit"] |
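
The map above amounts to a prefix lookup on the event type. The sketch below shows one way to express it. queryKeysFor is a hypothetical name, and the dashboard's actual hook code is not shown in this document.

```javascript
// Event prefix -> React Query keys to invalidate, copied from the table above.
const INVALIDATION_MAP = new Map([
  ["job", [["jobs"]]],
  ["workflow", [["workflows"]]],
  ["approval", [["approvals"], ["approvals", "nav"]]],
  ["worker", [["workers"]]],
  ["dlq", [["dlq"], ["dlq", "nav"]]],
  ["policy", [["policy-bundles"], ["policy-rules"]]],
  ["run", [["workflow-runs"], ["runs"]]],
  ["pack", [["packs"]]],
  ["safety", [["safety"]]],
  ["audit", [["audit"]]],
]);

// Hypothetical helper: look up query keys by the first segment of the event type.
function queryKeysFor(eventType) {
  const prefix = eventType.split(".")[0];
  return INVALIDATION_MAP.get(prefix) ?? [];
}

console.log(queryKeysFor("job.result.failed")); // [ [ 'jobs' ] ]
```

Each returned key could then be passed to React Query's invalidateQueries. Unknown prefixes yield an empty list, so unrecognized events invalidate nothing.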

9. Server-Side Details

Write Timeout

The server sets a 5-second write deadline per message. If the client does not consume a message within this window, the write fails and the connection is closed.

Origin Check

The WebSocket upgrader calls isAllowedOrigin(r) which checks against the configured CORS allowed origins (CORDUM_ALLOWED_ORIGINS, CORDUM_CORS_ALLOW_ORIGINS, or CORS_ALLOW_ORIGINS).

Event Buffer

  • Internal broadcast channel: unbuffered (events are dropped if no goroutine is ready)
  • Per-client channel: 100 events buffered
  • Slow clients are detected during broadcast and disconnected

Shutdown

When the gateway shuts down, it closes the broadcast channel (stopBusTaps), which terminates the broadcast goroutine and causes all client connections to close gracefully.