Cordum Agent Protocol (NATS + Redis pointers)
Tags: saga, compensation, job-status, scheduler
:::info Protocol surfaces
- Positioning / pitch: cordum.io/protocol — CAP as "MCP for agent behavior" (MCP = say, CAP = do).
- Wire spec + SDKs: github.com/cordum-io/cap — Go / Python / Node / C++ SDKs, protobuf specs, conformance tests.
- This page: how CAP is consumed by Cordum's control plane (subjects, semantics, pointers, state).
:::
This document describes how control-plane components and external workers communicate on the bus, what goes into context_ptr / result_ptr, and how job state is tracked.
Actors
- API Gateway: writes context to Redis, publishes `BusPacket{JobRequest}` to `sys.job.submit`, exposes HTTP/WS/gRPC, and streams bus events.
- Scheduler: subscribes to `sys.job.submit`, `sys.job.result`, `sys.heartbeat`; gates with Safety Kernel, selects a pool/worker subject, publishes to `job.*` (pool) or `worker.<id>.jobs` (direct), and persists job state/result in `JobStore`.
- Safety Kernel: gRPC `Check` service; allows/denies topics per tenant (see `config/safety.yaml`).
- Workflow Engine: creates runs, publishes job steps to `sys.job.submit`, and advances runs based on results.
- External Workers: subscribe to `job.*` subjects in queue groups, fetch context/result from Redis pointers, emit `BusPacket{JobResult}` to `sys.job.result`, and send heartbeats.
- Context Engine: gRPC helper that builds context windows and maintains memory in Redis (not on the NATS bus).
Bus Subjects
- `sys.job.submit` – inbound jobs to the scheduler.
- `sys.job.result` – job completions from workers.
- `sys.job.progress` – progress updates from workers.
- `sys.job.dlq` – dead-letter events (non-success results; used for debugging/retry workflows).
- `sys.job.cancel` – cancellation notifications (workers cancel matching in-flight job IDs).
- `sys.heartbeat` – worker heartbeats (fan-out, no queue group).
- `sys.handshake` – component registration (`Handshake` messages on connect).
- `sys.alert` – system alerts from any component.
- `sys.workflow.event` – workflow engine event emissions (`SystemAlert`).
- `job.*` – worker pools (map lives in `config/pools.yaml`, e.g., `job.default`, `job.batch`).
- `worker.<worker_id>.jobs` – direct, worker-targeted delivery (used by the scheduler for least-loaded dispatch).

Default subject constants are defined in `core/protocol/capsdk` (mirrors the CAP v2 module version in `go.mod`).
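As an illustration of how least-loaded dispatch might choose between `worker.<worker_id>.jobs` and a pool subject, here is a minimal Python sketch. It is not the scheduler's actual strategy: the function name `select_subject`, the load ratio `active_jobs / max_parallel_jobs`, and the dict-shaped heartbeats are assumptions; only the heartbeat field names and subject shapes come from this page.

```python
from typing import Iterable, Mapping


def select_subject(heartbeats: Iterable[Mapping], pool: str, pool_subject: str) -> str:
    """Return worker.<id>.jobs for the least-loaded worker, else the pool subject.

    Hypothetical helper: "load" is assumed to be active_jobs / max_parallel_jobs.
    """
    # Only workers in the requested pool that still have spare capacity qualify.
    candidates = [
        hb for hb in heartbeats
        if hb["pool"] == pool and hb["active_jobs"] < hb["max_parallel_jobs"]
    ]
    if not candidates:
        # No direct target available: fall back to the pool subject (queue group).
        return pool_subject
    best = min(candidates, key=lambda hb: hb["active_jobs"] / hb["max_parallel_jobs"])
    return f"worker.{best['worker_id']}.jobs"
```

Falling back to the pool subject keeps dispatch working when no heartbeat data is available, at the cost of letting the queue group pick the worker.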
Delivery Semantics (JetStream)
By default this system is plain NATS pub/sub (at-most-once). When JetStream is enabled (NATS_USE_JETSTREAM=1), the bus switches the durable subjects to explicit ack/nak semantics (at-least-once):
- Durable (JetStream): `sys.job.submit`, `sys.job.result`, `sys.job.dlq`, `job.*`, `worker.<id>.jobs`
- Best-effort (plain NATS): `sys.heartbeat` (fan-out), `sys.job.cancel`, `sys.job.progress`, `sys.workflow.event`
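The durable/best-effort split can be expressed as a small subject classifier. The Python sketch below is illustrative only: `subject_matches` and `is_durable` are hypothetical names, and `worker.<id>.jobs` is written with the NATS single-token wildcard `*`.

```python
# Durable subject patterns copied from the list above ("<id>" written as "*").
DURABLE_PATTERNS = [
    "sys.job.submit",
    "sys.job.result",
    "sys.job.dlq",
    "job.*",
    "worker.*.jobs",
]


def subject_matches(pattern: str, subject: str) -> bool:
    """NATS-style match where '*' stands for exactly one dot-separated token."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    if len(p_tokens) != len(s_tokens):
        return False
    return all(p == "*" or p == s for p, s in zip(p_tokens, s_tokens))


def is_durable(subject: str, jetstream_enabled: bool) -> bool:
    """True when the subject gets at-least-once (ack/nak) delivery."""
    if not jetstream_enabled:
        # Without JetStream everything is plain pub/sub (at-most-once).
        return False
    return any(subject_matches(p, subject) for p in DURABLE_PATTERNS)
```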
Because at-least-once delivery can redeliver, handlers must be idempotent:
- Scheduler uses a per-job Redis lock before mutating state/dispatching.
- Workers should use a per-job lock and cache the published `JobResult` metadata so a redelivery can republish without re-running work.
- Retryable handler errors are returned as “retry after …” and translated into a NAK-with-delay; non-retryable errors are ACKed (won’t redeliver).
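The worker-side guidance (per-job lock plus cached result) can be sketched as follows. This is a minimal in-process illustration, not the SDK's implementation: `IdempotentWorker`, `run_job`, and `publish_result` are hypothetical names, and the in-memory dictionaries stand in for whatever shared store (for example Redis) a real worker would use.

```python
import threading
from typing import Callable, Dict


class IdempotentWorker:
    """Runs each job at most once and republishes the cached result on redelivery."""

    def __init__(self, run_job: Callable[[dict], dict], publish_result: Callable[[dict], None]):
        self._run_job = run_job              # hypothetical: performs the actual work
        self._publish = publish_result       # hypothetical: publishes to sys.job.result
        self._locks_guard = threading.Lock()
        self._job_locks: Dict[str, threading.Lock] = {}
        self._result_cache: Dict[str, dict] = {}

    def _lock_for(self, job_id: str) -> threading.Lock:
        # One lock per job_id so concurrent redeliveries of the same job serialize.
        with self._locks_guard:
            return self._job_locks.setdefault(job_id, threading.Lock())

    def handle(self, request: dict) -> dict:
        job_id = request["job_id"]
        with self._lock_for(job_id):
            result = self._result_cache.get(job_id)
            if result is None:
                # First delivery: do the work and remember the result metadata.
                result = self._run_job(request)
                self._result_cache[job_id] = result
            # First delivery or redelivery: (re)publish without re-running work.
            self._publish(result)
            return result
```

In a multi-process deployment the lock and cache would need to live in a shared store; the in-process version only shows the control flow.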
Wire Contracts (CAP – github.com/cordum-io/cap/v2/cordum/agent/v1)
CAP is the canonical contract; Cordum does not duplicate these protos.
- Envelope: `BusPacket`
  - `trace_id`, `sender_id`, `created_at`, `protocol_version` (current: `1`)
  - `payload` oneof: `JobRequest`, `JobResult`, `Heartbeat`, `SystemAlert`, `JobProgress`, `JobCancel`, `Handshake`.
  - `signature` is part of CAP but not enforced by Cordum yet.
- `JobRequest`
  - `job_id` (UUID string), `topic` (e.g., `job.default`), `priority` (`INTERACTIVE` | `BATCH` | `CRITICAL`).
  - `context_ptr` (Redis URL, e.g., `redis://ctx:<job_id>`). `result_ptr` is carried on `JobResult`.
  - `memory_id` (long-lived memory namespace), `tenant_id`, `principal_id`, `labels` (routing + observability).
  - `adapter_id` (optional worker mode), `env` map (tenant fallback), workflow metadata (e.g. `parent_job_id`, `workflow_id`), plus `context_hints` and `budget` (token + deadline hints).
Priority semantics:
- The scheduler treats `priority` as metadata only (no preemption or queue ordering today).
- Workers may choose to use it for local ordering, but core does not enforce it.
- `JobResult`
  - `job_id`, `status` (`PENDING` | `SCHEDULED` | `DISPATCHED` | `RUNNING` | `SUCCEEDED` | `FAILED` | `FAILED_RETRYABLE` | `FAILED_FATAL` | `CANCELLED` | `DENIED` | `TIMEOUT`), `result_ptr`, `worker_id`, `execution_ms`, optional `error_code` / `error_message`, `error_code_enum` (structured `ErrorCode`, preferred over string `error_code`).
  - `FAILED_RETRYABLE` is treated as a transient failure (no DLQ entry; workflow retry policy can re-dispatch).
  - `FAILED_FATAL` is treated as a terminal failure and triggers saga rollback.
- `JobProgress`
  - `job_id`, `percent`, `message`, optional `result_ptr` / `artifact_ptrs`, optional status hint.
- `JobCancel`
  - `job_id`, `reason`, `requested_by`.
- `Heartbeat`
  - `worker_id`, `region`, `type`, `cpu_load`, `memory_load`, `gpu_utilization`, `active_jobs`, `capabilities`, `pool`, `max_parallel_jobs`.
- `PolicyCheckResponse`
  - `decision`, `reason`, `policy_snapshot`, `constraints`, `approval_required`, `approval_ref`.
  - `remediations` (optional suggestions with `replacement_topic`, `replacement_capability`, label add/remove).
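To make the envelope shape concrete, the sketch below builds a `BusPacket` carrying a `JobRequest` as a plain Python dictionary. It is illustrative only: real clients use the CAP protobuf types, and the `job_request` key for the oneof, the ISO-8601 `created_at` string, and the helper name `new_job_request_packet` are assumptions made for this example.

```python
import uuid
from datetime import datetime, timezone
from typing import Mapping, Optional

PROTOCOL_VERSION = 1  # "current: 1" per the envelope description above
PRIORITIES = {"INTERACTIVE", "BATCH", "CRITICAL"}


def new_job_request_packet(
    topic: str,
    tenant_id: str,
    sender_id: str,
    priority: str = "INTERACTIVE",
    labels: Optional[Mapping[str, str]] = None,
) -> dict:
    """Build a dict shaped like BusPacket{JobRequest} (hypothetical helper)."""
    if priority not in PRIORITIES:
        raise ValueError(f"unknown priority: {priority!r}")
    job_id = str(uuid.uuid4())
    return {
        "trace_id": str(uuid.uuid4()),
        "sender_id": sender_id,
        "created_at": datetime.now(timezone.utc).isoformat(),  # assumed encoding
        "protocol_version": PROTOCOL_VERSION,
        "job_request": {  # assumed key name for the payload oneof
            "job_id": job_id,
            "topic": topic,
            "priority": priority,
            "context_ptr": f"redis://ctx:{job_id}",
            "tenant_id": tenant_id,
            "labels": dict(labels or {}),
            "env": {"tenant_id": tenant_id},  # tenant fallback
        },
    }
```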
Pointer Scheme (Redis)
- Contexts live at `ctx:<job_id>` (or a derived key) with pointer `redis://ctx:<job_id>`.
- Results live at `res:<job_id>` with pointer `redis://res:<job_id>`.
- Artifacts live at `art:<id>` with pointer `redis://art:<id>`.
- Job metadata/state lives under `job:meta:<job_id>`; per-state indices are maintained for reconciliation; recent jobs are kept in `job:recent`.
- Context-engine memory is namespaced under `mem:<memory_id>:*` (e.g., `mem:<memory_id>:events`, `mem:<memory_id>:summary`).
- Scheduler writes a worker snapshot JSON to `sys:workers:snapshot` for observability and control-plane consumers.
- Gateway exposes a pointer reader for debugging/UI: `GET /api/v1/memory?ptr=<urlencoded redis://...>`.
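The pointer scheme is simple enough to handle with plain string operations. The Python helpers below are a sketch with hypothetical names (`make_pointer`, `pointer_to_key`, `debug_reader_path`); they assume a pointer is just `redis://` followed by the Redis key, which is how the examples above read. A generic URL parser is avoided on purpose, since it would try to interpret `ctx:<job_id>` as `host:port`.

```python
from urllib.parse import quote

POINTER_SCHEME = "redis://"
POINTER_KINDS = ("ctx", "res", "art")


def make_pointer(kind: str, ident: str) -> str:
    """Build a pointer such as redis://ctx:<job_id>."""
    if kind not in POINTER_KINDS:
        raise ValueError(f"unknown pointer kind: {kind!r}")
    return f"{POINTER_SCHEME}{kind}:{ident}"


def pointer_to_key(ptr: str) -> str:
    """Strip the scheme and return the Redis key the pointer refers to."""
    if not ptr.startswith(POINTER_SCHEME):
        raise ValueError(f"not a redis pointer: {ptr!r}")
    key = ptr[len(POINTER_SCHEME):]
    if not key:
        raise ValueError("pointer has an empty key")
    return key


def debug_reader_path(ptr: str) -> str:
    """Path for the gateway's pointer reader, with the pointer URL-encoded."""
    return "/api/v1/memory?ptr=" + quote(ptr, safe="")
```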
Lifecycle
- Client (gateway or script) writes context JSON to Redis and sets `context_ptr` in `JobRequest`.
- Publish `BusPacket{JobRequest}` to `sys.job.submit`.
- Scheduler:
  - Records state `PENDING` in JobStore and adds job to trace.
  - Calls Safety Kernel; on deny → state `DENIED`.
  - Uses pool map + `LeastLoadedStrategy` to choose a subject (`worker.<id>.jobs` when possible; otherwise `job.*`); publishes job and moves state to `SCHEDULED → DISPATCHED → RUNNING`.
- Worker consumes `job.*` or `worker.<id>.jobs`, fetches `context_ptr`, performs work, writes result to `res:<job_id>`, and publishes `BusPacket{JobResult}` with `result_ptr`.
- Scheduler updates JobStore with terminal state from `JobResult` and stores `result_ptr`.
  - `FAILED_RETRYABLE` does not emit a DLQ entry.
  - `FAILED_FATAL` initiates saga compensation if a workflow is associated.
- Reconciler periodically marks old `DISPATCHED`/`RUNNING` jobs as `TIMEOUT` based on `config/timeouts.yaml`.
- Cancellation: gateway or scheduler publishes `BusPacket{JobCancel}` to `sys.job.cancel`; workers cancel the matching in-flight job context and publish a terminal `JobResult` (`CANCELLED` or `TIMEOUT`).
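The lifecycle above can be read as a small state machine. The transition table in this Python sketch is an interpretation of the steps listed, not a copy of the scheduler's logic: in particular, which states may move to `CANCELLED` and whether `FAILED_RETRYABLE` is final for a given job are assumptions, and the function names are hypothetical.

```python
# Result states a worker or the reconciler can report for a dispatched job.
RESULT_STATES = {
    "SUCCEEDED", "FAILED", "FAILED_RETRYABLE", "FAILED_FATAL", "CANCELLED", "TIMEOUT",
}

# Assumed transition table derived from the lifecycle steps above.
TRANSITIONS = {
    "PENDING": {"SCHEDULED", "DENIED"},
    "SCHEDULED": {"DISPATCHED"},
    "DISPATCHED": {"RUNNING", "TIMEOUT", "CANCELLED"},
    "RUNNING": set(RESULT_STATES),
}


def can_transition(current: str, new: str) -> bool:
    """True if the assumed table allows moving from `current` to `new`."""
    return new in TRANSITIONS.get(current, set())


def triggers_saga_compensation(status: str, has_workflow: bool) -> bool:
    """FAILED_FATAL starts compensation only when a workflow is associated."""
    return status == "FAILED_FATAL" and has_workflow
```

States with no entry in `TRANSITIONS` (for example `SUCCEEDED` or `DENIED`) are treated as final, so any further transition is rejected.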
Safety & Tenancy
- Safety policy file (`config/safety.yaml`) provides per-tenant `allow_topics` / `deny_topics`.
- Gateway sets `JobRequest.tenant_id` and also includes an `env["tenant_id"]` fallback; scheduler writes decision/reason into JobStore for observability.
- MCP calls should set `JobRequest.labels` (`mcp.server`, `mcp.tool`, `mcp.resource`, `mcp.action`) so the Safety Kernel can enforce MCP allow/deny rules.
- Jobs may include `JobMetadata` (`capability`, `risk_tags`, `requires`, `pack_id`) for policy and routing enforcement.
- Safety decisions may include `remediations`; the gateway can apply one via `POST /api/v1/jobs/{id}/remediate` to create a new job with safer topic/capability/labels.
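A per-tenant topic check along these lines might look like the Python sketch below. The policy shape (a mapping from tenant ID to `allow_topics` / `deny_topics` lists), the deny-before-allow precedence, the default-deny fallback, and the name `check_topic` are all assumptions for illustration; the actual `config/safety.yaml` schema and Safety Kernel evaluation order are not described on this page.

```python
from fnmatch import fnmatchcase
from typing import Mapping, Tuple


def check_topic(policy: Mapping, tenant_id: str, topic: str) -> Tuple[bool, str]:
    """Return (allowed, reason) for a tenant/topic pair (hypothetical helper).

    Patterns use shell-style globs via fnmatchcase, so "job.*" also matches
    "job.a.b"; that is looser than NATS single-token wildcards.
    """
    rules = policy.get(tenant_id)
    if rules is None:
        return False, f"no policy for tenant {tenant_id!r}"
    # Assumed precedence: an explicit deny always wins over an allow.
    for pattern in rules.get("deny_topics", []):
        if fnmatchcase(topic, pattern):
            return False, f"topic matches deny pattern {pattern!r}"
    for pattern in rules.get("allow_topics", []):
        if fnmatchcase(topic, pattern):
            return True, f"topic matches allow pattern {pattern!r}"
    # Assumed default: anything not explicitly allowed is denied.
    return False, "topic not in allow_topics"
```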
Context Engine (non-bus)
- gRPC service `ContextEngine` (`cmd/cordum-context-engine`, binary `cordum-context-engine`) with RPCs:
  - `BuildWindow(memory_id, mode, logical_payload, max_input_tokens, max_output_tokens)` → list of `ModelMessage`.
  - `UpdateMemory(memory_id, logical_payload, model_response, mode)` → appends chat history or summaries.
- Uses the same Redis instance; keys are namespaced under `mem:<memory_id>:*`.
Error Codes (CAP v2.5.2)
The ErrorCode enum provides structured error classification, replacing ad-hoc string error codes. Both string error_code and numeric error_code_enum are populated during the transition period.
| Range | Category | Examples |
|---|---|---|
| 100-105 | Protocol | VERSION_MISMATCH, MALFORMED_PACKET, SIGNATURE_INVALID, SIGNATURE_MISSING, UNKNOWN_PAYLOAD |
| 200-206 | Job | NOT_FOUND, DUPLICATE, TIMEOUT, PERMISSION_DENIED, RESOURCE_EXHAUSTED, INVALID_INPUT, WORKER_UNAVAILABLE |
| 300-302 | Safety | DENIED, POLICY_VIOLATION, RISK_TAG_BLOCKED |
| 400-402 | Transport | PUBLISH_FAILED, SUBSCRIBE_FAILED, CONNECTION_LOST |
Usage: always use pb.ErrorCode_ERROR_CODE_* constants from core/protocol/pb/v1, never raw integers.
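For consumers that only see the numeric value (for example a log pipeline or dashboard without the generated constants), the ranges in the table can be mapped back to a category. The Python sketch below uses only the ranges listed above; `error_category` is a hypothetical helper and does not replace the generated constants in service code.

```python
# (low, high, category) ranges taken from the table above, inclusive on both ends.
ERROR_CODE_RANGES = [
    (100, 105, "Protocol"),
    (200, 206, "Job"),
    (300, 302, "Safety"),
    (400, 402, "Transport"),
]


def error_category(code: int) -> str:
    """Map a numeric ErrorCode value to its category, or "Unknown"."""
    for low, high, category in ERROR_CODE_RANGES:
        if low <= code <= high:
            return category
    return "Unknown"
```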
Handshake (Component Registration)
On NATS connect, each service publishes a BusPacket{Handshake} to sys.handshake advertising its role and capabilities. The scheduler tracks these to maintain a component registry.
- Handshake fields: `component_id`, `role` (`ComponentRole` enum), `supported_versions`, `capabilities` (map), `sdk_version`.
- ComponentRole values: `GATEWAY`, `SCHEDULER`, `WORKER`, `ORCHESTRATOR`, `CONTROLLER`.
- Services that publish: api-gateway (GATEWAY), scheduler (SCHEDULER), workflow-engine (ORCHESTRATOR), workers (WORKER via SDK runtime).
- Services that skip: safety-kernel, context-engine (gRPC-only, no NATS connection).
- Handshake failure is non-fatal — services log a warning and continue startup.
Enhanced SystemAlert (CAP v2.5.2)
SystemAlert now carries structured fields alongside the deprecated string-based fields:
| New Field | Type | Description |
|---|---|---|
| `severity` | AlertSeverity enum | INFO, WARNING, ERROR, CRITICAL |
| `error_code_enum` | ErrorCode enum | Structured error classification |
| `source_component` | string | Originating service instance (e.g., scheduler-1) |
| `details` | map<string,string> | Key-value context (sender, subject, etc.) |
| `trace_id` | string | Correlates alert to a job or workflow run |
Backward compatibility: deprecated fields (level, component, code) are still populated during the transition.
Bus-Layer Validation
Cordum validates incoming BusPacket payloads at the bus ingress using CAP SDK helpers:
- `ValidateJobRequest`: rejects packets with empty `job_id` or `topic`.
- `ValidateJobResult`: rejects packets with empty `job_id`, `worker_id`, or unspecified status.
- Invalid packets are logged, counted (`validation_rejections_total` metric), and dropped silently.
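The validation rules above can be mirrored in a few lines of Python. This sketch is not the CAP SDK's code: the dict-shaped payloads, the `accept` function, the `Counter` standing in for the `validation_rejections_total` metric, and the spellings treated as "unspecified" status are assumptions.

```python
import logging
from collections import Counter
from typing import Mapping, Optional

log = logging.getLogger("bus.validation")
validation_rejections_total = Counter()  # stand-in for the real metric

# Assumed representations of an unspecified status value.
UNSPECIFIED_STATUSES = {None, "", "JOB_STATUS_UNSPECIFIED"}


def validate_job_request(req: Mapping) -> Optional[str]:
    """Return a rejection reason, or None when the request is acceptable."""
    if not req.get("job_id"):
        return "empty job_id"
    if not req.get("topic"):
        return "empty topic"
    return None


def validate_job_result(res: Mapping) -> Optional[str]:
    """Return a rejection reason, or None when the result is acceptable."""
    if not res.get("job_id"):
        return "empty job_id"
    if not res.get("worker_id"):
        return "empty worker_id"
    if res.get("status") in UNSPECIFIED_STATUSES:
        return "unspecified status"
    return None


VALIDATORS = {"job_request": validate_job_request, "job_result": validate_job_result}


def accept(kind: str, payload: Mapping) -> bool:
    """Log, count, and drop invalid packets; return True for valid ones."""
    reason = VALIDATORS[kind](payload)
    if reason is not None:
        log.warning("dropping %s: %s", kind, reason)
        validation_rejections_total[kind] += 1
        return False
    return True
```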
Python Guard SDK
The CAP repository includes a Python Guard SDK (cordum-guard) for integrating Cordum safety checks into LangChain and LlamaIndex pipelines. See python-guard/ in the CAP repository.