Workflow Step Types Reference
This document describes all workflow step types currently modeled in Cordum (core/workflow/models.go) and how they execute in the workflow engine (core/workflow/engine.go).
1. Overview
Cordum workflows are DAGs of Step nodes. A run advances when dependencies are satisfied and scheduleReady() dispatches ready nodes.
Run lifecycle states:
`pending`, `running`, `waiting`, `succeeded`, `failed`, `cancelled`, `timed_out`
Step lifecycle states:
`pending`, `running`, `waiting`, `succeeded`, `failed`, `cancelled`, `timed_out`
Execution model by type:
- Dedicated engine handlers: `worker`, `approval`, `condition`, `delay`, `notify`, `transform`, `storage`, `switch`, `parallel`, `loop`, `subworkflow`
- Generic job dispatch fallback: `llm`, `http`, `container`, `script`, `input`
- `for_each` fan-out behavior is implemented on any step with `for_each` set (creates child steps like `step_id[0]`, `step_id[1]`)
2. Expression Syntax Reference
Expression evaluation uses Eval() in core/workflow/eval.go.
Supported expression forms:
- Literals: numbers, booleans, quoted strings
- Dot paths: `input.customer.id`, `ctx.session.region`, `steps.validate.output.ok`
- Comparisons: `==`, `!=`, `>`, `<`, `>=`, `<=`
- Unary boolean not: `!steps.check.output`
- Functions: `length(x)` for arrays, strings, and maps; `first(x)` for arrays
Template syntax in step input values uses ${expr} and is resolved by evalTemplates().
Examples:
length(input.items) > 0
steps.validate.output.ok == true
${input.customer.id}
"ticket-${input.case_id}-${first(input.tags)}"
3. Scope Variables Reference
| Variable | Available now | Meaning |
|---|---|---|
| `input.*` | yes | `WorkflowRun.Input` payload |
| `ctx.*` | yes | `WorkflowRun.Context` map |
| `steps.*` | yes | `run.Context["steps"]` outputs/pointers |
| `item` | yes (`for_each`) | Current fan-out item |
| `loop.index` | yes (`loop`) | Zero-based iteration index for loop child dispatch |
| `loop.iteration` | yes (`loop`) | One-based iteration count (index + 1) |
| `loop.previous_output` | yes (`loop`) | Previous iteration output (inline output or result pointer) |
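The fan-out and loop variables above are referenced from step input templates. A hypothetical fan-out step combining them (step names, topic, and field names are illustrative, not from the codebase):

```yaml
steps:
  charge_each:
    type: worker
    topic: job.billing.charge
    depends_on: [validate]
    for_each: "input.orders"
    input:
      order_id: "${item.id}"               # current fan-out item
      region: "${ctx.session.region}"      # run context value
      validated: "${steps.validate.output.ok}"  # prior step output
```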
4. Implemented Step Types (Dedicated Handlers)
4.1 worker
Purpose:
- Default worker/job dispatch step.
Key config fields:
`topic`, `worker_id`, `input`, `route_labels`, `timeout_sec`, `retry`
Execution behavior:
- Dispatches a `JobRequest` on `sys.job.submit`.
- Stores the step payload in memory and sets `ContextPtr`.
Output format:
- On success, step output stores `result_ptr` and an optional inlined decoded payload in `run.Context.steps[step_id]`.
Dashboard UI:
- Rendered as `agent-task` (or `pack-action`/`tool-call` depending on metadata).
- Node details show job link, safety decision, and request/result payloads.
YAML example:
steps:
classify:
type: worker
topic: job.support.classify
worker_id: support-pool
input:
ticket_id: "${input.ticket_id}"
body: "${input.message}"
route_labels:
pool: support
timeout_sec: 60
retry:
max_retries: 2
initial_backoff_sec: 1
max_backoff_sec: 8
multiplier: 2
output_path: ctx.classification
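The retry policy in the example above implies a geometric backoff schedule. The following Go sketch shows the conventional interpretation of those fields; it is an assumption for illustration, not the engine's actual retry code:

```go
package main

import "fmt"

// RetryPolicy mirrors the retry fields shown in the YAML example above.
type RetryPolicy struct {
	MaxRetries        int
	InitialBackoffSec float64
	MaxBackoffSec     float64
	Multiplier        float64
}

// BackoffSec returns the backoff in seconds before retry attempt n (1-based):
// it grows geometrically by Multiplier and is capped at MaxBackoffSec.
func BackoffSec(p RetryPolicy, attempt int) float64 {
	b := p.InitialBackoffSec
	for i := 1; i < attempt; i++ {
		b *= p.Multiplier
	}
	if b > p.MaxBackoffSec {
		b = p.MaxBackoffSec
	}
	return b
}

func main() {
	p := RetryPolicy{MaxRetries: 2, InitialBackoffSec: 1, MaxBackoffSec: 8, Multiplier: 2}
	for n := 1; n <= 4; n++ {
		fmt.Printf("attempt %d: %.0fs\n", n, BackoffSec(p, n)) // 1s, 2s, 4s, 8s
	}
}
```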
4.2 approval
Purpose:
- Human gate before workflow continues.
Key config fields:
- `input` (optional context for reviewers)
- `input_schema` (recommended when `input` must contain required decision fields)
- `output_path` (optional)
Execution behavior:
- Engine dispatches a gate job with topic `sys.approval.gate`.
- Step enters `running` (has an active gate job).
- The gate job appears on the Approvals page alongside policy approvals.
- Operator approves or rejects via the unified Approvals queue.
- On approval: gate job auto-completes, step succeeds, DAG continues.
- On rejection: gate job fails, step fails, and the `on_error` handler fires if configured.
Recommended reviewer-facing fields:
`amount`, `currency`, `vendor`, `items`, `approval_reason`, `next_effect`
These fields are surfaced by the approvals API/dashboard as human-readable decision summary data so approvers can answer: what is this, why is it being escalated, and what will happen next?
Validation and degraded behavior:
- Use `input_schema` to require the decision fields that must always exist for a valid approval.
- Optional fields may be omitted safely; the approvals API will still return a partial but informative decision summary.
- If persisted workflow approval context cannot be hydrated, the approvals API marks the approval with an explicit degraded status such as `missing`, `malformed`, or `unavailable` rather than silently showing an empty shell.
Output format:
- Status only; approval metadata is reflected in run/step state.
Dashboard UI:
- Gate approvals appear on the Approvals page with a `Workflow Gate` badge.
- The dashboard prioritizes decision-first business context (title, reason, amount, vendor, step, next effect) and keeps workflow/job IDs as secondary audit metadata.
YAML example:
steps:
manual_review:
type: approval
depends_on: [classify]
input:
amount: "${input.request.amount}"
currency: "${input.request.currency}"
vendor: "${input.request.vendor}"
items: "${input.request.items}"
approval_reason: "${input.request.reason}"
next_effect: "Approve to continue payment processing."
input_schema:
type: object
properties:
amount: { type: number }
currency: { type: string }
vendor: { type: string }
items: { type: array }
approval_reason: { type: string }
next_effect: { type: string }
required: [amount, currency, vendor, items, approval_reason]
4.3 condition
Purpose:
- Branch gate based on expression result.
Key config fields:
- `condition` (required)
- `output_path` (optional)
Execution behavior:
- Evaluates the expression via `evalCondition()`.
- On success, stores the boolean output and marks the step `succeeded`.
- On a missing/invalid expression, marks the step `failed`.
Output format:
- Boolean (`true`/`false`) in step output.
Dashboard UI:
- Dedicated condition detail panel shows expression and boolean result.
- DAG node has true/false branch handles.
YAML example:
steps:
should_escalate:
type: condition
condition: "length(input.flags) > 0"
output_path: ctx.gates.should_escalate
depends_on: [classify]
4.4 delay
Purpose:
- Wait for a relative or absolute time.
Key config fields:
- `delay_sec`
- `delay_until` (RFC 3339)
- `timeout_sec` (fallback delay when the others are not set)
Execution behavior:
- Computes the delay via `delayForStep()`.
- Schedules an internal timer and transitions from `running` to `succeeded` when due.
Output format:
- No custom payload; state/timing fields indicate completion.
Dashboard UI:
- Dedicated delay detail panel shows configured delay and elapsed/remaining time.
YAML example:
steps:
cool_down:
type: delay
depends_on: [manual_review]
delay_sec: 300
4.5 notify
Purpose:
- Emit a workflow event alert.
Key config fields:
`input.level`, `input.message`, `input.code`, `input.component`
Execution behavior:
- Evaluates template input.
- Publishes a `BusPacket_Alert` on `sys.workflow.event`.
Output format:
- No result pointer; completion state plus timeline event.
Dashboard UI:
- Generic detail panel (status/output).
- A `notify` icon is available in DAG nodes.
YAML example:
steps:
alert_ops:
type: notify
depends_on: [manual_review]
input:
level: INFO
component: workflow-engine
code: REVIEW_COMPLETED
message: "Review completed for ${input.ticket_id}"
5. Newer Step Types (Mixed Implementation Status)
These step types exist as constants in models.go. `switch`, `parallel`, `loop`, `transform`, `storage`, and `subworkflow` now have dedicated handlers. The remaining types in this section still use the generic dispatch fallback unless noted otherwise.
5.1 switch
Purpose:
- Multi-branch case routing with inline engine evaluation.
Key config fields:
- `condition` (required switch expression)
- `input.cases`
- `input.default` (or `input.default_step`)
Current behavior:
- Evaluates `condition` via `Eval()`.
- Matches the first case whose value equals `match`/`when`/`value`.
- Uses the default branch when no case matches.
- Marks non-selected branches as `cancelled` with reason `switch_branch_not_taken`.
- Completes inline with output `{ matched_case, matched_value, target_step }`.
- Emits timeline event `step_switch_evaluated`.
Output format:
- Inline output object stored in `run.Context.steps[step_id].output`.
Dashboard UI:
- Switch config with expression, cases list, and default branch selector.
- DAG detail panel shows case rows, matched branch highlighting, and fallback branch state.
YAML example:
steps:
  route_by_priority:
    type: switch
    condition: "input.priority"
    input:
      cases:
        - when: critical
          next: urgent_path
      default: normal_path
5.2 parallel
Purpose:
- Concurrent execution of multiple child step definitions with completion strategies.
Key config fields:
- `input.steps` (array of child step IDs to execute)
- `input.strategy` (`all` | `any` | `n_of_m`)
- `input.required` (required success count when strategy is `n_of_m`)
- `max_parallel` (optional dispatch throttle)
Execution behavior:
- The parent step initializes child step runs from `input.steps`.
- Child jobs are dispatched concurrently (or throttled by `max_parallel`).
- Completion strategies:
  - `all`: succeed only when all children succeed; fail on the first child failure.
  - `any`: succeed when the first child succeeds; cancel remaining children.
  - `n_of_m`: succeed when `required` children succeed; fail when the threshold becomes unreachable.
- Parent output aggregates child outputs as `map[child_step_id] -> output entry`.
- Timeline events: `step_parallel_started`, `step_parallel_completed`.
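The `n_of_m` completion decision can be sketched as a small state function. This follows the semantics described above and is an illustrative assumption, not the engine's actual code:

```go
package main

import "fmt"

// nOfMState decides the parallel parent's state: succeed once `required`
// children have succeeded; fail as soon as enough children have failed that
// the required count can no longer be reached; otherwise keep running.
func nOfMState(total, required, succeeded, failed int) string {
	switch {
	case succeeded >= required:
		return "succeeded"
	case total-failed < required: // even if every remaining child succeeds, threshold is unreachable
		return "failed"
	default:
		return "running"
	}
}

func main() {
	// 3 children, 2 required: one failure keeps the run alive, two end it.
	fmt.Println(nOfMState(3, 2, 1, 1)) // running
	fmt.Println(nOfMState(3, 2, 2, 0)) // succeeded
	fmt.Println(nOfMState(3, 2, 1, 2)) // failed
}
```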
Output format:
- The parent step writes the aggregated map to `run.Context.steps[parallel_step_id].output`.
- Child step outputs are still stored under their own step IDs.
Dashboard UI:
- Parallel node type in the workflow editor.
- Config supports child step multi-select, completion strategy, required count (`n_of_m`), and max concurrency.
- DAG details panel shows strategy, progress (`N/M`), and child status list.
YAML examples:
steps:
parallel_all:
type: parallel
max_parallel: 3
input:
strategy: all
steps: [check_a, check_b, check_c]
parallel_any:
type: parallel
input:
strategy: any
steps: [provider_a, provider_b, provider_c]
parallel_threshold:
type: parallel
max_parallel: 2
input:
strategy: n_of_m
required: 2
steps: [scan_a, scan_b, scan_c]
5.3 loop
Purpose:
- Iterative execution of a body step with safety guards and expression-based stop conditions.
Key config fields:
- `input.body_step` (recommended body step ID; `input.body` alias supported)
- `input.max_iterations` (default `100`; hard safety cap)
- `input.condition` (`while` behavior, continue while truthy; `input.while` alias supported)
- `input.until` (stop when truthy)
Execution behavior:
- The parent loop step enters `running` and manages child iterations as virtual step IDs: `loop_step_id[0]`, `loop_step_id[1]`, ...
- Before each iteration, scope includes:
`loop.index`, `loop.iteration`, `loop.previous_output`
- Child job env also includes:
`loop_index`, `loop_iteration`, `loop_previous_output`
- Stop rules:
  - If `condition` is set and becomes false, the loop completes successfully.
  - If `until` is set and becomes true, the loop completes successfully.
  - If neither is set, the loop runs exactly `max_iterations` times.
  - If `condition`/`until` still require continuation at `max_iterations`, the loop fails with a max-iteration error.
- Loop body step definitions referenced by `body_step` are orchestrated by the loop parent and excluded from top-level scheduling.
- Timeline events: `step_loop_iteration`, `step_loop_completed`.
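The stop rules can be summarized as a decision function evaluated before dispatching each iteration. This is a sketch of the documented rules under stated assumptions, not the engine's implementation:

```go
package main

import "fmt"

// loopDecision returns "continue", "done", or "failed" for the iteration at
// zero-based index. hasCond/hasUntil say whether condition/until are set;
// condTruthy/untilTruthy are their evaluated results for the current scope.
func loopDecision(index, maxIterations int, hasCond, condTruthy, hasUntil, untilTruthy bool) string {
	if hasCond && !condTruthy {
		return "done" // while-style condition became false: success
	}
	if hasUntil && untilTruthy {
		return "done" // until condition reached: success
	}
	if index >= maxIterations {
		if hasCond || hasUntil {
			return "failed" // still asked to continue at the safety cap
		}
		return "done" // fixed-count loop ran exactly max_iterations times
	}
	return "continue"
}

func main() {
	fmt.Println(loopDecision(0, 3, false, false, false, false)) // continue
	fmt.Println(loopDecision(3, 3, false, false, false, false)) // done
	fmt.Println(loopDecision(3, 3, true, true, false, false))   // failed
}
```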
Output format:
- Parent step output object: `iterations`, `last_output`
Dashboard UI:
- Loop node type is available in workflow builder.
- Loop config includes body step, max iterations, `condition`, and `until`.
- DAG node details include loop progress and an expandable per-iteration child status list.
YAML examples:
steps:
retry_exactly_three_times:
type: loop
input:
body_step: run_scan
max_iterations: 3
steps:
retry_until_clean:
type: loop
input:
body_step: run_scan
max_iterations: 1000
until: "steps.run_scan.output.clean == true"
steps:
while_budget_available:
type: loop
input:
body_step: process_chunk
max_iterations: 20
condition: "loop.index < 5"
5.4 transform
Purpose:
- Inline data shaping/mapping between steps — no job dispatch, executes instantly in the engine.
Key config fields:
- `input`: map of key/value pairs where each value is an expression using `${...}` template syntax
- `output_path`: optional dot-separated path in `run.Context` to write the result
Current behavior:
- Dedicated inline handler (no job dispatch).
- Evaluates all keys in `step.Input` as template expressions via `evalTemplates()`.
- Each key/value pair in `input` becomes a key/value in the output map.
- Validates the output schema if `OutputSchema` or `OutputSchemaID` is defined.
- Writes the result to `OutputPath` in the run context via `recordStepInlineOutput()`.
- Marks the step `succeeded` immediately.
- If any expression evaluation fails, the step is marked `failed` with an error message that includes the expression error.
- If `input` is `nil` (no mappings), succeeds with an empty output map `{}`.
- Emits timeline event `step_transform_completed` or `step_transform_failed`.
Output format:
- Inline output map stored in `run.Context.steps[step_id].output`.
- If `output_path` is set, the result is also written to `run.Context` at the specified path.
Dashboard UI:
- `Code` icon in STEP_ICONS.
- `TransformDetail` panel in NodeDetailPanel shows:
  - Status badge and duration.
  - Output path display.
  - Key/value table showing input expressions alongside their evaluated output values (when the step succeeded).
  - Error card if the step failed.
  - Collapsible full output JSON.
YAML example:
steps:
reshape_payload:
type: transform
depends_on: [fetch_data]
input:
case_id: "${input.ticket_id}"
summary: "${steps.fetch_data.output.title}"
item_count: "${steps.fetch_data.output.items}"
output_path: ctx.transformed
Expression syntax notes:
- Use `${expr}` for template expressions (NOT `{{ }}`).
- A value that is a single `${...}` expression preserves the original type (number, boolean, map).
- A value mixing text and `${...}` is stringified: `"Hello ${input.name}"` → `"Hello world"`.
- Nested maps in `input` are recursively evaluated.
- Missing references resolve to `nil` (no error); use explicit validation if strict references are needed.
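The type-preservation rule above can be sketched as follows. This is a simplified illustration: `lookup` stands in for full expression evaluation via `Eval()`, and the real `evalTemplates()` is not reproduced here:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

var exprRe = regexp.MustCompile(`\$\{([^}]+)\}`)

// resolveTemplate applies the documented rules: a value that is exactly one
// ${...} expression keeps the resolved value's type; a value mixing text and
// expressions is stringified.
func resolveTemplate(value string, lookup func(string) any) any {
	if m := exprRe.FindStringSubmatch(value); m != nil && m[0] == value {
		return lookup(m[1]) // single expression: preserve original type
	}
	return exprRe.ReplaceAllStringFunc(value, func(s string) string {
		inner := strings.TrimSuffix(strings.TrimPrefix(s, "${"), "}")
		return fmt.Sprint(lookup(inner))
	})
}

func main() {
	lookup := func(path string) any {
		if path == "input.count" { return 3 }
		return "world"
	}
	fmt.Println(resolveTemplate("${input.count}", lookup))      // 3 (stays a number)
	fmt.Println(resolveTemplate("Hello ${input.name}", lookup)) // Hello world (string)
}
```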
5.5 storage
Purpose:
- Inline read/write/delete operations on the workflow run context. No job dispatch — executes instantly in the engine.
Key config fields:
- `input.operation` (`read` | `write` | `delete`)
- `input.key` (dot-separated path into `run.Context`, e.g. `data.user.name`)
- `input.value` (required for `write`; supports `${expr}` templates)
- `output_path` (optional; write the step output to the run context)
Execution behavior:
- Dedicated inline handler (no job dispatch).
- Evaluates all fields in `step.Input` as template expressions via `evalTemplates()`.
- Operations:
  - `read`: looks up `key` in `run.Context` using dot-path navigation. Fails if the key is not found.
  - `write`: sets `key` in `run.Context` to `value` (creating intermediate maps as needed). The value supports `${expr}` templates resolved against the run scope.
  - `delete`: removes `key` from `run.Context` using dot-path navigation. Silent no-op if the path doesn't exist.
- Validates the output schema if `OutputSchema` or `OutputSchemaID` is defined.
- Writes the result to `OutputPath` in the run context via `recordStepInlineOutput()`.
- Marks the step `succeeded` immediately.
- If expression evaluation fails, the step is marked `failed`.
- Emits timeline event `step_storage_completed` or `step_storage_failed`.
Output format:
- `read`: `{ "operation": "read", "key": "<key>", "value": <resolved_value> }`
- `write`: `{ "operation": "write", "key": "<key>", "value": <written_value> }`
- `delete`: `{ "operation": "delete", "key": "<key>" }`
- Inline output stored in `run.Context.steps[step_id].output`.
Dashboard UI:
- `Database` icon in STEP_ICONS and DAG overlay nodes.
- `StorageDetail` panel in NodeDetailPanel shows:
  - Status badge and duration.
  - Operation badge (read=info, write=success, delete=danger).
  - Target badge (context).
  - Key path display.
  - Value display (for read/write results when the step succeeded).
  - Output path display.
  - Error card if the step failed.
  - Collapsible full output JSON.
- Workflow editor config: operation selector, key path input with dot-notation hint, value expression textarea (write only), output path input (read only).
When to use Storage vs Transform:
- Storage = persistent read/write/delete of individual values in run context by key path.
- Transform = reshaping/mapping multiple values from step outputs into new structure.
YAML examples:
steps:
# Write a value to run context
save_greeting:
type: storage
input:
operation: write
key: "data.greeting"
value: "Hello ${input.name}"
# Read a value back
read_greeting:
type: storage
depends_on: [save_greeting]
input:
operation: read
key: "data.greeting"
output_path: ctx.greeting_result
# Delete a temporary value
cleanup_temp:
type: storage
depends_on: [read_greeting]
input:
operation: delete
key: "data.greeting"
Nested key path example:
steps:
set_theme:
type: storage
input:
operation: write
key: "user.preferences.theme"
value: "dark"
Expression syntax notes:
- Use `${expr}` for template expressions in the `value` field.
- Key paths use dot-separated segments: `a.b.c` navigates into nested maps.
- Missing intermediate keys in `write` are created automatically.
- `read` on a missing key fails the step with an error message.
5.6 subworkflow
Purpose:
- Trigger a child workflow run, wait for completion, and propagate mapped output back to parent step.
Key config fields:
- `input.workflow_id` (required)
- `input.input_mapping` (optional; evaluated template/object for the child run input)
- `input.output_mapping` (optional; evaluated when the child completes, with child scope available)
- `output_path` (optional parent context write path)
Execution behavior:
- On first evaluation:
  - Validates that the target workflow exists.
  - Evaluates `input_mapping` into the child run input (defaults to the parent run input when omitted).
  - Creates the child run in the same store.
  - Inherits the parent org/team/dry-run context.
  - Stores `parent_run_id`, `parent_step_id`, and `call_stack` metadata on the child run.
  - Starts the child run and marks the parent step `running`.
- On re-evaluation:
  - Reads the child run by the tracked child run ID.
  - If the child is still `pending`/`running`/`waiting`, the parent remains `running`.
  - If the child `succeeded`, evaluates `output_mapping` (or emits a default child summary output), validates the output, and marks the parent `succeeded`.
  - If the child `failed`/`cancelled`/`timed_out`, the parent propagates the terminal error and is marked failed/timed_out/cancelled accordingly.
- Circular reference guard:
  - Uses `call_stack` metadata to detect cycles before creating a nested child run.
  - Fails fast with `circular workflow reference detected` when a cycle is found.
  - `call_stack` metadata also provides the basis for operational nested-depth limits.
- Timeline events: `step_subworkflow_started`, `step_subworkflow_completed`.
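The `call_stack` guard amounts to a linear scan of ancestor workflow IDs before the child run is created. A minimal sketch under that assumption (not the engine's actual code):

```go
package main

import "fmt"

// checkCallStack refuses to create a nested child run when the target
// workflow already appears in the chain of ancestor workflow IDs.
func checkCallStack(callStack []string, targetWorkflowID string) error {
	for _, id := range callStack {
		if id == targetWorkflowID {
			return fmt.Errorf("circular workflow reference detected: %s", targetWorkflowID)
		}
	}
	return nil
}

func main() {
	stack := []string{"wf-root", "wf-remediation"}
	fmt.Println(checkCallStack(stack, "wf-cleanup")) // <nil>
	fmt.Println(checkCallStack(stack, "wf-root"))    // circular workflow reference detected: wf-root
}
```

The same stack length also gives a natural hook for a nested-depth limit, as noted above.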
Output format:
- Default parent output includes:
  - `child_run_id`
  - `child_workflow_id`
  - `child_status`
  - child output summary (or mapped fields when `output_mapping` is provided)
Dashboard UI:
- Sub-workflow node type in workflow builder.
- Config supports workflow selector + input/output mapping + output path.
- DAG details panel shows child workflow/run links, child status badge, and mapping payloads.
YAML example:
steps:
invoke_child:
type: subworkflow
input:
workflow_id: wf-remediation
input_mapping:
ticket_id: "${input.ticket_id}"
output_mapping:
remediation_status: "${child.status}"
remediation_result_ptr: "${child.steps.remediate.result_ptr}"
output_path: ctx.remediation
6. Worker-Dispatch Step Types (Generic Dispatch Today)
These are modeled constants and currently execute through generic job dispatch in scheduleReady().
6.1 llm
Purpose:
- LLM completion/inference task.
Key config fields:
- `topic` (for example `job.llm.generate`)
- `input.prompt`
- `input.budget` (token limits)
- `meta.capability`
Execution behavior:
- Generic job dispatch fallback.
Output format:
- Job result pointer + optional inlined payload in `run.Context.steps[step_id]`.
Dashboard UI:
- Usually normalized to `agent-task` or `tool-call`.
YAML example:
steps:
summarize:
type: llm
topic: job.llm.generate
input:
prompt: "Summarize: ${input.message}"
6.2 http
Purpose:
- HTTP call step executed by worker.
Key config fields:
- `topic` (for example `job.http.request`)
- `input.method`
- `input.url`
- `input.headers` / `input.body`
Execution behavior:
- Generic job dispatch fallback.
Output format:
- Job result pointer + optional inlined payload in `run.Context.steps[step_id]`.
Dashboard UI:
- `http` icon in run overlay.
YAML example:
steps:
fetch_profile:
type: http
topic: job.http.request
input:
method: GET
url: "https://api.example.com/users/${input.user_id}"
6.3 container
Purpose:
- Containerized execution step.
Key config fields:
- `topic` (for example `job.container.run`)
- `input.image`
- `input.args` / `input.env`
Execution behavior:
- Generic job dispatch fallback.
Output format:
- Job result pointer + optional inlined payload in `run.Context.steps[step_id]`.
Dashboard UI:
- Rendered as generic job/agent-task style.
YAML example:
steps:
run_scanner:
type: container
topic: job.container.run
input:
image: ghcr.io/acme/scanner:latest
args: ["--target", "${input.repo}"]
6.4 script
Purpose:
- Script execution step.
Key config fields:
- `topic` (for example `job.script.run`)
- `input.language`
- `input.source`
Execution behavior:
- Generic job dispatch fallback.
Output format:
- Job result pointer + optional inlined payload in `run.Context.steps[step_id]`.
Dashboard UI:
- Rendered as generic job/agent-task style.
YAML example:
steps:
sanitize_text:
type: script
topic: job.script.run
input:
language: python
source: "print('ok')"
6.5 input
Purpose:
- User/input collection step via worker.
Key config fields:
- `topic` (for example `job.input.collect`)
- `input.form_id`
- `input.schema`
Execution behavior:
- Generic job dispatch fallback.
Output format:
- Job result pointer + optional inlined payload in `run.Context.steps[step_id]`.
Dashboard UI:
- Rendered as generic job/agent-task style.
YAML example:
steps:
collect_feedback:
type: input
topic: job.input.collect
input:
form_id: feedback-v1
7. Common Step Fields and Semantics
Fields from Step model that apply across types:
- `depends_on`: step dependencies (all must succeed)
- `condition`: pre-gate for non-`condition` steps
- `for_each`: fan-out expression; must evaluate to an array
- `max_parallel`: throttle concurrent children for `for_each` and `parallel`
- `input`: payload with template support via `${expr}`
- `input_schema` / `input_schema_id`: input validation
- `output_schema` / `output_schema_id`: output validation
- `output_path`: write output into a `run.Context` path
- `timeout_sec`: job budget deadline (ms in the request budget)
- `retry`: retry policy (`max_retries`, `initial_backoff_sec`, `max_backoff_sec`, `multiplier`)
- `on_error`: modeled jump-target field (documented; minimal runtime handling today)
- `route_labels`: propagated to job labels for pool/worker routing
- `meta`: job metadata overrides (`actor_id`, `actor_type`, `idempotency_key`, `pack_id`, `capability`, `risk_tags`, `requires`, `labels`)
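A hypothetical step combining several of these common fields (the topic, step names, and field values are illustrative, not from the codebase):

```yaml
steps:
  enrich_each_record:
    type: worker
    topic: job.data.enrich
    depends_on: [fetch_data]
    condition: "length(steps.fetch_data.output.records) > 0"
    for_each: "steps.fetch_data.output.records"
    max_parallel: 4
    timeout_sec: 30
    retry:
      max_retries: 2
      initial_backoff_sec: 1
      max_backoff_sec: 8
      multiplier: 2
    route_labels:
      pool: data
    input:
      record_id: "${item.id}"
    output_path: ctx.enriched
```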
output_path behavior:
- Job-backed steps: stores inline result (if small and decodable) or pointer string.
- Inline steps (for example, `condition`): stores the computed value directly.
8. Dashboard Mapping Notes
The dashboard normalizes backend step types in dashboard/src/api/transform.ts:
- `job` backend steps become `agent-task`, `pack-action`, or `tool-call` based on metadata
- A step with `for_each` is rendered as `fan-out`
- An unknown backend type is preserved in `config.backendType` and shown as `agent-task`
DAG node and detail coverage:
- Dedicated detail panels: `job`/`agent-task`, `approval`, `condition`, `delay`, `fan-out`, `parallel`, `switch`, `loop`, `transform`, `storage`, `subworkflow`
- Generic detail panel: `notify` and other types
- Icon mappings include `parallel`, `switch`, `loop`, `transform`, `storage`, `http`, `sub-workflow`
9. Cross-References
- System Overview
- API Overview
- Workflow model: `core/workflow/models.go`
- Workflow engine: `core/workflow/engine.go`
- Expression evaluator: `core/workflow/eval.go`
- Dashboard DAG detail: `dashboard/src/components/workflows/dag/NodeDetailPanel.tsx`
- Dashboard run overlay: `dashboard/src/components/workflows/dag/RunOverlayNode.tsx`