Workflow patterns
Common durable workflow patterns supported by Catalyst Workflows (built on Dapr Workflow):
Task Chaining

Multiple steps run in succession, with the output of one step passed as input to the next. Ideal for sequential data processing operations like filtering, transforming, and reducing.
Read more →
Fan-out / Fan-in

Execute multiple tasks simultaneously across potentially multiple workers, wait for them to finish, and aggregate the results. Perfect for parallel processing with dynamic task counts.
Read more →
Async HTTP APIs

Return an immediate HTTP 202, then expose a status endpoint the caller can poll. No custom state management required for long-running operations.
Read more →
Monitor

A recurring loop that wakes on a durable timer, checks an external condition, takes action, and repeats. Supports dynamic sleep intervals and graceful termination.
Read more →
External System Interaction

Pause and wait for external systems or human input. Supports timeouts, event-driven resumption, and compensation logic for approval workflows.
Read more →
Compensation

Roll back or undo operations that have already executed when a workflow fails partway through. Essential for consistency in distributed transactions.
Read more →
Task chaining
Execute a sequence of activities in order, passing the output of each step to the next. If a step fails, the workflow retries from that step — earlier steps are not re-executed. Wrap the whole chain in a try/catch (try/except in Python) to route failures through a compensation or error-handling activity, as the examples below do.
- Python
- JavaScript
- .NET
- Java
- Go
@wfr.workflow(name="task_chain")
def task_chain_workflow(ctx: DaprWorkflowContext, wf_input: int):
    """Run step1, step2 and step3 in order, feeding each output to the next.

    Returns the three intermediate results as a list. If anything in the
    chain raises, the error text is passed to ``error_handler`` and the
    exception is re-raised.
    """
    outputs = []
    carry = wf_input
    try:
        for stage in (step1, step2, step3):
            # Each stage receives the previous stage's output.
            carry = yield ctx.call_activity(stage, input=carry)
            outputs.append(carry)
    except Exception as exc:
        yield ctx.call_activity(error_handler, input=str(exc))
        raise
    return outputs
/**
 * Runs step1, step2 and step3 in order, handing each result to the next
 * activity, and returns the three results in call order. On any error the
 * stringified error is sent to errorHandler before the error is rethrown.
 */
const taskChainWorkflow: TWorkflow = async function* (
  ctx: WorkflowContext,
  wfInput: number
): any {
  let first;
  let second;
  let third;
  try {
    first = yield ctx.callActivity(step1, wfInput);
    second = yield ctx.callActivity(step2, first);
    third = yield ctx.callActivity(step3, second);
  } catch (failure) {
    yield ctx.callActivity(errorHandler, String(failure));
    throw failure;
  }
  // Only reached when all three steps succeeded.
  return [first, second, third];
};
/// <summary>
/// Calls Step1, Step2 and Step3 in sequence, passing each result to the next
/// activity, and returns the three results in call order.
/// </summary>
public class TaskChainWorkflow : Workflow<int, int[]>
{
    public override async Task<int[]> RunAsync(WorkflowContext context, int wfInput)
    {
        string[] stages = { nameof(Step1), nameof(Step2), nameof(Step3) };
        var outputs = new int[stages.Length];
        var carry = wfInput;
        try
        {
            for (var i = 0; i < stages.Length; i++)
            {
                // Each stage receives the previous stage's output.
                carry = await context.CallActivityAsync<int>(stages[i], carry);
                outputs[i] = carry;
            }
            return outputs;
        }
        catch (Exception failure)
        {
            // Report the failure message, then let the original exception propagate.
            await context.CallActivityAsync(nameof(ErrorHandler), failure.Message);
            throw;
        }
    }
}
/**
 * Chains the Step1, Step2 and Step3 activities, passing each result to the
 * next, and completes the workflow with the three results in call order.
 * A TaskFailedException is reported to the ErrorHandler activity and rethrown.
 */
