Quickstart: OpenAI Agents Multi-Agent Communication
Build two OpenAI agents that communicate through Dapr pub/sub. One agent triages support tickets and publishes its classifications; the other subscribes to them and drafts responses. This pattern enables decoupled, scalable multi-agent architectures.
What You'll Build
- Triage Agent: Classifies incoming support tickets and publishes each classification to the classified-tickets topic
- Response Agent: Subscribes to classified tickets and drafts appropriate responses
Both agents run as separate services, communicating asynchronously through Dapr's pub/sub building block.
Why Async Agent Communication?
Asynchronous pub/sub messaging gives a multi-agent system several properties it needs in production:
- Resiliency — If an agent crashes or restarts, messages wait in the broker until it recovers. No lost work.
- Independent scaling — Scale triage agents and response agents separately based on load.
- Loose coupling — Agents only know about topics, not each other. Add, remove, or replace agents without breaking the system.
- Guaranteed delivery — The message broker handles retries, dead-letter queues, and at-least-once delivery.
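At-least-once delivery means a subscriber can occasionally see the same message twice, for example when it crashes after processing a ticket but before acknowledging it. A minimal sketch of an idempotent handler follows, assuming every ticket carries a unique ticket_id. The names process_once and seen_ids are hypothetical, and the in-memory set stands in for a durable store.

```python
# Sketch: skip tickets that were already processed (hypothetical helper).
from typing import Callable

seen_ids: set[str] = set()  # assumption: a real service would persist this


def process_once(ticket: dict, handler: Callable[[dict], None]) -> bool:
    """Run handler for a ticket unless its ID was already processed.

    Returns True if the handler ran, False if the ticket was a duplicate.
    """
    ticket_id = ticket["ticket_id"]
    if ticket_id in seen_ids:
        return False
    handler(ticket)
    # Record the ID only after the handler succeeds, so a failed attempt
    # can still be retried on redelivery.
    seen_ids.add(ticket_id)
    return True


if __name__ == "__main__":
    drafted: list[dict] = []
    ticket = {"ticket_id": "T-12345", "original_message": "Charged twice"}
    print(process_once(ticket, drafted.append))  # first delivery
    print(process_once(ticket, drafted.append))  # redelivery of the same ticket
```

Recording the ID after the handler returns trades a small window for duplicate work against the risk of silently losing a ticket whose processing failed.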
Bring Your Own Broker
This quickstart uses Redis, but Dapr supports 40+ pub/sub brokers including Kafka, RabbitMQ, AWS SNS/SQS, Azure Service Bus, and GCP Pub/Sub. Switch brokers by changing a YAML file — your agent code stays the same.
Prerequisites
Before you begin, ensure you have:
- Dapr CLI installed and initialized (dapr init)
- Python 3.11+
- An OpenAI API key
Setup
Create Project Structure
Create a new project directory with the following structure:
mkdir openai-pubsub-agents && cd openai-pubsub-agents
mkdir -p components
Your project will have this structure:
openai-pubsub-agents/
├── components/
│   └── pubsub.yaml
├── triage_agent.py
├── response_agent.py
└── requirements.txt
Configure Dapr Pub/Sub Component
Create components/pubsub.yaml:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
This uses Redis as the message broker. In self-hosted mode, dapr init starts a local Redis container on port 6379, so no extra setup is needed.
Install Dependencies
Create requirements.txt:
openai-agents>=0.0.3
dapr>=1.14.0
flask>=3.0.0
cloudevents>=1.11.0
Set up a virtual environment and install:
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
Set Your OpenAI API Key
export OPENAI_API_KEY="your-api-key-here"
How It Works
┌─────────────────┐     Dapr Pub/Sub       ┌─────────────────┐
│  Triage Agent   │ ─────────────────────▶ │ Response Agent  │
│    (OpenAI)     │  "classified-tickets"  │    (OpenAI)     │
└─────────────────┘                        └─────────────────┘
         │                                          │
         ▼                                          ▼
 Classifies ticket                         Subscribes to topic
 Publishes classification                  Receives ticket
                                           Drafts response
- Triage Agent receives a support ticket via HTTP, uses OpenAI to classify it, and publishes the result
- Dapr handles message routing through the configured pub/sub component (Redis)
- Response Agent subscribes to the topic, receives classified tickets, and uses OpenAI to draft responses
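Dapr wraps each published payload in a CloudEvents 1.0 envelope before delivering it to the subscriber, which is why the Response Agent reads the ticket from the event's data field. The sketch below shows the rough shape of such an envelope and a hypothetical helper, extract_ticket, that unwraps it. The sample field values are illustrative, not captured output, and a real envelope may carry additional fields.

```python
import json

# Illustrative envelope; the values are made up for this example.
sample_event = {
    "specversion": "1.0",
    "type": "com.dapr.event.sent",
    "source": "triage-agent",
    "id": "00000000-0000-0000-0000-000000000000",
    "datacontenttype": "application/json",
    "pubsubname": "pubsub",
    "topic": "classified-tickets",
    "data": {
        "ticket_id": "T-12345",
        "original_message": "I was charged twice.",
        "classification": {
            "category": "billing",
            "priority": "high",
            "summary": "Duplicate charge",
        },
    },
}


def extract_ticket(event: dict) -> dict:
    """Return the ticket payload from a CloudEvents envelope.

    The payload may arrive as a dict or as a JSON-encoded string,
    depending on how it was published, so both cases are handled.
    """
    data = event.get("data", {})
    if isinstance(data, str):
        data = json.loads(data)
    return data
```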
Build the Agents
Create the Triage Agent
Create triage_agent.py:
import json
import asyncio

from flask import Flask, request, jsonify
from dapr.clients import DaprClient
from agents import Agent, Runner

app = Flask(__name__)

# Create the OpenAI Triage Agent
triage_agent = Agent(
    name="TicketTriage",
    instructions="""You are a support ticket triage specialist.
Analyze incoming tickets and classify them into categories.
Categories:
- billing: Payment, invoices, refunds, pricing
- technical: Bugs, errors, integration issues
- account: Login, password, profile settings
- general: Everything else
Respond with JSON: {"category": "...", "priority": "high|medium|low", "summary": "..."}""",
    model="gpt-4o-mini"
)


def publish_ticket(ticket_id: str, original: str, classification: dict):
    """Publish classified ticket to Dapr pub/sub."""
    with DaprClient() as client:
        client.publish_event(
            pubsub_name="pubsub",
            topic_name="classified-tickets",
            data=json.dumps({
                "ticket_id": ticket_id,
                "original_message": original,
                "classification": classification
            }),
            data_content_type="application/json"
        )
    print(f"Published ticket {ticket_id} - Category: {classification.get('category')}")


async def classify_ticket(ticket_text: str) -> dict:
    """Run the OpenAI agent to classify the ticket."""
    result = await Runner.run(triage_agent, ticket_text)
    try:
        return json.loads(result.final_output)
    except json.JSONDecodeError:
        return {"category": "general", "priority": "medium", "summary": result.final_output}


@app.route("/triage", methods=["POST"])
def triage():
    """Endpoint to submit a support ticket for triage."""
    data = request.json
    ticket_id = data.get("ticket_id", "T-001")
    message = data.get("message", "I need help")
    # Classify the ticket using OpenAI agent
    classification = asyncio.run(classify_ticket(message))
    # Publish to Dapr pub/sub
    publish_ticket(ticket_id, message, classification)
    return jsonify({
        "status": "published",
        "ticket_id": ticket_id,
        "classification": classification,
        "message": "Ticket classified and sent to response agent"
    })


@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "healthy"})


if __name__ == "__main__":
    app.run(port=5001)
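Language models do not always return bare JSON: a reply may be wrapped in a Markdown code fence or use a category outside the list. One way to harden the parsing step in classify_ticket is sketched below. parse_classification is a hypothetical helper, not part of the OpenAI Agents SDK, and its fallback values mirror the defaults already used in classify_ticket.

```python
import json

VALID_CATEGORIES = {"billing", "technical", "account", "general"}
VALID_PRIORITIES = {"high", "medium", "low"}
FENCE = "`" * 3  # Markdown code-fence marker


def parse_classification(raw: str) -> dict:
    """Parse the model's reply into a classification with known-good values."""
    text = raw.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line (it may carry a language tag),
        # then everything from the closing fence onward.
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rsplit(FENCE, 1)[0].strip()
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None
    if not isinstance(parsed, dict):
        # Same fallback as classify_ticket: keep the raw text as the summary.
        return {"category": "general", "priority": "medium", "summary": raw}
    category = parsed.get("category")
    priority = parsed.get("priority")
    return {
        "category": category if isinstance(category, str) and category in VALID_CATEGORIES else "general",
        "priority": priority if isinstance(priority, str) and priority in VALID_PRIORITIES else "medium",
        "summary": str(parsed.get("summary", "")),
    }
```

With this helper, classify_ticket could return parse_classification(result.final_output) instead of calling json.loads directly, so the Response Agent always receives the three keys it reads.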
Create the Response Agent
Create response_agent.py:
import json
import asyncio

from flask import Flask, request, jsonify
from agents import Agent, Runner

app = Flask(__name__)

# Store drafted responses
responses = []

# Create the OpenAI Response Agent
response_agent = Agent(
    name="ResponseDrafter",
    instructions="""You are a customer support response specialist.
Given a classified support ticket, draft a helpful, professional response.
Guidelines:
- Be empathetic and professional
- Address the specific issue mentioned
- Provide clear next steps
- Keep responses concise but complete
Format your response as a ready-to-send email reply.""",
    model="gpt-4o-mini"
)


async def draft_response(ticket: dict) -> str:
    """Run the OpenAI agent to draft a response."""
    prompt = f"""Draft a response for this support ticket:
Category: {ticket['classification']['category']}
Priority: {ticket['classification']['priority']}
Summary: {ticket['classification']['summary']}
Original message:
{ticket['original_message']}"""
    result = await Runner.run(response_agent, prompt)
    return result.final_output


@app.route("/dapr/subscribe", methods=["GET"])
def subscribe():
    """Tell Dapr which topics this service subscribes to."""
    subscriptions = [
        {
            "pubsubname": "pubsub",
            "topic": "classified-tickets",
            "route": "/receive-ticket"
        }
    ]
    return jsonify(subscriptions)


@app.route("/receive-ticket", methods=["POST"])
def receive_ticket():
    """Handle incoming classified tickets from pub/sub."""
    event = request.json
    data = event.get("data", {})
    if isinstance(data, str):
        data = json.loads(data)
    ticket_id = data.get("ticket_id", "Unknown")
    print(f"\n{'='*50}")
    print(f"Received ticket: {ticket_id}")
    print(f"Category: {data['classification']['category']}")
    print(f"Priority: {data['classification']['priority']}")
    print(f"{'='*50}\n")
    # Draft response using OpenAI agent
    draft = asyncio.run(draft_response(data))
    response_record = {
        "ticket_id": ticket_id,
        "category": data["classification"]["category"],
        "priority": data["classification"]["priority"],
        "original_message": data["original_message"],
        "drafted_response": draft
    }
    responses.append(response_record)
    print(f"\n{'='*50}")
    print(f"Response drafted for ticket: {ticket_id}")
    print(f"{'='*50}\n")
    return jsonify({"status": "SUCCESS"})


@app.route("/responses", methods=["GET"])
def get_responses():
    """Retrieve all drafted responses."""
    return jsonify(responses)


@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "healthy"})


if __name__ == "__main__":
    app.run(port=5002)
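The handler above acknowledges every message with SUCCESS, and any exception surfaces as an HTTP 500, which Dapr treats as a failed delivery and retries. Dapr also accepts RETRY and DROP in the response body, which lets a handler separate transient failures from malformed messages. The sketch below shows one possible mapping. delivery_status is a hypothetical helper, and the choice of which exceptions count as permanent is an assumption for illustration.

```python
from typing import Callable


def delivery_status(process: Callable[[dict], object], ticket: dict) -> str:
    """Run process(ticket) and translate the outcome into a Dapr status string."""
    try:
        process(ticket)
    except (KeyError, TypeError, ValueError):
        # Assumption: these indicate a malformed payload, so retrying
        # would fail the same way. Ask Dapr to discard the message.
        return "DROP"
    except Exception:
        # Anything else (for example a network error while calling the
        # model) may succeed on a later attempt.
        return "RETRY"
    return "SUCCESS"
```

In receive_ticket, the final line could then become return jsonify({"status": delivery_status(handle, data)}), where handle is whatever function drafts and stores the response.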
Run the Agents
Start the Response Agent
Open a terminal and run:
dapr run --app-id response-agent \
--app-port 5002 \
--dapr-http-port 3501 \
--resources-path ./components \
-- python response_agent.py
This agent subscribes to the classified-tickets topic and waits for messages.
Start the Triage Agent
Open a second terminal, activate the virtual environment, export OPENAI_API_KEY again (environment variables are not shared between shells), and run:
dapr run --app-id triage-agent \
--app-port 5001 \
--dapr-http-port 3500 \
--resources-path ./components \
-- python triage_agent.py
Submit a Support Ticket
Open a third terminal and submit a ticket:
curl -X POST http://localhost:5001/triage \
-H "Content-Type: application/json" \
-d '{
"ticket_id": "T-12345",
"message": "I was charged twice for my subscription last month. Order #98765. Please refund the duplicate charge."
}'
Watch the terminal outputs:
- Triage Agent: Classifies the ticket (most likely as "billing"; the priority is the model's judgment and may vary) and publishes it
- Response Agent: Receives the classification via pub/sub and drafts a response
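To exercise the Response Agent on its own, a message can also be published straight to the topic through a sidecar's HTTP API instead of going through the Triage Agent. The sketch below assumes the triage-agent sidecar is listening on port 3500, as in the dapr run command above. build_publish_request and publish are hypothetical helpers, and nothing is sent until publish is called.

```python
import json
import urllib.request

DAPR_HTTP_PORT = 3500  # matches --dapr-http-port for triage-agent


def build_publish_request(pubsub: str, topic: str, payload: dict) -> urllib.request.Request:
    """Build a POST to the sidecar's publish endpoint for one message."""
    url = f"http://localhost:{DAPR_HTTP_PORT}/v1.0/publish/{pubsub}/{topic}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def publish(pubsub: str, topic: str, payload: dict) -> int:
    """Send the message and return the HTTP status code from the sidecar."""
    with urllib.request.urlopen(build_publish_request(pubsub, topic, payload)) as resp:
        return resp.status
```

For example, publish("pubsub", "classified-tickets", ticket) with a ticket dict shaped like the one publish_ticket sends (ticket_id, original_message, classification) should trigger the Response Agent without any call to the triage model.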
View Drafted Responses
Retrieve all drafted responses from the Response Agent:
curl http://localhost:5002/responses
Key Concepts
| Concept | Description |
|---|---|
| Pub/Sub Decoupling | Agents don't need to know each other's addresses. They communicate through topics. |
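| Async Communication | The triage agent returns as soon as the ticket is published. The response agent drafts the reply asynchronously. |
| Scalability | Multiple instances of the response agent (running under the same app ID) share the topic as competing consumers, so tickets are processed in parallel. |
| Reliability | Dapr and the broker provide at-least-once delivery with retries. Dead-letter topics can be configured per subscription. |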
Next Steps
- Add specialized response agents for each category (billing, technical, account)
- Use Dapr state management to track ticket history
- Deploy to Kubernetes with Catalyst
- Explore Dapr workflows for orchestrated agent coordination
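For the first item, one option is to have the Triage Agent choose the topic from the classification, so that each specialized agent subscribes only to its own category. The sketch below assumes a tickets-<category> naming scheme. Both topic_for and the topic names are hypothetical.

```python
KNOWN_CATEGORIES = ("billing", "technical", "account")


def topic_for(classification: dict) -> str:
    """Pick a category-specific topic, falling back to a general one."""
    category = classification.get("category")
    if category in KNOWN_CATEGORIES:
        return f"tickets-{category}"
    # Unknown or missing categories go to a catch-all topic.
    return "tickets-general"


if __name__ == "__main__":
    print(topic_for({"category": "billing", "priority": "high"}))
    print(topic_for({"category": "shipping"}))
```

publish_ticket would then pass topic_for(classification) as topic_name, and each specialized agent would list its own topic in /dapr/subscribe.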
Clean Up
Stop both Dapr applications:
dapr stop --app-id triage-agent
dapr stop --app-id response-agent