Quickstart: CrewAI Multi-Agent Communication
Build two CrewAI agents that communicate through Dapr pub/sub. One agent publishes research findings, the other subscribes and summarizes them. This pattern enables decoupled, scalable multi-agent architectures.
What You'll Build
- Research Agent: Gathers information and publishes findings to a topic
- Summary Agent: Subscribes to findings and generates concise summaries
Both agents run as separate services, communicating asynchronously through Dapr's pub/sub building block.
Why Async Agent Communication?
Asynchronous pub/sub messaging makes your multi-agent systems production-ready:
- Resiliency — If an agent crashes or restarts, messages wait in the broker until it recovers. No lost work.
- Independent scaling — Scale research agents and summary agents separately based on load.
- Loose coupling — Agents only know about topics, not each other. Add, remove, or replace agents without breaking the system.
- Guaranteed delivery — The message broker handles retries, dead-letter queues, and at-least-once delivery.
Bring Your Own Broker
This quickstart uses Redis, but Dapr supports 40+ pub/sub brokers including Kafka, RabbitMQ, AWS SNS/SQS, Azure Service Bus, and GCP Pub/Sub. Switch brokers by changing a YAML file — your agent code stays the same.
Prerequisites
Before you begin, ensure you have:
- Dapr CLI installed and initialized (
dapr init) - Python 3.11+
- An OpenAI API key
Setup
Create Project Structure
Create a new project directory with the following structure:
mkdir crewai-pubsub-agents && cd crewai-pubsub-agents
mkdir -p components
Your project will have this structure:
crewai-pubsub-agents/
├── components/
│ └── pubsub.yaml
├── research_agent.py
├── summary_agent.py
└── requirements.txt
Configure Dapr Pub/Sub Component
Create components/pubsub.yaml:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
This uses Redis as the message broker. Dapr initializes Redis automatically with dapr init.
Install Dependencies
Create requirements.txt:
crewai>=0.28.0
dapr>=1.14.0
flask>=3.0.0
cloudevents>=1.11.0
Set up a virtual environment and install:
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
Set Your OpenAI API Key
export OPENAI_API_KEY="your-api-key-here"
How It Works
┌─────────────────┐ Dapr Pub/Sub ┌─────────────────┐
│ Research Agent │ ──────────────────▶ │ Summary Agent │
│ (CrewAI) │ "research-findings" │ (CrewAI) │
└─────────────────┘ └─────────────────┘
│ │
▼ ▼
Researches topic Subscribes to topic
Publishes findings Receives findings
Generates summary
- Research Agent receives a topic via HTTP, runs a CrewAI task, and publishes findings to the
research-findingstopic - Dapr handles message routing through the configured pub/sub component (Redis)
- Summary Agent subscribes to the topic, receives findings, and runs a CrewAI task to summarize
Build the Agents
Create the Research Agent
Create research_agent.py:
import json
import os
from flask import Flask, request, jsonify
from dapr.clients import DaprClient
from crewai import Agent, Task, Crew
app = Flask(__name__)
# CrewAI Research Agent
researcher = Agent(
role="Research Analyst",
goal="Find and analyze information on requested topics",
backstory="You are an expert researcher who gathers comprehensive information.",
verbose=True
)
def publish_findings(topic: str, findings: str):
"""Publish research findings to Dapr pub/sub."""
with DaprClient() as client:
client.publish_event(
pubsub_name="pubsub",
topic_name="research-findings",
data=json.dumps({
"topic": topic,
"findings": findings
}),
data_content_type="application/json"
)
print(f"Published findings on: {topic}")
@app.route("/research", methods=["POST"])
def research():
"""Endpoint to trigger research on a topic."""
data = request.json
topic = data.get("topic", "AI agents")
# Create and execute research task
research_task = Task(
description=f"Research the following topic and provide key findings: {topic}",
expected_output="A structured summary with 3-5 key findings",
agent=researcher
)
crew = Crew(
agents=[researcher],
tasks=[research_task],
verbose=True
)
result = crew.kickoff()
# Publish findings via Dapr pub/sub
publish_findings(topic, str(result))
return jsonify({
"status": "published",
"topic": topic,
"message": "Research findings published to summary agent"
})
@app.route("/health", methods=["GET"])
def health():
return jsonify({"status": "healthy"})
if __name__ == "__main__":
app.run(port=5001)
Create the Summary Agent
Create summary_agent.py:
import json
from flask import Flask, request, jsonify
from crewai import Agent, Task, Crew
app = Flask(__name__)
# Store received summaries
summaries = []
# CrewAI Summary Agent
summarizer = Agent(
role="Content Summarizer",
goal="Create concise, actionable summaries from research findings",
backstory="You excel at distilling complex information into clear takeaways.",
verbose=True
)
@app.route("/dapr/subscribe", methods=["GET"])
def subscribe():
"""Tell Dapr which topics this service subscribes to."""
subscriptions = [
{
"pubsubname": "pubsub",
"topic": "research-findings",
"route": "/receive-findings"
}
]
return jsonify(subscriptions)
@app.route("/receive-findings", methods=["POST"])
def receive_findings():
"""Handle incoming research findings from pub/sub."""
event = request.json
data = event.get("data", {})
if isinstance(data, str):
data = json.loads(data)
topic = data.get("topic", "Unknown")
findings = data.get("findings", "")
print(f"\n{'='*50}")
print(f"Received findings on: {topic}")
print(f"{'='*50}\n")
# Create summary task
summary_task = Task(
description=f"Summarize these research findings into 2-3 bullet points:\n\n{findings}",
expected_output="A concise bullet-point summary with key takeaways",
agent=summarizer
)
crew = Crew(
agents=[summarizer],
tasks=[summary_task],
verbose=True
)
result = crew.kickoff()
summary = {
"topic": topic,
"summary": str(result)
}
summaries.append(summary)
print(f"\n{'='*50}")
print(f"Summary created for: {topic}")
print(f"{'='*50}\n")
return jsonify({"status": "SUCCESS"})
@app.route("/summaries", methods=["GET"])
def get_summaries():
"""Retrieve all generated summaries."""
return jsonify(summaries)
@app.route("/health", methods=["GET"])
def health():
return jsonify({"status": "healthy"})
if __name__ == "__main__":
app.run(port=5002)
Run the Agents
Start the Summary Agent
Open a terminal and run:
dapr run --app-id summary-agent \
--app-port 5002 \
--dapr-http-port 3501 \
--resources-path ./components \
-- python summary_agent.py
This agent subscribes to the research-findings topic and waits for messages.
Start the Research Agent
Open a second terminal, activate the virtual environment, and run:
dapr run --app-id research-agent \
--app-port 5001 \
--dapr-http-port 3500 \
--resources-path ./components \
-- python research_agent.py
Trigger Research
Open a third terminal and send a research request:
curl -X POST http://localhost:5001/research \
-H "Content-Type: application/json" \
-d '{"topic": "Benefits of multi-agent AI systems"}'
Watch the terminal outputs:
- Research Agent: Executes the CrewAI task and publishes findings
- Summary Agent: Receives findings via pub/sub and generates a summary
View Generated Summaries
Retrieve all summaries from the Summary Agent:
curl http://localhost:5002/summaries
Key Concepts
| Concept | Description |
|---|---|
| Pub/Sub Decoupling | Agents don't need to know each other's addresses. They communicate through topics. |
| Async Communication | Research agent publishes and returns immediately. Summary agent processes asynchronously. |
| Scalability | Multiple summary agents can subscribe to the same topic for parallel processing. |
| Reliability | Dapr handles message delivery, retries, and dead-letter queues. |
Next Steps
- Add more agents to create a processing pipeline
- Use Dapr state management to persist agent memory
- Deploy to Kubernetes with Catalyst
- Explore Dapr workflows for orchestrated agent coordination
Clean Up
Stop both Dapr applications:
dapr stop --app-id research-agent
dapr stop --app-id summary-agent