Workflow patterns

Common durable workflow patterns supported by Catalyst Workflows (built on Dapr Workflow):

Task chaining

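Execute a sequence of activities in order, passing output from one step to the next. If the workflow is interrupted at any step (for example, by a process restart), it resumes from that step; earlier steps are not re-executed because their recorded results are replayed. An activity that raises surfaces as an exception in the workflow, so wrap the whole chain in a try/except to route failures through a compensation activity.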

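from dapr.ext.workflow import DaprWorkflowContext, WorkflowRuntime

wfr = WorkflowRuntime()

# step1, step2, step3, and error_handler are activities registered elsewhere.
@wfr.workflow(name="task_chain")
def task_chain_workflow(ctx: DaprWorkflowContext, wf_input: int):
    try:
        # Each step receives the previous step's output as its input.
        result1 = yield ctx.call_activity(step1, input=wf_input)
        result2 = yield ctx.call_activity(step2, input=result1)
        result3 = yield ctx.call_activity(step3, input=result2)
    except Exception as e:
        # Route the failure through a compensation activity, then fail the workflow.
        yield ctx.call_activity(error_handler, input=str(e))
        raise
    return [result1, result2, result3]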
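The claim that earlier steps are not re-executed can be illustrated with a toy model of history replay in plain Python. This is only a sketch and not how Catalyst or Dapr implement it: the `run_with_history` driver, the `history` list, the `crash_after` switch, and the three step functions are all hypothetical names made up for the illustration.

```python
from typing import Callable, Generator, List, Optional, Tuple

calls: List[str] = []  # records which steps actually executed

def step1(x: int) -> int:
    calls.append("step1")
    return x + 1

def step2(x: int) -> int:
    calls.append("step2")
    return x * 2

def step3(x: int) -> int:
    calls.append("step3")
    return x - 3

Request = Tuple[Callable[[int], int], int]

def chain(wf_input: int) -> Generator[Request, int, List[int]]:
    # Each yield hands (activity, input) to the driver and receives the result.
    r1 = yield (step1, wf_input)
    r2 = yield (step2, r1)
    r3 = yield (step3, r2)
    return [r1, r2, r3]

def run_with_history(workflow, wf_input: int, history: List[int],
                     crash_after: Optional[int] = None) -> Optional[List[int]]:
    """Drive the generator, reusing recorded results instead of re-running steps.

    Returns the workflow result, or None if a crash was simulated.
    """
    gen = workflow(wf_input)
    index = 0
    try:
        request = next(gen)
        while True:
            if index < len(history):
                result = history[index]  # replay: the step is not executed again
            else:
                if crash_after is not None and index >= crash_after:
                    return None  # simulated crash before this step runs
                activity, arg = request
                result = activity(arg)
                history.append(result)  # persist the result before moving on
            index += 1
            request = gen.send(result)
    except StopIteration as done:
        return done.value

history: List[int] = []
first = run_with_history(chain, 5, history, crash_after=2)  # stops before step3
second = run_with_history(chain, 5, history)                # resumes at step3
```

After both runs, `calls` contains each step exactly once: the second run replays the results of `step1` and `step2` from `history` and only executes `step3`.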

Fan-out / Fan-in

Spawn multiple activity instances in parallel, then aggregate their results once all complete. Catalyst Workflows tracks each branch independently and resumes the aggregation step only after all branches succeed. The parallelism is bounded by the activity worker pool — there is no need to manage concurrency limits in workflow code.

from dapr.ext.workflow import DaprWorkflowContext, when_all

@wfr.workflow(name="batch_processing")
def batch_processing_workflow(ctx: DaprWorkflowContext, wf_input: int):
    work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)
    # Fan out: schedule one activity per work item without yielding yet.
    parallel_tasks = [
        ctx.call_activity(process_work_item, input=item) for item in work_batch
    ]
    # Fan in: wait for every branch, then aggregate.
    outputs = yield when_all(parallel_tasks)
    total = sum(outputs)
    yield ctx.call_activity(process_results, input=total)

Async HTTP (polling consumer)

This is an architectural pattern, not a workflow code pattern. The HTTP layer (your web framework) starts the workflow, returns HTTP 202 Accepted with an instance ID immediately, and exposes a GET /status/{id} endpoint that queries workflow status. The workflow runs durably in the background regardless of how long the task takes — pods can recycle, deploys can roll, and the eventual GET will see the final result.

Use diagrid workflow get or the workflow APIs from your status endpoint to look up instance state.
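The HTTP layer described above can be sketched without committing to a web framework. Everything here is an assumption for illustration: `InMemoryWorkflowBackend` is a hypothetical stand-in for whatever workflow client your status endpoint calls, the `RUNNING` and `COMPLETED` strings are placeholder status values, and `start_job` and `get_status` are handler functions you would wire to `POST` and `GET /status/{id}` routes yourself.

```python
import uuid
from typing import Any, Dict, Optional, Tuple

Response = Tuple[int, Dict[str, Any]]  # (HTTP status code, JSON body)

class InMemoryWorkflowBackend:
    """Hypothetical stand-in for a workflow client; real code would call the workflow APIs."""

    def __init__(self) -> None:
        self._instances: Dict[str, Dict[str, Any]] = {}

    def start(self, workflow_name: str, payload: Any) -> str:
        instance_id = uuid.uuid4().hex
        self._instances[instance_id] = {
            "workflow": workflow_name, "status": "RUNNING", "output": None,
        }
        return instance_id

    def get(self, instance_id: str) -> Optional[Dict[str, Any]]:
        return self._instances.get(instance_id)

    def complete(self, instance_id: str, output: Any) -> None:
        # Simulates the workflow finishing in the background.
        self._instances[instance_id].update(status="COMPLETED", output=output)

def start_job(backend: InMemoryWorkflowBackend, payload: Any) -> Response:
    # Start the workflow and return immediately; do not wait for the result.
    instance_id = backend.start("batch_processing", payload)
    return 202, {"id": instance_id, "status_url": f"/status/{instance_id}"}

def get_status(backend: InMemoryWorkflowBackend, instance_id: str) -> Response:
    state = backend.get(instance_id)
    if state is None:
        return 404, {"error": "unknown instance"}
    return 200, {"id": instance_id, "status": state["status"], "output": state["output"]}

backend = InMemoryWorkflowBackend()
status_code, body = start_job(backend, {"items": 3})
```

The point of the design is that `start_job` never blocks on the workflow: the client keeps the returned ID and polls `status_url` until the status changes.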

Monitor

A loop that wakes periodically using a durable timer, checks an external condition, and sends a notification or raises an event when the condition is met. The timer survives process restarts. The example below polls a job-status endpoint every 5 minutes until the job completes or fails, then notifies the caller.

from datetime import timedelta

@wfr.workflow(name="job_status_monitor")
def job_status_monitor(ctx: DaprWorkflowContext, job_id: str):
    poll_interval = timedelta(minutes=5)
    while True:
        status = yield ctx.call_activity(check_job_status, input=job_id)
        if status == "completed":
            yield ctx.call_activity(notify_complete, input=job_id)
            return
        if status == "failed":
            yield ctx.call_activity(notify_failure, input=job_id)
            return
        # Durable timer: the wait survives process restarts.
        yield ctx.create_timer(poll_interval)

For monitors that run forever (e.g., a long-lived watcher), use continue_as_new to truncate workflow history periodically — see Durable execution.
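One possible shape for such a monitor is sketched below. It assumes the workflow context exposes `continue_as_new(new_input)` as in the Dapr Python SDK. The `CHECKS_PER_GENERATION` threshold, the placeholder `check_job_status` function, and the `FakeContext` and `dry_run` helpers (used only to exercise the generator locally, without a sidecar) are hypothetical. In a real app you would register the function with `@wfr.workflow` and pass it a real `DaprWorkflowContext`.

```python
from datetime import timedelta

CHECKS_PER_GENERATION = 12  # assumed threshold: restart after about an hour of polling

def check_job_status(ctx, job_id: str) -> str:
    """Placeholder activity; a real one would query the job-status endpoint."""
    return "running"

def long_lived_monitor(ctx, job_id: str):
    for _ in range(CHECKS_PER_GENERATION):
        status = yield ctx.call_activity(check_job_status, input=job_id)
        if status in ("completed", "failed"):
            return status
        yield ctx.create_timer(timedelta(minutes=5))
    # Restart the workflow with the same input and a fresh, empty history.
    ctx.continue_as_new(job_id)

class FakeContext:
    """Hypothetical test double: returns canned statuses instead of scheduling work."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)
        self.timers = 0
        self.restarted_with = None

    def call_activity(self, activity, *, input=None):
        return next(self._statuses)

    def create_timer(self, fire_at):
        self.timers += 1
        return None

    def continue_as_new(self, new_input, *, save_events=False):
        self.restarted_with = new_input

def dry_run(workflow, ctx, wf_input):
    """Drive the generator, sending each yielded value straight back as its result."""
    gen = workflow(ctx, wf_input)
    try:
        value = next(gen)
        while True:
            value = gen.send(value)
    except StopIteration as done:
        return done.value

still_running = FakeContext(["running"] * CHECKS_PER_GENERATION)
dry_run(long_lived_monitor, still_running, "job-1")  # ends by requesting a restart

finished = FakeContext(["running", "completed"])
result = dry_run(long_lived_monitor, finished, "job-1")  # ends with a final status
```

Bounding each generation by a fixed number of checks keeps the history size predictable; the right threshold depends on how much history each iteration records.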

External events / Human-in-the-loop

Pause execution at a step that waits for an external event — an approval, a webhook callback, or a manual signal. The workflow remains durable while waiting; you can raise the event from the CLI or the console.

The example below shows a request-escalation workflow: orders under $1,000 are auto-approved; larger orders wait for a human approval event with a 7-day timeout.

import json
from datetime import timedelta

from dapr.ext.workflow import DaprWorkflowContext, when_any

@wfr.workflow(name="request_escalation")
def request_escalation_workflow(ctx: DaprWorkflowContext, order_str: str):
    # Accept either a JSON string or an already-decoded dict.
    order = json.loads(order_str) if isinstance(order_str, str) else order_str
    amount = order.get("amount", 0)

    # Auto-approve orders under $1000
    if amount < 1000:
        yield ctx.call_activity(auto_approve_activity, input=order)
        return

    # Orders $1000+ require human approval (7 days max)
    approval_event = ctx.wait_for_external_event("human_approval")
    timeout = ctx.create_timer(timedelta(days=7))
    winner = yield when_any([approval_event, timeout])

    if winner == timeout:
        yield ctx.call_activity(handle_timeout_activity, input=order)
        return

    # The event already fired, so this yield returns its payload immediately.
    approval_result = yield approval_event
    yield ctx.call_activity(process_approval_activity, input=approval_result)

Raise the event from the CLI once the approval is ready:

diagrid workflow raise-event --app-id my-workflow-app \
--instance-id <id> --event-name human_approval --event-data '{"approved": true}'
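The same event can also be raised from application code, for example from an approval webhook. The sketch below assumes the workflow client exposes `raise_workflow_event(instance_id, event_name, data=...)`, as `DaprWorkflowClient` in the Dapr Python SDK does at the time of writing; check the signature against your SDK version. `submit_approval`, the `EventRaiser` protocol, and `RecordingClient` are hypothetical names used only so the example runs without a sidecar.

```python
from typing import Any, List, Protocol, Tuple

class EventRaiser(Protocol):
    """The one client method this sketch relies on (assumed signature)."""

    def raise_workflow_event(self, instance_id: str, event_name: str,
                             *, data: Any = None) -> None: ...

def submit_approval(client: EventRaiser, instance_id: str, approved: bool) -> None:
    # The event name must match the one passed to wait_for_external_event.
    client.raise_workflow_event(instance_id, "human_approval",
                                data={"approved": approved})

class RecordingClient:
    """Hypothetical test double that records events instead of sending them."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, str, Any]] = []

    def raise_workflow_event(self, instance_id: str, event_name: str,
                             *, data: Any = None) -> None:
        self.events.append((instance_id, event_name, data))

client = RecordingClient()
submit_approval(client, "order-42", approved=True)
```

In a real service you would pass a workflow client instance in place of `RecordingClient`.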

Compensation (Saga)

When a multi-step workflow fails partway through, run compensating activities to undo the steps that already committed. This is the Saga pattern — explicit, code-driven rollback instead of distributed transactions. Track completed steps in workflow-local state and call compensation activities in reverse order on failure.

@wfr.workflow(name="book_trip")
def book_trip_workflow(ctx: DaprWorkflowContext, trip: dict):
    completed = []
    try:
        flight = yield ctx.call_activity(book_flight, input=trip)
        completed.append(("flight", flight))

        hotel = yield ctx.call_activity(book_hotel, input=trip)
        completed.append(("hotel", hotel))

        car = yield ctx.call_activity(book_car, input=trip)
        completed.append(("car", car))

        return {"flight": flight, "hotel": hotel, "car": car}
    except Exception:
        # Roll back completed steps in reverse order.
        for kind, booking in reversed(completed):
            if kind == "flight":
                yield ctx.call_activity(cancel_flight, input=booking)
            elif kind == "hotel":
                yield ctx.call_activity(cancel_hotel, input=booking)
            elif kind == "car":
                yield ctx.call_activity(cancel_car, input=booking)
        raise

Compensation activities must be idempotent — they may be retried after a crash. See Durable execution: activities for why.
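As a minimal sketch of what idempotent means here, the cancel activity below checks the current state before acting, so a retry after a crash is a no-op. The in-memory `bookings` store, the `id` field on the booking, and the `changed` flag in the result are assumptions made for the illustration; a real activity would call the booking provider's API and rely on its own idempotency guarantees.

```python
from typing import Any, Dict

# Hypothetical stand-in for the booking provider's state.
bookings: Dict[str, str] = {"FL-123": "confirmed"}

def cancel_flight(ctx: Any, booking: Dict[str, Any]) -> Dict[str, Any]:
    """Cancel a booking; calling it again for the same booking changes nothing."""
    booking_id = booking["id"]
    state = bookings.get(booking_id)
    if state is None or state == "cancelled":
        # Already cancelled (or never created): report success without side effects.
        return {"id": booking_id, "status": "cancelled", "changed": False}
    bookings[booking_id] = "cancelled"
    return {"id": booking_id, "status": "cancelled", "changed": True}

first_attempt = cancel_flight(None, {"id": "FL-123"})
retry = cancel_flight(None, {"id": "FL-123"})  # e.g. re-run after a crash
```

Treating an unknown booking as already cancelled is a design choice: it lets compensation succeed even when the original booking call failed before anything was committed.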

See also