public class TaskChainWorkflow implements Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            String[] stages = {"Step1", "Step2", "Step3"};
            int[] outputs = new int[stages.length];
            int carry = ctx.getInput(Integer.class);
            try {
                for (int i = 0; i < stages.length; i++) {
                    // Each stage receives the previous stage's output.
                    carry = ctx.callActivity(stages[i], carry, int.class).await();
                    outputs[i] = carry;
                }
                ctx.complete(outputs);
            } catch (TaskFailedException failure) {
                ctx.callActivity("ErrorHandler", failure.getMessage()).await();
                throw failure;
            }
        };
    }
}
// TaskChainWorkflow runs Step1, Step2 and Step3 in order, feeding each
// activity's output to the next, and returns the three outputs as a slice.
// When a step fails, ErrorHandler is invoked with the error text (its own
// result is ignored) and the failing step's error is returned.
func TaskChainWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var wfInput int
	if err := ctx.GetInput(&wfInput); err != nil {
		return nil, err
	}

	results := make([]int, 0, 3)
	current := wfInput
	for _, activity := range []any{Step1, Step2, Step3} {
		var out int
		err := ctx.CallActivity(activity, workflow.WithActivityInput(current)).Await(&out)
		if err != nil {
			// Best-effort notification; the step's error is what gets returned.
			_ = ctx.CallActivity(ErrorHandler, workflow.WithActivityInput(err.Error())).Await(nil)
			return nil, err
		}
		results = append(results, out)
		current = out
	}
	return results, nil
}
Fan-out / Fan-in
Spawn multiple activity instances in parallel, then aggregate their results once all complete. Catalyst Workflows tracks each branch independently and resumes the aggregation step only after all branches succeed. The parallelism is bounded by the activity worker pool — there is no need to manage concurrency limits in workflow code.
- Python
- JavaScript
- .NET
- Java
- Go
from dapr.ext.workflow import DaprWorkflowContext, when_all
@wfr.workflow(name="batch_processing")
def batch_processing_workflow(ctx: DaprWorkflowContext, wf_input: int):
    """Fan out one ``process_work_item`` call per batch item, then fan in.

    The batch comes from ``get_work_batch``; the summed item outputs are
    handed to ``process_results``.
    """
    batch = yield ctx.call_activity(get_work_batch, input=wf_input)

    # Schedule every item before waiting on any of them.
    pending = []
    for work_item in batch:
        pending.append(ctx.call_activity(process_work_item, input=work_item))

    results = yield when_all(pending)
    yield ctx.call_activity(process_results, input=sum(results))
/**
 * Fetches a work batch, schedules one processWorkItemActivity call per item,
 * waits for all of them, and passes the summed outputs to
 * processResultsActivity.
 */
const batchProcessingWorkflow: TWorkflow = async function* (
  ctx: WorkflowContext,
  wfInput: number
): any {
  const workBatch: number[] = yield ctx.callActivity(getWorkItemsActivity, wfInput);

  // Schedule every item before waiting on any of them.
  const pending = [];
  for (const item of workBatch) {
    pending.push(ctx.callActivity(processWorkItemActivity, item));
  }

  const outputs: number[] = yield ctx.whenAll(pending);
  let total = 0;
  for (const value of outputs) {
    total += value;
  }
  yield ctx.callActivity(processResultsActivity, total);
};
/// <summary>
/// Fetches a work batch, runs ProcessWorkItem for every item in parallel,
/// and passes the sum of the item results to ProcessResults.
/// </summary>
public class BatchProcessingWorkflow : Workflow<int, object>
{
    public override async Task<object> RunAsync(WorkflowContext context, int wfInput)
    {
        var batch = await context.CallActivityAsync<int[]>(nameof(GetWorkBatch), wfInput);

        // Schedule every item before awaiting any of them.
        var pending = batch
            .Select(item => context.CallActivityAsync<int>(nameof(ProcessWorkItem), item))
            .ToArray();

        // WhenAll hands back the results in the same order as the tasks.
        var outputs = await Task.WhenAll(pending);
        await context.CallActivityAsync(nameof(ProcessResults), outputs.Sum());
        return null;
    }
}
/**
 * Fetches a work batch, runs ProcessWorkItem for every item in parallel,
 * passes the summed results to ProcessResults, and completes with that sum.
 */
