
Quickstart: CrewAI Multi-Agent Communication

Build two CrewAI agents that communicate through Dapr pub/sub. One agent publishes research findings, the other subscribes and summarizes them. This pattern enables decoupled, scalable multi-agent architectures.


What You'll Build

  • Research Agent: Gathers information and publishes findings to a topic
  • Summary Agent: Subscribes to findings and generates concise summaries

Both agents run as separate services, communicating asynchronously through Dapr's pub/sub building block.


Why Async Agent Communication?

Asynchronous pub/sub messaging makes your multi-agent systems production-ready:

  • Resiliency — If an agent crashes or restarts, messages wait in the broker until it recovers. No lost work.
  • Independent scaling — Scale research agents and summary agents separately based on load.
  • Loose coupling — Agents only know about topics, not each other. Add, remove, or replace agents without breaking the system.
  • Guaranteed delivery — The message broker handles retries, dead-letter queues, and at-least-once delivery.

Bring Your Own Broker

This quickstart uses Redis, but Dapr supports 40+ pub/sub brokers including Kafka, RabbitMQ, AWS SNS/SQS, Azure Service Bus, and GCP Pub/Sub. Switch brokers by changing a YAML file — your agent code stays the same.
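For example, swapping Redis for Kafka is only a component-file change. A sketch of the equivalent Kafka component (broker address and auth settings are placeholders for your environment):

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub            # same component name, so agent code is unchanged
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: "localhost:9092"   # placeholder broker address
  - name: authType
    value: "none"             # use password/mtls/oidc in production
```

Because the component keeps the name `pubsub`, both agents publish and subscribe exactly as before.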


Prerequisites

Before you begin, ensure you have:

  • Dapr CLI installed and initialized (dapr init)
  • Python 3.10 or later
  • Docker running (dapr init uses it to start the default Redis container)
  • An OpenAI API key


Setup

1. Create Project Structure

Create a new project directory with the following structure:

mkdir crewai-pubsub-agents && cd crewai-pubsub-agents
mkdir -p components

Your project will have this structure:

crewai-pubsub-agents/
├── components/
│   └── pubsub.yaml
├── research_agent.py
├── summary_agent.py
└── requirements.txt
2. Configure Dapr Pub/Sub Component

Create components/pubsub.yaml:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

This uses Redis as the message broker. Dapr starts a Redis container automatically when you run dapr init.

3. Install Dependencies

Create requirements.txt:

crewai>=0.28.0
dapr>=1.14.0
flask>=3.0.0
cloudevents>=1.11.0

Set up a virtual environment and install:

python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
4. Set Your OpenAI API Key

export OPENAI_API_KEY="your-api-key-here"

How It Works

┌──────────────────┐      Dapr Pub/Sub      ┌──────────────────┐
│  Research Agent  │ ─────────────────────▶ │  Summary Agent   │
│     (CrewAI)     │   "research-findings"  │     (CrewAI)     │
└──────────────────┘                        └──────────────────┘
         │                                           │
         ▼                                           ▼
  Researches topic                          Subscribes to topic
  Publishes findings                        Receives findings
                                            Generates summary
  1. Research Agent receives a topic via HTTP, runs a CrewAI task, and publishes findings to the research-findings topic
  2. Dapr handles message routing through the configured pub/sub component (Redis)
  3. Summary Agent subscribes to the topic, receives findings, and runs a CrewAI task to summarize
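Under the hood, Dapr wraps each published payload in a CloudEvents envelope before delivering it to the subscriber's route. A minimal sketch of the unwrapping step the summary agent performs (the envelope fields shown are illustrative):

```python
import json

def extract_findings(event: dict) -> dict:
    """Unwrap the payload from a Dapr pub/sub CloudEvents envelope.

    Dapr places the published message under "data"; depending on the
    content type it may arrive as a dict or as a JSON string.
    """
    data = event.get("data", {})
    if isinstance(data, str):
        data = json.loads(data)
    return {
        "topic": data.get("topic", "Unknown"),
        "findings": data.get("findings", ""),
    }

# Illustrative envelope, shaped like what Dapr delivers to the subscriber
envelope = {
    "id": "7a3f9c12",              # illustrative event id
    "type": "com.dapr.event.sent",
    "pubsubname": "pubsub",
    "topic": "research-findings",
    "data": json.dumps({"topic": "AI agents", "findings": "Key finding 1"}),
}

print(extract_findings(envelope))
# → {'topic': 'AI agents', 'findings': 'Key finding 1'}
```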

Build the Agents

1. Create the Research Agent

Create research_agent.py:

import json
import os
from flask import Flask, request, jsonify
from dapr.clients import DaprClient
from crewai import Agent, Task, Crew

app = Flask(__name__)

# CrewAI Research Agent
researcher = Agent(
    role="Research Analyst",
    goal="Find and analyze information on requested topics",
    backstory="You are an expert researcher who gathers comprehensive information.",
    verbose=True
)

def publish_findings(topic: str, findings: str):
    """Publish research findings to Dapr pub/sub."""
    with DaprClient() as client:
        client.publish_event(
            pubsub_name="pubsub",
            topic_name="research-findings",
            data=json.dumps({
                "topic": topic,
                "findings": findings
            }),
            data_content_type="application/json"
        )
    print(f"Published findings on: {topic}")

@app.route("/research", methods=["POST"])
def research():
    """Endpoint to trigger research on a topic."""
    data = request.json
    topic = data.get("topic", "AI agents")

    # Create and execute research task
    research_task = Task(
        description=f"Research the following topic and provide key findings: {topic}",
        expected_output="A structured summary with 3-5 key findings",
        agent=researcher
    )

    crew = Crew(
        agents=[researcher],
        tasks=[research_task],
        verbose=True
    )

    result = crew.kickoff()

    # Publish findings via Dapr pub/sub
    publish_findings(topic, str(result))

    return jsonify({
        "status": "published",
        "topic": topic,
        "message": "Research findings published to summary agent"
    })

@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "healthy"})

if __name__ == "__main__":
    app.run(port=5001)
2. Create the Summary Agent

Create summary_agent.py:

import json
from flask import Flask, request, jsonify
from crewai import Agent, Task, Crew

app = Flask(__name__)

# Store received summaries
summaries = []

# CrewAI Summary Agent
summarizer = Agent(
    role="Content Summarizer",
    goal="Create concise, actionable summaries from research findings",
    backstory="You excel at distilling complex information into clear takeaways.",
    verbose=True
)

@app.route("/dapr/subscribe", methods=["GET"])
def subscribe():
    """Tell Dapr which topics this service subscribes to."""
    subscriptions = [
        {
            "pubsubname": "pubsub",
            "topic": "research-findings",
            "route": "/receive-findings"
        }
    ]
    return jsonify(subscriptions)

@app.route("/receive-findings", methods=["POST"])
def receive_findings():
    """Handle incoming research findings from pub/sub."""
    event = request.json
    data = event.get("data", {})

    if isinstance(data, str):
        data = json.loads(data)

    topic = data.get("topic", "Unknown")
    findings = data.get("findings", "")

    print(f"\n{'='*50}")
    print(f"Received findings on: {topic}")
    print(f"{'='*50}\n")

    # Create summary task
    summary_task = Task(
        description=f"Summarize these research findings into 2-3 bullet points:\n\n{findings}",
        expected_output="A concise bullet-point summary with key takeaways",
        agent=summarizer
    )

    crew = Crew(
        agents=[summarizer],
        tasks=[summary_task],
        verbose=True
    )

    result = crew.kickoff()

    summary = {
        "topic": topic,
        "summary": str(result)
    }
    summaries.append(summary)

    print(f"\n{'='*50}")
    print(f"Summary created for: {topic}")
    print(f"{'='*50}\n")

    return jsonify({"status": "SUCCESS"})

@app.route("/summaries", methods=["GET"])
def get_summaries():
    """Retrieve all generated summaries."""
    return jsonify(summaries)

@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "healthy"})

if __name__ == "__main__":
    app.run(port=5002)
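As an alternative to the programmatic /dapr/subscribe endpoint above, Dapr also supports declarative subscriptions defined as a YAML resource placed alongside your components (a sketch; the resource name is arbitrary):

```yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: research-findings-sub   # arbitrary resource name
spec:
  pubsubname: pubsub
  topic: research-findings
  routes:
    default: /receive-findings
scopes:
- summary-agent                 # limit the subscription to this app-id
```

With this file loaded via --resources-path, the /dapr/subscribe route is no longer needed; the /receive-findings handler stays the same.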

Run the Agents

1. Start the Summary Agent

Open a terminal and run:

dapr run --app-id summary-agent \
  --app-port 5002 \
  --dapr-http-port 3501 \
  --resources-path ./components \
  -- python summary_agent.py

This agent subscribes to the research-findings topic and waits for messages.

2. Start the Research Agent

Open a second terminal, activate the virtual environment, and run:

dapr run --app-id research-agent \
  --app-port 5001 \
  --dapr-http-port 3500 \
  --resources-path ./components \
  -- python research_agent.py
3. Trigger Research

Open a third terminal and send a research request:

curl -X POST http://localhost:5001/research \
-H "Content-Type: application/json" \
-d '{"topic": "Benefits of multi-agent AI systems"}'
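The research agent responds as soon as the findings are published; the summary is generated asynchronously afterwards. Based on the handler above, the response looks like:

```json
{
  "status": "published",
  "topic": "Benefits of multi-agent AI systems",
  "message": "Research findings published to summary agent"
}
```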

Watch the terminal outputs:

  1. Research Agent: Executes the CrewAI task and publishes findings
  2. Summary Agent: Receives findings via pub/sub and generates a summary
4. View Generated Summaries

Retrieve all summaries from the Summary Agent:

curl http://localhost:5002/summaries
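Each entry is a topic/summary pair appended by the summary agent. A sample response (the summary text is illustrative):

```json
[
  {
    "topic": "Benefits of multi-agent AI systems",
    "summary": "- Specialized agents divide complex work\n- Pub/sub decoupling improves resilience"
  }
]
```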

Key Concepts

  • Pub/Sub Decoupling: Agents don't need to know each other's addresses; they communicate through topics.
  • Async Communication: The research agent publishes and returns immediately; the summary agent processes messages asynchronously.
  • Scalability: Multiple summary agents can subscribe to the same topic for parallel processing.
  • Reliability: Dapr handles message delivery, retries, and dead-letter queues.

Next Steps


Clean Up

Stop both Dapr applications:

dapr stop --app-id research-agent
dapr stop --app-id summary-agent