Quickstart: LangGraph Multi-Agent Communication

Build two LangGraph agents that communicate through Dapr pub/sub. One agent analyzes data and publishes insights, the other subscribes and generates recommendations. This pattern enables decoupled, scalable multi-agent architectures.


What You'll Build

  • Analyst Agent: Processes data using a LangGraph workflow and publishes insights to a topic
  • Advisor Agent: Subscribes to insights and generates actionable recommendations

Both agents run as separate services, communicating asynchronously through Dapr's pub/sub building block.


Why Async Agent Communication?

Asynchronous pub/sub messaging makes your multi-agent systems production-ready:

  • Resiliency: If an agent crashes or restarts, messages wait in the broker until it recovers, so work is not lost.
  • Independent scaling: Scale analyst agents and advisor agents separately based on load.
  • Loose coupling: Agents only know about topics, not each other. Add, remove, or replace agents without breaking the system.
  • At-least-once delivery: Dapr provides at-least-once delivery, and retries and dead-letter topics can be configured for messages that fail processing.
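At-least-once delivery means the same message can occasionally arrive more than once, for example after a retry. A minimal sketch of one way a subscriber could avoid repeating work, assuming each delivery carries a CloudEvents `id` field (Dapr wraps published messages in a CloudEvents envelope by default). The `SeenEvents` class and `handle` function are hypothetical, and the in-memory record would not survive a restart:

```python
from collections import OrderedDict


class SeenEvents:
    """Bounded in-memory record of event ids that were already handled."""

    def __init__(self, max_size: int = 1000) -> None:
        self._ids = OrderedDict()
        self._max_size = max_size

    def first_time(self, event_id: str) -> bool:
        """Return True the first time an id is seen, False on repeats."""
        if event_id in self._ids:
            return False
        self._ids[event_id] = None
        if len(self._ids) > self._max_size:
            self._ids.popitem(last=False)  # evict the oldest id
        return True


seen = SeenEvents()


def handle(event: dict) -> str:
    """Process an event once; acknowledge duplicates without redoing work."""
    if not seen.first_time(event["id"]):
        return "duplicate"
    # ... run the expensive work (for example, an LLM call) here ...
    return "processed"
```

In a multi-instance deployment the record of seen ids would need to live in shared storage rather than in process memory.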

Bring Your Own Broker

This quickstart uses Redis, but Dapr supports 40+ pub/sub brokers including Kafka, RabbitMQ, AWS SNS/SQS, Azure Service Bus, and GCP Pub/Sub. Switch brokers by changing a YAML file — your agent code stays the same.


Prerequisites

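Before you begin, ensure you have:

  • Python 3.9 or later
  • The Dapr CLI, initialized with dapr init (this also starts the local Redis instance used in this quickstart)
  • An OpenAI API key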

Setup

1

Create Project Structure

Create a new project directory with the following structure:

mkdir langgraph-pubsub-agents && cd langgraph-pubsub-agents
mkdir -p components

Your project will have this structure:

langgraph-pubsub-agents/
├── components/
│ └── pubsub.yaml
├── analyst_agent.py
├── advisor_agent.py
└── requirements.txt

2

Configure Dapr Pub/Sub Component

Create components/pubsub.yaml:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

This uses Redis as the message broker. Dapr initializes Redis automatically with dapr init.

3

Install Dependencies

Create requirements.txt:

langgraph>=0.2.0
langchain-openai>=0.1.0
dapr>=1.14.0
flask>=3.0.0
cloudevents>=1.11.0

Set up a virtual environment and install:

python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt

4

Set Your OpenAI API Key

export OPENAI_API_KEY="your-api-key-here"

How It Works

┌─────────────────┐     Dapr Pub/Sub      ┌─────────────────┐
│  Analyst Agent  │ ────────────────────▶ │  Advisor Agent  │
│   (LangGraph)   │  "analyst-insights"   │   (LangGraph)   │
└─────────────────┘                       └─────────────────┘
         │                                         │
         ▼                                         ▼
LangGraph workflow:                       LangGraph workflow:
gather → analyze → publish                receive → recommend

  1. Analyst Agent receives a topic via HTTP, runs a LangGraph workflow to gather data, analyze it, and publish insights
  2. Dapr handles message routing through the configured pub/sub component (Redis)
  3. Advisor Agent subscribes to the topic, receives insights, and runs a LangGraph workflow to generate recommendations
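By default Dapr wraps each published payload in a CloudEvents 1.0 envelope before delivering it to the subscriber. The sketch below shows roughly what the Advisor Agent receives and how the payload can be unwrapped. The field values in `sample_event` are made up for illustration, and `extract_payload` is a hypothetical helper, not part of the Dapr SDK:

```python
import json

# Illustrative envelope; real ids, sources, and extra fields will differ.
sample_event = {
    "specversion": "1.0",
    "type": "com.dapr.event.sent",
    "source": "analyst-agent",
    "id": "example-id-123",
    "datacontenttype": "application/json",
    "pubsubname": "pubsub",
    "topic": "analyst-insights",
    "data": {"topic": "AI market trends", "insights": ["insight one"]},
}


def extract_payload(event: dict) -> dict:
    """Return the application payload carried in the envelope's data field."""
    data = event.get("data", {})
    if isinstance(data, str):
        # The payload may arrive as a JSON string rather than an object
        data = json.loads(data)
    return data


payload = extract_payload(sample_event)
print(payload["topic"], len(payload["insights"]))
```

The envelope's own `topic` field is the pub/sub topic name, while the `topic` inside `data` is the analysis subject chosen by the Analyst Agent.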

Build the Agents

1

Create the Analyst Agent

Create analyst_agent.py:

import json
import operator
from typing import Annotated, TypedDict
from flask import Flask, request, jsonify
from dapr.clients import DaprClient
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI

app = Flask(__name__)

# Define the state for the graph
class AnalystState(TypedDict):
    topic: str
    data: str
    analysis: Annotated[str, operator.add]
    insights: list[str]

# Initialize the LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def gather_data(state: AnalystState) -> dict:
    """Simulate gathering data on the topic."""
    topic = state["topic"]
    # In production, this would fetch real data
    data = f"Market data and trends related to: {topic}"
    return {"data": data}

def analyze_data(state: AnalystState) -> dict:
    """Analyze the gathered data using the LLM."""
    prompt = f"""Analyze the following data and provide 3 key insights:

Topic: {state['topic']}
Data: {state['data']}

Provide your analysis as a JSON array of 3 insight strings."""

    response = llm.invoke(prompt)
    try:
        insights = json.loads(response.content)
    except json.JSONDecodeError:
        # Fall back to treating the whole reply as a single insight
        insights = [response.content]

    return {"insights": insights, "analysis": response.content}

def should_publish(state: AnalystState) -> str:
    """Determine if we have insights to publish."""
    if state.get("insights") and len(state["insights"]) > 0:
        return "publish"
    return "end"

def publish_insights(state: AnalystState) -> dict:
    """Publish insights via Dapr pub/sub."""
    with DaprClient() as client:
        client.publish_event(
            pubsub_name="pubsub",
            topic_name="analyst-insights",
            data=json.dumps({
                "topic": state["topic"],
                "insights": state["insights"]
            }),
            data_content_type="application/json"
        )
    print(f"Published {len(state['insights'])} insights on: {state['topic']}")
    # Return no updates: returning the full state would pass "analysis"
    # through its operator.add reducer again and duplicate the text
    return {}

# Build the LangGraph workflow
workflow = StateGraph(AnalystState)
workflow.add_node("gather", gather_data)
workflow.add_node("analyze", analyze_data)
workflow.add_node("publish", publish_insights)

workflow.set_entry_point("gather")
workflow.add_edge("gather", "analyze")
workflow.add_conditional_edges("analyze", should_publish, {"publish": "publish", "end": END})
workflow.add_edge("publish", END)

graph = workflow.compile()

@app.route("/analyze", methods=["POST"])
def analyze():
    """Endpoint to trigger analysis on a topic."""
    # Tolerate a missing or non-JSON body and fall back to the default topic
    data = request.get_json(silent=True) or {}
    topic = data.get("topic", "AI market trends")

    # Run the LangGraph workflow
    result = graph.invoke({"topic": topic, "data": "", "analysis": "", "insights": []})

    return jsonify({
        "status": "published",
        "topic": topic,
        "insights_count": len(result.get("insights", [])),
        "message": "Insights published to advisor agent"
    })

@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "healthy"})

if __name__ == "__main__":
    app.run(port=5001)
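
Chat models sometimes wrap JSON in a Markdown code fence, in which case json.loads fails and the whole reply is published as a single insight. One possible way to make the parsing step more tolerant, as a sketch only: `parse_insights` is a hypothetical helper and is not part of LangGraph or LangChain.

```python
import json
import re

FENCE = "`" * 3  # Markdown code-fence marker


def parse_insights(text: str) -> list[str]:
    """Best-effort conversion of an LLM reply into a list of insight strings."""
    cleaned = text.strip()
    if cleaned.startswith(FENCE):
        # Drop an opening fence (with optional language tag) and a closing fence
        cleaned = re.sub(r"^`{3}[a-zA-Z]*\s*", "", cleaned)
        cleaned = re.sub(r"\s*`{3}$", "", cleaned)
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        return [text.strip()]
    if isinstance(parsed, list):
        return [str(item) for item in parsed]
    # Valid JSON but not an array: keep the raw reply as one insight
    return [text.strip()]
```

In analyze_data, the try/except around json.loads could then be replaced with a call such as parse_insights(response.content).
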
2

Create the Advisor Agent

Create advisor_agent.py:

import json
from typing import Annotated, TypedDict
import operator
from flask import Flask, request, jsonify
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI

app = Flask(__name__)

# Store generated recommendations
recommendations = []

# Define the state for the graph
class AdvisorState(TypedDict):
    topic: str
    insights: list[str]
    recommendations: Annotated[str, operator.add]

# Initialize the LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

def process_insights(state: AdvisorState) -> AdvisorState:
    """Process incoming insights and generate recommendations."""
    insights_text = "\n".join(f"- {insight}" for insight in state["insights"])

    prompt = f"""Based on these analyst insights about {state['topic']},
provide 2-3 actionable recommendations:

Insights:
{insights_text}

Format as a bulleted list of specific, actionable recommendations."""

    response = llm.invoke(prompt)
    return {"recommendations": response.content}

# Build the LangGraph workflow
workflow = StateGraph(AdvisorState)
workflow.add_node("recommend", process_insights)
workflow.set_entry_point("recommend")
workflow.add_edge("recommend", END)

graph = workflow.compile()

@app.route("/dapr/subscribe", methods=["GET"])
def subscribe():
    """Tell Dapr which topics this service subscribes to."""
    subscriptions = [
        {
            "pubsubname": "pubsub",
            "topic": "analyst-insights",
            "route": "/receive-insights"
        }
    ]
    return jsonify(subscriptions)

@app.route("/receive-insights", methods=["POST"])
def receive_insights():
    """Handle incoming insights from pub/sub."""
    event = request.json
    data = event.get("data", {})

    if isinstance(data, str):
        data = json.loads(data)

    topic = data.get("topic", "Unknown")
    insights = data.get("insights", [])

    print(f"\n{'='*50}")
    print(f"Received {len(insights)} insights on: {topic}")
    print(f"{'='*50}\n")

    # Run the LangGraph workflow
    result = graph.invoke({
        "topic": topic,
        "insights": insights,
        "recommendations": ""
    })

    recommendation = {
        "topic": topic,
        "insights_received": len(insights),
        "recommendations": result["recommendations"]
    }
    recommendations.append(recommendation)

    print(f"\n{'='*50}")
    print(f"Generated recommendations for: {topic}")
    print(f"{'='*50}\n")

    return jsonify({"status": "SUCCESS"})

@app.route("/recommendations", methods=["GET"])
def get_recommendations():
    """Retrieve all generated recommendations."""
    return jsonify(recommendations)

@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "healthy"})

if __name__ == "__main__":
    app.run(port=5002)

Run the Agents

1

Start the Advisor Agent

Open a terminal, activate the virtual environment, make sure OPENAI_API_KEY is set, and run:

dapr run --app-id advisor-agent \
--app-port 5002 \
--dapr-http-port 3501 \
--resources-path ./components \
-- python advisor_agent.py

This agent subscribes to the analyst-insights topic and waits for messages.

2

Start the Analyst Agent

Open a second terminal, activate the virtual environment, set OPENAI_API_KEY there as well, and run:

dapr run --app-id analyst-agent \
--app-port 5001 \
--dapr-http-port 3500 \
--resources-path ./components \
-- python analyst_agent.py

3

Trigger Analysis

Open a third terminal and send an analysis request:

curl -X POST http://localhost:5001/analyze \
-H "Content-Type: application/json" \
-d '{"topic": "Cloud-native AI infrastructure trends"}'

Watch the terminal outputs:

  1. Analyst Agent: Runs the LangGraph workflow (gather → analyze → publish)
  2. Advisor Agent: Receives insights via pub/sub and generates recommendations

4

View Generated Recommendations

Retrieve all recommendations from the Advisor Agent:

curl http://localhost:5002/recommendations
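
Because the Advisor Agent processes messages asynchronously, a request made right after the trigger may still return an empty list. A small polling sketch using only the standard library; `wait_for_recommendations` is a hypothetical helper, and the URL assumes the Advisor Agent from this quickstart is running on port 5002:

```python
import json
import time
import urllib.request
from typing import Callable

RECOMMENDATIONS_URL = "http://localhost:5002/recommendations"


def fetch_recommendations() -> list:
    """GET the advisor's /recommendations endpoint and decode the JSON list."""
    with urllib.request.urlopen(RECOMMENDATIONS_URL, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))


def wait_for_recommendations(
    topic: str,
    fetch: Callable[[], list] = fetch_recommendations,
    timeout: float = 60.0,
    interval: float = 2.0,
) -> list:
    """Poll until a recommendation for the topic appears or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        matches = [item for item in fetch() if item.get("topic") == topic]
        if matches or time.monotonic() >= deadline:
            return matches
        time.sleep(interval)
```

Calling wait_for_recommendations("Cloud-native AI infrastructure trends") after the trigger request returns the matching entries once the Advisor Agent has finished, or an empty list if the timeout expires first.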

Key Concepts

Concept              Description
Pub/Sub Decoupling   Agents don't need to know each other's addresses. They communicate through topics.
Async Communication  Analyst agent publishes and returns immediately. Advisor agent processes asynchronously.
Scalability          Multiple advisor agents can subscribe to the same topic for parallel processing.
Reliability          Dapr handles message delivery, retries, and dead-letter queues.
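
A subscriber tells Dapr how a delivery went through the status field of its response: SUCCESS acknowledges the message, RETRY asks for redelivery, and DROP discards it. The quickstart handler always returns SUCCESS. The sketch below shows one way a handler could choose a status from the outcome; the `status_for` helper, the `process` callable, and the choice of which errors count as permanent are assumptions made for illustration:

```python
import json
from typing import Callable


def status_for(event: dict, process: Callable[[dict], None]) -> dict:
    """Run the handler and map its outcome to a Dapr pub/sub response body."""
    try:
        data = event.get("data", {})
        if isinstance(data, str):
            data = json.loads(data)
        if "insights" not in data:
            raise ValueError("payload has no insights")
    except (ValueError, TypeError, AttributeError):
        # Malformed input will never succeed, so do not ask for redelivery
        return {"status": "DROP"}
    try:
        process(data)
    except Exception:
        # Treat anything else (for example, an LLM timeout) as transient
        return {"status": "RETRY"}
    return {"status": "SUCCESS"}
```

In the Advisor Agent, the body of receive_insights could be passed as `process` and the returned dict handed to jsonify.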

Next Steps

  • Add more nodes to your LangGraph workflows for complex reasoning chains
  • Use Dapr state management to persist agent memory
  • Deploy to Kubernetes with Catalyst
  • Explore Dapr workflows for orchestrated agent coordination
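
For the state management suggestion above, a rough sketch of persisting recommendations per topic. It assumes a Dapr state store component named `statestore` has been added to ./components (this quickstart only defines pub/sub) and relies on the Python SDK's `DaprClient.save_state` and `get_state` calls. The helper names and key format are hypothetical. The helpers accept any client object exposing those two methods, so they can be tried without a running sidecar:

```python
import json

STORE_NAME = "statestore"  # assumption: a state store component with this name exists


def save_recommendations(client, topic: str, items: list) -> None:
    """Persist the recommendations for one topic as a JSON string."""
    client.save_state(store_name=STORE_NAME, key=f"recs:{topic}", value=json.dumps(items))


def load_recommendations(client, topic: str) -> list:
    """Load recommendations for a topic, or an empty list if none are stored."""
    response = client.get_state(store_name=STORE_NAME, key=f"recs:{topic}")
    raw = response.data  # bytes; empty when the key does not exist
    return json.loads(raw) if raw else []
```

With the SDK, usage would look like `with DaprClient() as client: save_recommendations(client, topic, items)`, replacing the in-memory recommendations list in the Advisor Agent.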

Clean Up

Stop both Dapr applications:

dapr stop --app-id analyst-agent
dapr stop --app-id advisor-agent