public class BatchProcessingWorkflow implements Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            int wfInput = ctx.getInput(Integer.class);
            Integer[] workBatch = ctx.callActivity("GetWorkBatch", wfInput, Integer[].class).await();

            // Schedule every item before awaiting any of them.
            List<Task<Integer>> pending = Arrays.asList(workBatch).stream()
                .map(item -> ctx.callActivity("ProcessWorkItem", item, Integer.class))
                .collect(Collectors.toList());

            int total = 0;
            for (Integer output : ctx.allOf(pending).await()) {
                total += output;
            }
            ctx.callActivity("ProcessResults", total).await();
            ctx.complete(total);
        };
    }
}
// BatchProcessingWorkflow fetches a work batch, starts one ProcessWorkItem
// activity per item, sums their outputs, and passes the sum to ProcessResults.
// The workflow itself produces no output value.
func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var wfInput int
	if err := ctx.GetInput(&wfInput); err != nil {
		return nil, err
	}

	var workBatch []int
	if err := ctx.CallActivity(GetWorkBatch, workflow.WithActivityInput(wfInput)).Await(&workBatch); err != nil {
		return nil, err
	}

	// Start every item before awaiting any of them.
	pending := make([]workflow.Task, 0, len(workBatch))
	for _, item := range workBatch {
		pending = append(pending, ctx.CallActivity(ProcessWorkItem, workflow.WithActivityInput(item)))
	}

	sum := 0
	for i := range pending {
		var out int
		if err := pending[i].Await(&out); err != nil {
			return nil, err
		}
		sum += out
	}

	if err := ctx.CallActivity(ProcessResults, workflow.WithActivityInput(sum)).Await(nil); err != nil {
		return nil, err
	}
	return nil, nil
}
Async HTTP (polling consumer)
This is an architectural pattern, not a workflow code pattern. The HTTP layer (your web framework) starts the workflow, returns HTTP 202 Accepted with an instance ID immediately, and exposes a GET /status/{id} endpoint that queries workflow status. The workflow runs durably in the background regardless of how long the task takes — pods can recycle, deploys can roll, and the eventual GET will see the final result.
Use the `diagrid workflow get` CLI command, or call the workflow APIs from your status endpoint, to look up instance state.
Monitor
A loop that wakes periodically using a durable timer, checks an external condition, and sends a notification or raises an event when the condition is met. The timer survives process restarts. The example below checks a job's status every 5 minutes until the job completes or fails, then sends the corresponding notification.
- Python
- JavaScript
- .NET
- Java
- Go
@wfr.workflow(name="job_status_monitor")
def job_status_monitor(ctx: DaprWorkflowContext, job_id: str):
    """Check a job's status every five minutes until it completes or fails.

    A "completed" status triggers ``notify_complete`` and a "failed" status
    triggers ``notify_failure``; either one ends the workflow. Any other
    status waits on a timer and checks again.
    """
    while True:
        status = yield ctx.call_activity(check_job_status, input=job_id)
        if status in ("completed", "failed"):
            notifier = notify_complete if status == "completed" else notify_failure
            yield ctx.call_activity(notifier, input=job_id)
            return
        # Not finished yet: wait before the next check.
        yield ctx.create_timer(timedelta(minutes=5))
/**
 * Checks a job's status in a loop. "completed" and "failed" each trigger
 * their notification activity and end the workflow; any other status waits
 * on a timer before the next check.
 */
