# Quickstart: LangGraph Multi-Agent Communication
Build two LangGraph agents that communicate through Dapr pub/sub. One agent analyzes data and publishes insights, the other subscribes and generates recommendations. This pattern enables decoupled, scalable multi-agent architectures.
## What You'll Build
- Analyst Agent: Processes data using a LangGraph workflow and publishes insights to a topic
- Advisor Agent: Subscribes to insights and generates actionable recommendations
Both agents run as separate services, communicating asynchronously through Dapr's pub/sub building block.
## Why Async Agent Communication?
Asynchronous pub/sub messaging makes your multi-agent systems production-ready:
- Resiliency — If an agent crashes or restarts, messages wait in the broker until it recovers. No lost work.
- Independent scaling — Scale analyst agents and advisor agents separately based on load.
- Loose coupling — Agents only know about topics, not each other. Add, remove, or replace agents without breaking the system.
- Guaranteed delivery — The message broker handles retries, dead-letter queues, and at-least-once delivery.
## Bring Your Own Broker
This quickstart uses Redis, but Dapr supports 40+ pub/sub brokers including Kafka, RabbitMQ, AWS SNS/SQS, Azure Service Bus, and GCP Pub/Sub. Switch brokers by changing a YAML file — your agent code stays the same.
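For example, switching to Kafka is just a different component manifest with the same `name`, so the agent code is untouched. This sketch is illustrative; the broker address and auth settings depend on your Kafka deployment:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub          # same component name, so agent code is unchanged
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: "localhost:9092"   # illustrative broker address
  - name: authType
    value: "none"
```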
## Prerequisites
Before you begin, ensure you have:
- Dapr CLI installed and initialized (`dapr init`)
- Python 3.11+
- An OpenAI API key
## Setup

### Create Project Structure
Create a new project directory with the following structure:
```bash
mkdir langgraph-pubsub-agents && cd langgraph-pubsub-agents
mkdir -p components
```
Your project will have this structure:
```
langgraph-pubsub-agents/
├── components/
│   └── pubsub.yaml
├── analyst_agent.py
├── advisor_agent.py
└── requirements.txt
```
### Configure Dapr Pub/Sub Component

Create `components/pubsub.yaml`:
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
```
This uses Redis as the message broker. Dapr initializes Redis automatically with `dapr init`.
### Install Dependencies

Create `requirements.txt`:
```
langgraph>=0.2.0
langchain-openai>=0.1.0
dapr>=1.14.0
flask>=3.0.0
cloudevents>=1.11.0
```
Set up a virtual environment and install:

```bash
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
pip install -r requirements.txt
```
### Set Your OpenAI API Key

```bash
export OPENAI_API_KEY="your-api-key-here"
```
## How It Works
```
┌─────────────────┐      Dapr Pub/Sub      ┌─────────────────┐
│  Analyst Agent  │ ─────────────────────▶ │  Advisor Agent  │
│   (LangGraph)   │  "analyst-insights"    │   (LangGraph)   │
└─────────────────┘                        └─────────────────┘
        │                                          │
        ▼                                          ▼
LangGraph workflow:                      LangGraph workflow:
gather → analyze → publish               receive → recommend
```
- Analyst Agent receives a topic via HTTP, runs a LangGraph workflow to gather data, analyze it, and publish insights
- Dapr handles message routing through the configured pub/sub component (Redis)
- Advisor Agent subscribes to the topic, receives insights, and runs a LangGraph workflow to generate recommendations
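Dapr delivers published messages to subscribers wrapped in a CloudEvents envelope, with your payload under the `data` field (as a dict, or as a JSON string depending on content type). The unwrapping the advisor performs inline can be sketched in isolation; `unwrap_insights` and the sample envelope here are illustrative helpers, not part of the agent code:

```python
import json

def unwrap_insights(event: dict) -> tuple[str, list[str]]:
    """Extract topic and insights from a CloudEvents envelope.

    The published payload sits under "data", either already
    deserialized or as a JSON string.
    """
    data = event.get("data", {})
    if isinstance(data, str):  # payload may arrive serialized
        data = json.loads(data)
    return data.get("topic", "Unknown"), data.get("insights", [])

# Example envelope, trimmed to the fields used here:
envelope = {
    "id": "abc-123",
    "type": "com.dapr.event.sent",
    "data": {"topic": "AI trends", "insights": ["insight one"]},
}
topic, insights = unwrap_insights(envelope)
```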
## Build the Agents

### Create the Analyst Agent

Create `analyst_agent.py`:
```python
import json
import operator
from typing import Annotated, TypedDict

from flask import Flask, request, jsonify
from dapr.clients import DaprClient
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI

app = Flask(__name__)

# Define the state for the graph
class AnalystState(TypedDict):
    topic: str
    data: str
    analysis: Annotated[str, operator.add]
    insights: list[str]

# Initialize the LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def gather_data(state: AnalystState) -> AnalystState:
    """Simulate gathering data on the topic."""
    topic = state["topic"]
    # In production, this would fetch real data
    data = f"Market data and trends related to: {topic}"
    return {"data": data}

def analyze_data(state: AnalystState) -> AnalystState:
    """Analyze the gathered data using the LLM."""
    prompt = f"""Analyze the following data and provide 3 key insights:

Topic: {state['topic']}
Data: {state['data']}

Provide your analysis as a JSON array of 3 insight strings."""
    response = llm.invoke(prompt)
    try:
        insights = json.loads(response.content)
    except json.JSONDecodeError:
        insights = [response.content]
    return {"insights": insights, "analysis": response.content}

def should_publish(state: AnalystState) -> str:
    """Determine if we have insights to publish."""
    if state.get("insights") and len(state["insights"]) > 0:
        return "publish"
    return "end"

def publish_insights(state: AnalystState) -> AnalystState:
    """Publish insights via Dapr pub/sub."""
    with DaprClient() as client:
        client.publish_event(
            pubsub_name="pubsub",
            topic_name="analyst-insights",
            data=json.dumps({
                "topic": state["topic"],
                "insights": state["insights"]
            }),
            data_content_type="application/json"
        )
    print(f"Published {len(state['insights'])} insights on: {state['topic']}")
    return state

# Build the LangGraph workflow
workflow = StateGraph(AnalystState)
workflow.add_node("gather", gather_data)
workflow.add_node("analyze", analyze_data)
workflow.add_node("publish", publish_insights)
workflow.set_entry_point("gather")
workflow.add_edge("gather", "analyze")
workflow.add_conditional_edges("analyze", should_publish, {"publish": "publish", "end": END})
workflow.add_edge("publish", END)
graph = workflow.compile()

@app.route("/analyze", methods=["POST"])
def analyze():
    """Endpoint to trigger analysis on a topic."""
    data = request.json
    topic = data.get("topic", "AI market trends")
    # Run the LangGraph workflow
    result = graph.invoke({"topic": topic, "data": "", "analysis": "", "insights": []})
    return jsonify({
        "status": "published",
        "topic": topic,
        "insights_count": len(result.get("insights", [])),
        "message": "Insights published to advisor agent"
    })

@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "healthy"})

if __name__ == "__main__":
    app.run(port=5001)
```
### Create the Advisor Agent

Create `advisor_agent.py`:
```python
import json
import operator
from typing import Annotated, TypedDict

from flask import Flask, request, jsonify
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI

app = Flask(__name__)

# Store generated recommendations
recommendations = []

# Define the state for the graph
class AdvisorState(TypedDict):
    topic: str
    insights: list[str]
    recommendations: Annotated[str, operator.add]

# Initialize the LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

def process_insights(state: AdvisorState) -> AdvisorState:
    """Process incoming insights and generate recommendations."""
    insights_text = "\n".join(f"- {insight}" for insight in state["insights"])
    prompt = f"""Based on these analyst insights about {state['topic']},
provide 2-3 actionable recommendations:

Insights:
{insights_text}

Format as a bulleted list of specific, actionable recommendations."""
    response = llm.invoke(prompt)
    return {"recommendations": response.content}

# Build the LangGraph workflow
workflow = StateGraph(AdvisorState)
workflow.add_node("recommend", process_insights)
workflow.set_entry_point("recommend")
workflow.add_edge("recommend", END)
graph = workflow.compile()

@app.route("/dapr/subscribe", methods=["GET"])
def subscribe():
    """Tell Dapr which topics this service subscribes to."""
    subscriptions = [
        {
            "pubsubname": "pubsub",
            "topic": "analyst-insights",
            "route": "/receive-insights"
        }
    ]
    return jsonify(subscriptions)

@app.route("/receive-insights", methods=["POST"])
def receive_insights():
    """Handle incoming insights from pub/sub."""
    event = request.json
    data = event.get("data", {})
    if isinstance(data, str):
        data = json.loads(data)
    topic = data.get("topic", "Unknown")
    insights = data.get("insights", [])

    print(f"\n{'='*50}")
    print(f"Received {len(insights)} insights on: {topic}")
    print(f"{'='*50}\n")

    # Run the LangGraph workflow
    result = graph.invoke({
        "topic": topic,
        "insights": insights,
        "recommendations": ""
    })

    recommendation = {
        "topic": topic,
        "insights_received": len(insights),
        "recommendations": result["recommendations"]
    }
    recommendations.append(recommendation)

    print(f"\n{'='*50}")
    print(f"Generated recommendations for: {topic}")
    print(f"{'='*50}\n")

    return jsonify({"status": "SUCCESS"})

@app.route("/recommendations", methods=["GET"])
def get_recommendations():
    """Retrieve all generated recommendations."""
    return jsonify(recommendations)

@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "healthy"})

if __name__ == "__main__":
    app.run(port=5002)
```
## Run the Agents

### Start the Advisor Agent

Open a terminal, activate the virtual environment, and run:
```bash
dapr run --app-id advisor-agent \
  --app-port 5002 \
  --dapr-http-port 3501 \
  --resources-path ./components \
  -- python advisor_agent.py
```
This agent subscribes to the `analyst-insights` topic and waits for messages.
### Start the Analyst Agent

Open a second terminal, activate the virtual environment, and run:
```bash
dapr run --app-id analyst-agent \
  --app-port 5001 \
  --dapr-http-port 3500 \
  --resources-path ./components \
  -- python analyst_agent.py
```
### Trigger Analysis

Open a third terminal and send an analysis request:
```bash
curl -X POST http://localhost:5001/analyze \
  -H "Content-Type: application/json" \
  -d '{"topic": "Cloud-native AI infrastructure trends"}'
```
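A successful call returns a JSON summary like the following (the `insights_count` depends on what the model produced, so your values will vary):

```json
{
  "status": "published",
  "topic": "Cloud-native AI infrastructure trends",
  "insights_count": 3,
  "message": "Insights published to advisor agent"
}
```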
Watch the terminal outputs:
- Analyst Agent: Runs the LangGraph workflow (gather → analyze → publish)
- Advisor Agent: Receives insights via pub/sub and generates recommendations
### View Generated Recommendations
Retrieve all recommendations from the Advisor Agent:
```bash
curl http://localhost:5002/recommendations
```
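The response is a JSON array with one entry per processed message; the `recommendations` text here is a placeholder for whatever the model generated:

```json
[
  {
    "topic": "Cloud-native AI infrastructure trends",
    "insights_received": 3,
    "recommendations": "- First actionable recommendation...\n- Second actionable recommendation..."
  }
]
```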
## Key Concepts
| Concept | Description |
|---|---|
| Pub/Sub Decoupling | Agents don't need to know each other's addresses. They communicate through topics. |
| Async Communication | Analyst agent publishes and returns immediately. Advisor agent processes asynchronously. |
| Scalability | Multiple advisor agents can subscribe to the same topic for parallel processing. |
| Reliability | Dapr handles message delivery, retries, and dead-letter queues. |
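At-least-once delivery means a handler may receive the same message twice, so handlers should be idempotent. A minimal sketch of deduplication keyed on the CloudEvents `id` (in-memory only; a production version would use a shared store so duplicates are caught across replicas):

```python
processed_ids: set[str] = set()

def handle_once(event: dict, handler) -> str:
    """Run handler for an event at most once, keyed by CloudEvents id.

    Returns the Dapr subscription status. We return SUCCESS for
    duplicates too, so the broker stops redelivering work that is
    already done.
    """
    event_id = event.get("id")
    if event_id in processed_ids:
        return "SUCCESS"  # duplicate delivery: acknowledge, skip work
    handler(event)
    processed_ids.add(event_id)
    return "SUCCESS"

# Usage: wrap the real work in a callable
seen = []
handle_once({"id": "e1", "data": {}}, seen.append)
handle_once({"id": "e1", "data": {}}, seen.append)  # duplicate, skipped
```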
## Next Steps
- Add more nodes to your LangGraph workflows for complex reasoning chains
- Use Dapr state management to persist agent memory
- Deploy to Kubernetes with Catalyst
- Explore Dapr workflows for orchestrated agent coordination
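The state-management idea can be sketched as a helper that persists each recommendation through a Dapr state store. The `statestore` component name and key scheme are assumptions; `client` is expected to behave like `dapr.clients.DaprClient`, whose `save_state` call requires a running sidecar:

```python
import json

def persist_recommendation(client, topic: str, recommendation: dict) -> str:
    """Save a recommendation under a topic-derived key.

    `client` must expose save_state(store_name, key, value), as
    dapr.clients.DaprClient does when a Dapr sidecar is running.
    """
    key = f"recommendation-{topic.lower().replace(' ', '-')}"
    client.save_state(
        store_name="statestore",  # assumed state store component name
        key=key,
        value=json.dumps(recommendation),
    )
    return key
```

With a sidecar running, you would call it as `with DaprClient() as client: persist_recommendation(client, topic, rec)`.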
## Clean Up

Stop both Dapr applications:

```bash
dapr stop --app-id analyst-agent
dapr stop --app-id advisor-agent
```