Quickstart: LangGraph Durable Workflows
This quickstart is coming soon. Stay tuned!
Run LangGraph graphs with durable execution using Dapr Workflows. Each node execution is checkpointed, so if your graph crashes mid-execution, it resumes from the last completed node instead of starting over.
What You'll Build
A LangGraph agent that:
- Survives crashes — Workflow state is checkpointed after each node execution
- Supports parallel execution — Multiple triggered nodes run concurrently as activities
- Resumes from checkpoint — Restart the app and pick up where you left off
Why Durable Graph Execution?
Complex LangGraph workflows with multiple nodes, conditional edges, and tool calls are vulnerable to failures. Dapr Workflows make your graphs production-ready:
- Automatic checkpointing — Every node execution is persisted
- Crash recovery — On restart, the graph replays from the last checkpoint
- Parallel node execution — Nodes triggered simultaneously run as parallel activities
- Observability — Track graph execution state and debug failures
Architecture
```text
┌─────────────────────────────────────────────────────────────┐
│                    Dapr Workflow Runtime                    │
│  ┌───────────────────────────────────────────────────────┐  │
│  │              LangGraph Workflow (Pregel)              │  │
│  │                                                       │  │
│  │  ┌──────────┐     ┌──────────┐     ┌──────────┐       │  │
│  │  │Reasoning │────▶│  Tools   │────▶│  Answer  │       │  │
│  │  │  (Node)  │     │  (Node)  │     │  (Node)  │       │  │
│  │  └──────────┘     └──────────┘     └──────────┘       │  │
│  │       │                │                │             │  │
│  │       ▼                ▼                ▼             │  │
│  │  [Checkpoint]     [Checkpoint]     [Checkpoint]       │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
```
Each node runs as a workflow activity. If the process crashes after any checkpoint, the workflow resumes from that point — previous nodes won't re-execute.
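To see what per-node checkpointing buys you, here is a minimal, self-contained sketch in plain Python. It is a conceptual model, not how `DaprWorkflowGraphRunner` is implemented: a dict stands in for the state store, and `run_durable` and `make_node` are hypothetical helpers. Dapr itself gets the same effect by replaying the workflow and returning recorded activity results from its history.

```python
from typing import Callable, Dict, List, Optional, Tuple

NodeFn = Callable[[dict], dict]


def run_durable(
    nodes: List[Tuple[str, NodeFn]],
    state: dict,
    store: Dict[str, dict],
    crash_after: Optional[str] = None,
) -> dict:
    """Run nodes in order, persisting a checkpoint after each one.

    `store` is a stand-in for the workflow state store (hypothetical). On a
    restart, nodes already recorded in the checkpoint are skipped and the
    persisted state is restored instead.
    """
    checkpoint = store.get("checkpoint", {"completed": [], "state": state})
    completed = list(checkpoint["completed"])
    state = dict(checkpoint["state"])
    for name, fn in nodes:
        if name in completed:
            continue  # finished before the crash: never re-executed
        state = {**state, **fn(state)}  # merge the node's partial update
        completed.append(name)
        # Checkpoint after every node, like a workflow activity completing.
        store["checkpoint"] = {"completed": list(completed), "state": dict(state)}
        if name == crash_after:
            raise RuntimeError(f"simulated crash after {name!r}")
    return state


calls: List[str] = []  # records every real execution of a node function


def make_node(name: str) -> NodeFn:
    def node(state: dict) -> dict:
        calls.append(name)
        return {"trace": state["trace"] + [name]}
    return node


nodes = [(n, make_node(n)) for n in ("reasoning", "tools", "answer")]
store: Dict[str, dict] = {}

try:
    run_durable(nodes, {"trace": []}, store, crash_after="tools")
except RuntimeError as err:
    print(err)  # simulated crash after 'tools'

final = run_durable(nodes, {"trace": []}, store)  # the "restart"
print(final["trace"])  # ['reasoning', 'tools', 'answer']
print(calls)           # ['reasoning', 'tools', 'answer']
```

The second call skips `reasoning` and `tools` because the checkpoint already lists them, so each node function runs exactly once across the simulated crash.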
Prerequisites
- Dapr CLI installed and initialized (`dapr init`)
- A container runtime such as Docker, which `dapr init` uses to start Redis
- Python 3.11+
Setup
Create Project Structure
Create a new project directory:
```bash
mkdir langgraph-durable && cd langgraph-durable
mkdir -p components
```
Your project structure:
```text
langgraph-durable/
├── components/
│   └── statestore.yaml
├── agent.py
└── requirements.txt
```
Configure Dapr State Store
Create components/statestore.yaml:
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: actorStateStore
    value: "true"
```
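This uses Redis for workflow state persistence. Dapr Workflows keep their state in the actor state store, which is why `actorStateStore` is set to `"true"`. `dapr init` starts a local Redis container on `localhost:6379` automatically.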
Install Dependencies
Create requirements.txt:
```text
dapr-ext-langgraph-workflow>=0.1.0
langgraph>=0.2.0
```
Set up a virtual environment and install:
```bash
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
pip install -r requirements.txt
```
How It Works
The DaprWorkflowGraphRunner wraps your compiled LangGraph in a Dapr workflow:
- Graph configuration is extracted and stored in workflow state
- Each node execution runs as a workflow activity with checkpointing
- Conditional edges are evaluated as separate activities
- Channel state is serialized and persisted between steps
This means:
- Completed nodes won't re-execute after a crash
- The graph can resume mid-cycle from any node
- Parallel branches execute as concurrent activities
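The list above describes a Pregel-style superstep loop. The sketch below is a simplified, stand-alone model of it, assuming async node functions as stand-ins for activities: every node triggered in a step runs concurrently via `asyncio.gather`, their updates are merged only after the whole step finishes, conditional edges are evaluated as a separate pass, and the state is serialized to JSON between steps as a stand-in for a checkpoint. `run_supersteps`, `edges`, `routers`, and the `"END"` marker are hypothetical names, not part of LangGraph or the Dapr extension.

```python
import asyncio
import json
from typing import Awaitable, Callable, Dict, List, Set, Tuple

NodeFn = Callable[[dict], Awaitable[dict]]
Router = Callable[[dict], str]


async def run_supersteps(
    nodes: Dict[str, NodeFn],
    edges: Dict[str, List[str]],   # static edges: node -> successors
    routers: Dict[str, Router],    # conditional edges: node -> router fn
    entry: str,
    state: dict,
    max_steps: int = 20,
) -> Tuple[dict, List[str]]:
    checkpoints: List[str] = []
    triggered: Set[str] = {entry}
    for _ in range(max_steps):
        if not triggered:
            break
        names = sorted(triggered)
        # Every node triggered in this step sees the same input state and
        # runs concurrently (the stand-in for parallel activities).
        updates = await asyncio.gather(*(nodes[n](state) for n in names))
        for update in updates:
            state = {**state, **update}  # apply updates after the step ends
        # Separate pass: static edges plus conditional (routed) edges.
        next_nodes: Set[str] = set()
        for n in names:
            next_nodes.update(edges.get(n, []))
            if n in routers:
                next_nodes.add(routers[n](state))
        triggered = {n for n in next_nodes if n != "END"}
        checkpoints.append(json.dumps(state, sort_keys=True))  # persisted state
    return state, checkpoints


async def plan(state: dict) -> dict:
    return {"plan": ["tokyo", "paris"]}


async def search_a(state: dict) -> dict:
    await asyncio.sleep(0.01)
    return {"a": f"weather {state['plan'][0]}"}


async def search_b(state: dict) -> dict:
    await asyncio.sleep(0.01)
    return {"b": f"weather {state['plan'][1]}"}


async def merge(state: dict) -> dict:
    return {"answer": f"{state['a']} | {state['b']}"}


final, checkpoints = asyncio.run(run_supersteps(
    nodes={"plan": plan, "search_a": search_a, "search_b": search_b, "merge": merge},
    edges={"plan": ["search_a", "search_b"], "search_a": ["merge"], "search_b": ["merge"]},
    routers={"merge": lambda s: "END" if "answer" in s else "plan"},
    entry="plan",
    state={},
))
print(final["answer"])   # weather tokyo | weather paris
print(len(checkpoints))  # 3
```

Because `search_a` and `search_b` both point at `merge`, the set of triggered nodes deduplicates it, so `merge` runs once after both branches finish, giving three supersteps in total.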
Build the Agent
Create the Durable Graph
Create agent.py:
```python
import asyncio
from typing import Dict, List, Literal, Optional, TypedDict

from langgraph.graph import StateGraph, START, END
from dapr.ext.langgraph_workflow import DaprWorkflowGraphRunner


# Define the state schema
class AgentState(TypedDict):
    question: str
    thoughts: List[str]
    tool_calls: List[Dict]
    tool_results: List[str]
    current_step: int
    final_answer: Optional[str]
    should_continue: bool


# Simulated tools
def search_tool(query: str) -> str:
    """Simulate a search tool."""
    mock_results = {
        "weather tokyo": "Tokyo: Sunny, 22°C",
        "weather paris": "Paris: Cloudy, 18°C",
        "population japan": "Japan population: 125 million",
    }
    query_lower = query.lower()
    for key, value in mock_results.items():
        if key in query_lower:
            return value
    return f"No results found for: {query}"


def calculator_tool(expression: str) -> str:
    """Simulate a calculator tool."""
    try:
        allowed_chars = set("0123456789+-*/(). ")
        if not all(c in allowed_chars for c in expression):
            return "Error: Invalid expression"
        result = eval(expression)
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"


# Define node functions
def reasoning_node(state: AgentState) -> dict:
    """Reason about the question and decide next action."""
    print(f"[reasoning] Step {state['current_step']}: {state['question']}")
    question = state["question"].lower()
    step = state["current_step"]
    tool_results = state.get("tool_results", [])

    if step == 0:
        # First step: analyze and use a tool
        if "weather" in question:
            thought = "User is asking about weather. Using search tool."
            city = question.split()[-1] if question.split() else "tokyo"
            tool_call = {"tool": "search", "args": f"weather {city}"}
        elif any(op in question for op in ["+", "-", "*", "/"]):
            thought = "User wants a calculation. Using calculator."
            expr = "".join(c for c in question if c in "0123456789+-*/() ")
            tool_call = {"tool": "calculator", "args": expr.strip()}
        else:
            thought = "Searching for information."
            tool_call = {"tool": "search", "args": question}
        return {
            "thoughts": state["thoughts"] + [thought],
            "tool_calls": state["tool_calls"] + [tool_call],
            "current_step": step + 1,
            "should_continue": True,
        }
    elif step == 1 and tool_results:
        # Have results, ready to answer
        thought = f"Got results: {tool_results[-1]}. Ready to answer."
        return {
            "thoughts": state["thoughts"] + [thought],
            "current_step": step + 1,
            "should_continue": False,
        }

    return {
        "thoughts": state["thoughts"] + ["Max steps reached."],
        "should_continue": False,
    }


def tool_execution_node(state: AgentState) -> dict:
    """Execute the pending tool call."""
    tool_calls = state.get("tool_calls", [])
    if not tool_calls:
        return {"tool_results": state.get("tool_results", [])}

    tool_call = tool_calls[-1]
    tool_name = tool_call.get("tool", "")
    tool_args = tool_call.get("args", "")
    print(f"[tools] Executing: {tool_name}({tool_args})")

    if tool_name == "search":
        result = search_tool(tool_args)
    elif tool_name == "calculator":
        result = calculator_tool(tool_args)
    else:
        result = f"Unknown tool: {tool_name}"

    print(f"[tools] Result: {result}")
    return {"tool_results": state.get("tool_results", []) + [result]}


def answer_node(state: AgentState) -> dict:
    """Generate the final answer."""
    tool_results = state.get("tool_results", [])
    if tool_results:
        answer = f"Based on my research: {tool_results[-1]}"
    else:
        answer = "I couldn't find specific information for your question."
    print(f"[answer] {answer}")
    return {"final_answer": answer}


# Routing function
def should_continue(state: AgentState) -> Literal["tools", "answer"]:
    """Decide whether to use tools or generate answer."""
    if state.get("should_continue", True):
        return "tools"
    return "answer"


def build_graph() -> StateGraph:
    """Build the ReAct-style agent graph."""
    graph = StateGraph(AgentState)

    # Add nodes
    graph.add_node("reasoning", reasoning_node)
    graph.add_node("tools", tool_execution_node)
    graph.add_node("answer", answer_node)

    # Add edges
    graph.add_edge(START, "reasoning")
    graph.add_conditional_edges(
        "reasoning",
        should_continue,
        {"tools": "tools", "answer": "answer"},
    )
    graph.add_edge("tools", "reasoning")
    graph.add_edge("answer", END)

    return graph


async def run_agent(runner: DaprWorkflowGraphRunner, question: str, thread_id: str):
    """Run the agent with a question."""
    print(f"\n{'='*60}")
    print(f"Question: {question}")
    print(f"{'='*60}")

    input_state = {
        "question": question,
        "thoughts": [],
        "tool_calls": [],
        "tool_results": [],
        "current_step": 0,
        "final_answer": None,
        "should_continue": True,
    }

    async for event in runner.run_async(input=input_state, thread_id=thread_id):
        event_type = event["type"]
        if event_type == "workflow_started":
            print(f"Workflow started: {event.get('workflow_id')}")
        elif event_type == "workflow_completed":
            output = event.get("output", {})
            print("\nThinking process:")
            for i, thought in enumerate(output.get("thoughts", []), 1):
                print(f"  {i}. {thought}")
            print(f"\nFinal Answer: {output.get('final_answer', 'N/A')}")
        elif event_type == "workflow_failed":
            print(f"Workflow FAILED: {event.get('error')}")


async def main():
    print("=" * 60)
    print("LangGraph with Dapr Workflow - Durable Execution")
    print("=" * 60)

    # Build and compile the graph
    graph = build_graph()
    compiled = graph.compile()

    # Create the Dapr Workflow runner
    runner = DaprWorkflowGraphRunner(
        graph=compiled,
        max_steps=20,
        name="react_agent",
    )

    # Start the workflow runtime
    print("\nStarting Dapr Workflow runtime...")
    runner.start()

    try:
        # Run example queries
        await run_agent(runner, "What is the weather in Tokyo?", "thread-001")
        await run_agent(runner, "Calculate 25 * 4 + 10", "thread-002")

        print("\n" + "=" * 60)
        print("All queries complete!")
        print("=" * 60)
    finally:
        print("\nShutting down...")
        runner.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```
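`calculator_tool` filters characters before calling `eval`. That blocks names and attribute access, but the whitelist still admits inputs such as `9**9**9**9`, which can tie up the process or exhaust memory. If you later let a real model write the expression, a safer option is to parse it with Python's `ast` module and evaluate only a fixed set of arithmetic nodes. `safe_calculate` below is a hypothetical drop-in sketch, not part of the quickstart, and it deliberately rejects `**`.

```python
import ast
import operator
from typing import Callable, Dict, Type, Union

Number = Union[int, float]

# Only these operators are evaluated; names, calls, and ** are rejected.
_BIN_OPS: Dict[Type[ast.operator], Callable[[Number, Number], Number]] = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS: Dict[Type[ast.unaryop], Callable[[Number], Number]] = {
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}


def _eval_node(node: ast.AST) -> Number:
    """Recursively evaluate a whitelisted arithmetic AST."""
    if isinstance(node, ast.Expression):
        return _eval_node(node.body)
    if isinstance(node, ast.Constant) and type(node.value) in (int, float):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
        return _BIN_OPS[type(node.op)](_eval_node(node.left), _eval_node(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
        return _UNARY_OPS[type(node.op)](_eval_node(node.operand))
    raise ValueError(f"unsupported syntax: {type(node).__name__}")


def safe_calculate(expression: str) -> str:
    """Hypothetical replacement for calculator_tool that never calls eval()."""
    try:
        tree = ast.parse(expression, mode="eval")
        return f"Result: {_eval_node(tree)}"
    except (SyntaxError, ValueError, ZeroDivisionError) as e:
        return f"Error: {e}"


print(safe_calculate("25 * 4 + 10"))       # Result: 110
print(safe_calculate("(1 + 2) / 4"))       # Result: 0.75
print(safe_calculate("9**9**9**9"))        # Error: unsupported syntax: BinOp
print(safe_calculate("__import__('os')"))  # Error: unsupported syntax: Call
```

It returns the same `"Result: ..."` / `"Error: ..."` strings as `calculator_tool`, so `tool_execution_node` could call it without other changes.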
Run the Agent
Start with Dapr
Run the graph with Dapr:
```bash
dapr run --app-id langgraph-agent \
  --dapr-grpc-port 50001 \
  --resources-path ./components \
  -- python agent.py
```
You'll see:
- Dapr initializing the workflow runtime
- The graph executing nodes (each as a durable activity)
- The thinking process and final answers
Test Crash Recovery
To see durability in action:
- Start the agent
- While it's running, press Ctrl+C to simulate a crash
- Restart with the same command
- The workflow resumes from the last completed node
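The sample nodes finish in milliseconds, so it can be hard to press Ctrl+C mid-run. One way to widen the window while experimenting is to wrap a node in a delay before registering it. `with_delay` is a hypothetical helper, not part of LangGraph or the Dapr extension; assuming the runner accepts any plain callable as a node, registering `graph.add_node("tools", with_delay(tool_execution_node, 10))` would behave the same apart from the pause.

```python
import functools
import time
from typing import Callable, TypeVar

S = TypeVar("S")


def with_delay(node: Callable[[S], dict], seconds: float) -> Callable[[S], dict]:
    """Hypothetical helper: sleep before running `node` to widen the crash window."""
    @functools.wraps(node)  # keep the original name, docstring, and annotations
    def delayed(state: S) -> dict:
        print(f"[{node.__name__}] sleeping {seconds}s; press Ctrl+C now to test recovery")
        time.sleep(seconds)
        return node(state)
    return delayed


# Usage with a toy node (in agent.py you would wrap tool_execution_node instead).
def echo_node(state: dict) -> dict:
    return {"echo": state["question"]}


slow_echo = with_delay(echo_node, 0.1)
print(slow_echo({"question": "ping"}))  # {'echo': 'ping'}
print(slow_echo.__name__)               # echo_node
```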
Key Concepts
| Concept | Description |
|---|---|
| Workflow | A durable orchestration implementing LangGraph's Pregel execution model |
| Activity | A single node execution that can be retried |
| Checkpoint | Automatic state persistence after each node completes |
| Parallel Execution | Multiple triggered nodes run concurrently as separate activities |
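The table notes that an activity can be retried. As a rough illustration of what a retry policy with exponential backoff means (not Dapr's implementation), the sketch below reruns a failing node function up to `max_attempts` times, doubling the wait after each failure. In a real Dapr workflow you configure retries through the workflow SDK rather than writing this loop; `call_with_retry` and its parameters are hypothetical. Retries are also a reason to keep node functions safe to run more than once.

```python
import time
from typing import Callable, List, Optional


def call_with_retry(
    fn: Callable[[dict], dict],
    state: dict,
    max_attempts: int = 3,
    first_interval: float = 0.01,
    backoff: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Run `fn(state)`, retrying on any exception with exponential backoff."""
    delay = first_interval
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return fn(state)
        except Exception as err:  # a real policy might retry only transient errors
            last_error = err
            if attempt < max_attempts:
                sleep(delay)
                delay *= backoff
    raise RuntimeError(f"gave up after {max_attempts} attempts") from last_error


# A flaky node that fails twice, then succeeds.
attempts: List[int] = []


def flaky_tool_node(state: dict) -> dict:
    attempts.append(len(attempts) + 1)
    if len(attempts) < 3:
        raise ConnectionError("tool backend unavailable")
    return {"tool_results": ["ok"]}


waits: List[float] = []
result = call_with_retry(flaky_tool_node, {}, sleep=waits.append)
print(result)  # {'tool_results': ['ok']}
print(waits)   # [0.01, 0.02]
```

Injecting `sleep` keeps the example instant and lets you inspect the backoff intervals; with the default it really waits.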
Next Steps
- Add session management for persistent state
- Use pub/sub for multi-agent coordination
- Deploy to Kubernetes with Catalyst
Clean Up
Stop the Dapr application:
```bash
dapr stop --app-id langgraph-agent
```