const jobStatusMonitor: TWorkflow = async function* (
  ctx: WorkflowContext,
  jobId: string
): any {
  const pollSeconds = 5 * 60;
  for (;;) {
    const status = yield ctx.callActivity(checkJobStatus, jobId);
    switch (status) {
      case "completed":
        yield ctx.callActivity(notifyComplete, jobId);
        return;
      case "failed":
        yield ctx.callActivity(notifyFailure, jobId);
        return;
      default:
        // Not finished yet: wait before the next check.
        yield ctx.createTimer(pollSeconds);
    }
  }
};
/// <summary>
/// Checks a job's status every five minutes. "completed" and "failed" each
/// trigger their notification activity and end the workflow; any other
/// status waits on a timer before the next check.
/// </summary>
public class JobStatusMonitor : Workflow<string, object>
{
    public override async Task<object> RunAsync(WorkflowContext context, string jobId)
    {
        while (true)
        {
            var status = await context.CallActivityAsync<string>(nameof(CheckJobStatus), jobId);
            switch (status)
            {
                case "completed":
                    await context.CallActivityAsync(nameof(NotifyComplete), jobId);
                    return null;
                case "failed":
                    await context.CallActivityAsync(nameof(NotifyFailure), jobId);
                    return null;
                default:
                    // Not finished yet: wait before the next check.
                    await context.CreateTimer(TimeSpan.FromMinutes(5));
                    break;
            }
        }
    }
}
/**
 * Checks a job's status every five minutes. On "completed" or "failed" it
 * calls the matching notification activity and completes the workflow with
 * that status; any other status waits on a timer before the next check.
 */
public class JobStatusMonitor implements Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            String jobId = ctx.getInput(String.class);
            while (true) {
                String status = ctx.callActivity("CheckJobStatus", jobId, String.class).await();
                boolean succeeded = "completed".equals(status);
                if (succeeded || "failed".equals(status)) {
                    String notifier = succeeded ? "NotifyComplete" : "NotifyFailure";
                    ctx.callActivity(notifier, jobId).await();
                    // status is exactly "completed" or "failed" here.
                    ctx.complete(status);
                    return;
                }
                // Not finished yet: wait before the next check.
                ctx.createTimer(Duration.ofMinutes(5)).await();
            }
        };
    }
}
// JobStatusMonitor checks a job's status every five minutes. A "completed"
// or "failed" status triggers the matching notification activity and ends
// the workflow; any other status waits on a timer before the next check.
func JobStatusMonitor(ctx *workflow.WorkflowContext) (any, error) {
	var jobID string
	if err := ctx.GetInput(&jobID); err != nil {
		return nil, err
	}

	for {
		var status string
		if err := ctx.CallActivity(CheckJobStatus, workflow.WithActivityInput(jobID)).Await(&status); err != nil {
			return nil, err
		}

		switch status {
		case "completed":
			return nil, ctx.CallActivity(NotifyComplete, workflow.WithActivityInput(jobID)).Await(nil)
		case "failed":
			return nil, ctx.CallActivity(NotifyFailure, workflow.WithActivityInput(jobID)).Await(nil)
		}

		// Not finished yet: wait before the next check.
		if err := ctx.CreateTimer(5 * time.Minute).Await(nil); err != nil {
			return nil, err
		}
	}
}
For monitors that run forever (e.g., a long-lived watcher), use continue_as_new to truncate workflow history periodically — see Durable execution.
External events / Human-in-the-loop
Pause execution at a step that waits for an external event — an approval, a webhook callback, or a manual signal. The workflow remains durable while waiting; you can raise the event from the CLI or the console.
The example below shows a request-escalation workflow: orders under $1,000 are auto-approved; orders of $1,000 or more wait for a human approval event, with a 7-day timeout.
- Python
- JavaScript
- .NET
- Java
- Go
@wfr.workflow(name="request_escalation")
def request_escalation_workflow(ctx: DaprWorkflowContext, order_str: str):
    """Auto-approve small orders; wait for a human on everything else.

    The input may be a JSON string or an already-decoded object. Orders with
    an amount below 1000 go straight to ``auto_approve_activity``. Other
    orders wait for a "human_approval" event, racing it against a 7-day
    timer; the outcome selects ``process_approval_activity`` or
    ``handle_timeout_activity``.
    """
    if isinstance(order_str, str):
        order = json.loads(order_str)
    else:
        order = order_str

    if order.get("amount", 0) < 1000:
        yield ctx.call_activity(auto_approve_activity, input=order)
        return

    approval_event = ctx.wait_for_external_event("human_approval")
    deadline = ctx.create_timer(timedelta(days=7))
    first_done = yield when_any([approval_event, deadline])

    timed_out = first_done == deadline
    if not timed_out:
        decision = yield approval_event
        yield ctx.call_activity(process_approval_activity, input=decision)
        return

    yield ctx.call_activity(handle_timeout_activity, input=order)
/**
 * Request-escalation workflow.
 *
 * Orders with an amount below 1000 are passed straight to
 * autoApproveActivity. Other orders wait for a "human_approval" external
 * event, raced against a 7-day timer: if the timer finishes first the order
 * goes to handleTimeoutActivity, otherwise the event's result is passed to
 * processApprovalActivity.
 *
 * @param ctx - Workflow context used to schedule activities, timers and event waits.
 * @param order - The order being evaluated; only `amount` is read here.
 */
const requestEscalationWorkflow: TWorkflow = async function* (
  ctx: WorkflowContext,
  order: OrderRequest
): any {
  // Auto-approve orders under $1000
  if (order.amount < 1000) {
    yield ctx.callActivity(autoApproveActivity, order);
    return;
  }

  // Orders $1000+ require human approval (7 days max)
  const approvalTimeoutSeconds = 7 * 24 * 60 * 60;
  const approvalEvent = ctx.waitForExternalEvent("human_approval");
  const timeout = ctx.createTimer(approvalTimeoutSeconds);
  const winner = yield ctx.whenAny([approvalEvent, timeout]);

  // Strict equality: this is an identity check between two task objects,
  // and it matches the `===` comparisons used by the other samples.
  if (winner === timeout) {
    yield ctx.callActivity(handleTimeoutActivity, order);
    return;
  }

  const approvalResult = approvalEvent.getResult();
  yield ctx.callActivity(processApprovalActivity, approvalResult);
};
/// <summary>
/// Auto-approves orders with an amount below 1000. Other orders wait up to
/// seven days for a "human_approval" event, then run either
/// ProcessApprovalActivity or, on TaskCanceledException, HandleTimeoutActivity.
/// </summary>
public class RequestEscalationWorkflow : Workflow<OrderRequest, object>
{
    public override async Task<object> RunAsync(WorkflowContext context, OrderRequest order)
    {
        if (order.Amount < 1000)
        {
            await context.CallActivityAsync(nameof(AutoApproveActivity), order);
            return null;
        }

        var approvalWindow = TimeSpan.FromDays(7);
        try
        {
            var decision = await context.WaitForExternalEventAsync<ApprovalResult>(
                eventName: "human_approval",
                timeout: approvalWindow);
            await context.CallActivityAsync(nameof(ProcessApprovalActivity), decision);
        }
        catch (TaskCanceledException)
        {
            // Handled as the approval window expiring.
            await context.CallActivityAsync(nameof(HandleTimeoutActivity), order);
        }

        return null;
    }
}
/**
 * Auto-approves orders with an amount below 1000 and completes with
 * "approved". Other orders wait up to seven days for a "human_approval"
 * event, run ProcessApprovalActivity or (on TaskCanceledException)
 * HandleTimeoutActivity, and complete with "order completed".
 */
public class RequestEscalationWorkflow implements Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            OrderRequest order = ctx.getInput(OrderRequest.class);

            if (order.amount < 1000) {
                ctx.callActivity("AutoApproveActivity", order).await();
                ctx.complete("approved");
            } else {
                Duration approvalWindow = Duration.ofDays(7);
                try {
                    ApprovalResult decision = ctx.waitForExternalEvent(
                        "human_approval", approvalWindow, ApprovalResult.class).await();
                    ctx.callActivity("ProcessApprovalActivity", decision).await();
                } catch (TaskCanceledException expired) {
                    // Handled as the approval window expiring.
                    ctx.callActivity("HandleTimeoutActivity", order).await();
                }
                ctx.complete("order completed");
            }
        };
    }
}
// RequestEscalationWorkflow auto-approves orders with an Amount below 1000.
// Other orders wait up to seven days for a "human_approval" event: if the
// wait returns an error the order goes to HandleTimeoutActivity, otherwise
// the received approval goes to ProcessApprovalActivity.
func RequestEscalationWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var order OrderRequest
	if err := ctx.GetInput(&order); err != nil {
		return nil, err
	}

	if order.Amount < 1000 {
		return nil, ctx.CallActivity(AutoApproveActivity, workflow.WithActivityInput(order)).Await(nil)
	}

	const approvalWindow = 7 * 24 * time.Hour
	var approval ApprovalResult
	if waitErr := ctx.WaitForExternalEvent("human_approval", approvalWindow).Await(&approval); waitErr != nil {
		// Any error from the wait is routed to the timeout handler.
		return nil, ctx.CallActivity(HandleTimeoutActivity, workflow.WithActivityInput(order)).Await(nil)
	}
	return nil, ctx.CallActivity(ProcessApprovalActivity, workflow.WithActivityInput(approval)).Await(nil)
}
Raise the event from the CLI once the approval is ready:
# Raise the "human_approval" event, with a JSON payload, on one workflow instance.
# <id> is a placeholder: replace it with the target workflow instance ID.
diagrid workflow raise-event --app-id my-workflow-app \
--instance-id <id> --event-name human_approval --event-data '{"approved": true}'
Compensation (Saga)
When a multi-step workflow fails partway through, run compensating activities to undo the steps that already committed. This is the Saga pattern — explicit, code-driven rollback instead of distributed transactions. Track completed steps in workflow-local state and call compensation activities in reverse order on failure.
- Python
- JavaScript
- .NET
- Java
- Go
@wfr.workflow(name="book_trip")
def book_trip_workflow(ctx: DaprWorkflowContext, trip: dict):
    """Book a flight, a hotel and a car; undo finished bookings on failure.

    Returns a dict with "flight", "hotel" and "car" bookings. If anything
    raises, each booking made so far is cancelled, newest first, and the
    exception is re-raised.
    """
    bookings = {}
    try:
        for kind, book in (
            ("flight", book_flight),
            ("hotel", book_hotel),
            ("car", book_car),
        ):
            bookings[kind] = yield ctx.call_activity(book, input=trip)
        return bookings
    except Exception:
        undo_by_kind = {
            "flight": cancel_flight,
            "hotel": cancel_hotel,
            "car": cancel_car,
        }
        # Dict order is booking order, so walk it backwards to cancel newest first.
        for kind in reversed(list(bookings)):
            yield ctx.call_activity(undo_by_kind[kind], input=bookings[kind])
        raise
/**
 * Books a flight, a hotel and a car and returns the three bookings. If any
 * step throws, the bookings made so far are cancelled, newest first, and
 * the error is rethrown.
 */
const bookTripWorkflow: TWorkflow = async function* (
  ctx: WorkflowContext,
  trip: Trip
): any {
  // Newest booking sits at the front, so iterating forward undoes in reverse order.
  const undoStack: Array<[string, any]> = [];
  try {
    const flight = yield ctx.callActivity(bookFlight, trip);
    undoStack.unshift(["flight", flight]);
    const hotel = yield ctx.callActivity(bookHotel, trip);
    undoStack.unshift(["hotel", hotel]);
    const car = yield ctx.callActivity(bookCar, trip);
    undoStack.unshift(["car", car]);
    return { flight, hotel, car };
  } catch (failure) {
    for (const [kind, booking] of undoStack) {
      switch (kind) {
        case "flight":
          yield ctx.callActivity(cancelFlight, booking);
          break;
        case "hotel":
          yield ctx.callActivity(cancelHotel, booking);
          break;
        case "car":
          yield ctx.callActivity(cancelCar, booking);
          break;
      }
    }
    throw failure;
  }
};
/// <summary>
/// Books a flight, a hotel and a car and returns the three bookings. If any
/// step throws, the bookings made so far are cancelled, newest first, and
/// the exception is rethrown.
/// </summary>
public class BookTripWorkflow : Workflow<Trip, object>
{
    public override async Task<object> RunAsync(WorkflowContext context, Trip trip)
    {
        var completed = new List<(string Kind, object Booking)>();
        try
        {
            var flight = await context.CallActivityAsync<object>(nameof(BookFlight), trip);
            completed.Add(("flight", flight));

            var hotel = await context.CallActivityAsync<object>(nameof(BookHotel), trip);
            completed.Add(("hotel", hotel));

            var car = await context.CallActivityAsync<object>(nameof(BookCar), trip);
            completed.Add(("car", car));

            return new { flight, hotel, car };
        }
        catch (Exception)
        {
            // Walk the list backwards so the newest booking is cancelled first.
            for (var i = completed.Count - 1; i >= 0; i--)
            {
                var (kind, booking) = completed[i];
                if (kind == "flight")
                {
                    await context.CallActivityAsync(nameof(CancelFlight), booking);
                }
                else if (kind == "hotel")
                {
                    await context.CallActivityAsync(nameof(CancelHotel), booking);
                }
                else if (kind == "car")
                {
                    await context.CallActivityAsync(nameof(CancelCar), booking);
                }
            }
            throw;
        }
    }
}
/**
 * Books a flight, a hotel and a car and completes with the three bookings.
 * On TaskFailedException the bookings made so far are cancelled, newest
 * first, and the exception is rethrown.
 */
public class BookTripWorkflow implements Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            Trip trip = ctx.getInput(Trip.class);
            List<Map.Entry<String, Object>> completed = new ArrayList<>();
            try {
                Object flight = ctx.callActivity("BookFlight", trip, Object.class).await();
                completed.add(Map.entry("flight", flight));

                Object hotel = ctx.callActivity("BookHotel", trip, Object.class).await();
                completed.add(Map.entry("hotel", hotel));

                Object car = ctx.callActivity("BookCar", trip, Object.class).await();
                completed.add(Map.entry("car", car));

                ctx.complete(Map.of("flight", flight, "hotel", hotel, "car", car));
            } catch (TaskFailedException failure) {
                // Walk the list backwards so the newest booking is cancelled first.
                for (int i = completed.size() - 1; i >= 0; i--) {
                    Map.Entry<String, Object> done = completed.get(i);
                    String kind = done.getKey();
                    if ("flight".equals(kind)) {
                        ctx.callActivity("CancelFlight", done.getValue()).await();
                    } else if ("hotel".equals(kind)) {
                        ctx.callActivity("CancelHotel", done.getValue()).await();
                    } else if ("car".equals(kind)) {
                        ctx.callActivity("CancelCar", done.getValue()).await();
                    }
                }
                throw failure;
            }
        };
    }
}
// BookTripWorkflow books a flight, a hotel and a car and returns the three
// bookings in a map. If a booking step fails, the bookings made so far are
// cancelled, newest first (cancel errors are ignored), and the failing
// step's error is returned.
func BookTripWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var trip Trip
	if err := ctx.GetInput(&trip); err != nil {
		return nil, err
	}

	type step struct {
		kind    string
		booking any
	}
	var completed []step

	cancelers := map[string]any{
		"flight": CancelFlight,
		"hotel":  CancelHotel,
		"car":    CancelCar,
	}

	// abort cancels the completed steps in reverse order and returns cause.
	abort := func(cause error) (any, error) {
		for i := len(completed) - 1; i >= 0; i-- {
			done := completed[i]
			if cancel, ok := cancelers[done.kind]; ok {
				_ = ctx.CallActivity(cancel, workflow.WithActivityInput(done.booking)).Await(nil)
			}
		}
		return nil, cause
	}

	var flight any
	if err := ctx.CallActivity(BookFlight, workflow.WithActivityInput(trip)).Await(&flight); err != nil {
		return abort(err)
	}
	completed = append(completed, step{"flight", flight})

	var hotel any
	if err := ctx.CallActivity(BookHotel, workflow.WithActivityInput(trip)).Await(&hotel); err != nil {
		return abort(err)
	}
	completed = append(completed, step{"hotel", hotel})

	var car any
	if err := ctx.CallActivity(BookCar, workflow.WithActivityInput(trip)).Await(&car); err != nil {
		return abort(err)
	}
	completed = append(completed, step{"car", car})

	return map[string]any{"flight": flight, "hotel": hotel, "car": car}, nil
}
Compensation activities must be idempotent — they may be retried after a crash. See Durable execution: activities for why.
See also
- Durable execution — replay semantics and determinism rules
- Workflow versioning — evolving workflow code safely
- AI agent patterns — applying these patterns to LLM-driven agents
- Develop workflows — language-specific SDK guides