Skip to main content

Quickstart: OpenAI Agents Multi-Agent Communication

Build two OpenAI agents that communicate through Dapr pub/sub. One agent triages support tickets and publishes classifications, the other subscribes and drafts responses. This pattern enables decoupled, scalable multi-agent architectures.


What You'll Build

  • Triage Agent: Classifies incoming support tickets and publishes them to category-specific topics
  • Response Agent: Subscribes to classified tickets and drafts appropriate responses

Both agents run as separate services, communicating asynchronously through Dapr's pub/sub building block.


Why Async Agent Communication?

Asynchronous pub/sub messaging makes your multi-agent systems production-ready:

  • Resiliency — If an agent crashes or restarts, messages wait in the broker until it recovers. No lost work.
  • Independent scaling — Scale triage agents and response agents separately based on load.
  • Loose coupling — Agents only know about topics, not each other. Add, remove, or replace agents without breaking the system.
  • Guaranteed delivery — The message broker handles retries, dead-letter queues, and at-least-once delivery.

Bring Your Own Broker

This quickstart uses Redis, but Dapr supports 40+ pub/sub brokers including Kafka, RabbitMQ, AWS SNS/SQS, Azure Service Bus, and GCP Pub/Sub. Switch brokers by changing a YAML file — your agent code stays the same.


Prerequisites

Before you begin, ensure you have:


Setup

1

Create Project Structure

Create a new project directory with the following structure:

mkdir openai-pubsub-agents && cd openai-pubsub-agents
mkdir -p components

Your project will have this structure:

openai-pubsub-agents/
├── components/
│ └── pubsub.yaml
├── triage_agent.py
├── response_agent.py
└── requirements.txt
2

Configure Dapr Pub/Sub Component

Create components/pubsub.yaml:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""

This uses Redis as the message broker. Dapr initializes Redis automatically with dapr init.

3

Install Dependencies

Create requirements.txt:

openai-agents>=0.0.3
dapr>=1.14.0
flask>=3.0.0
cloudevents>=1.11.0

Set up a virtual environment and install:

python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
4

Set Your OpenAI API Key

export OPENAI_API_KEY="your-api-key-here"

How It Works

┌─────────────────┐      Dapr Pub/Sub       ┌─────────────────┐
│ Triage Agent │ ───────────────────▶ │ Response Agent │
│ (OpenAI) │ "classified-tickets" │ (OpenAI) │
└─────────────────┘ └─────────────────┘
│ │
▼ ▼
Classifies ticket Subscribes to topic
Publishes classification Receives ticket
Drafts response
  1. Triage Agent receives a support ticket via HTTP, uses OpenAI to classify it, and publishes the result
  2. Dapr handles message routing through the configured pub/sub component (Redis)
  3. Response Agent subscribes to the topic, receives classified tickets, and uses OpenAI to draft responses

Build the Agents

1

Create the Triage Agent

Create triage_agent.py:

import json
import asyncio
from flask import Flask, request, jsonify
from dapr.clients import DaprClient
from agents import Agent, Runner

app = Flask(__name__)

# Create the OpenAI Triage Agent
triage_agent = Agent(
name="TicketTriage",
instructions="""You are a support ticket triage specialist.
Analyze incoming tickets and classify them into categories.

Categories:
- billing: Payment, invoices, refunds, pricing
- technical: Bugs, errors, integration issues
- account: Login, password, profile settings
- general: Everything else

Respond with JSON: {"category": "...", "priority": "high|medium|low", "summary": "..."}""",
model="gpt-4o-mini"
)

def publish_ticket(ticket_id: str, original: str, classification: dict):
"""Publish classified ticket to Dapr pub/sub."""
with DaprClient() as client:
client.publish_event(
pubsub_name="pubsub",
topic_name="classified-tickets",
data=json.dumps({
"ticket_id": ticket_id,
"original_message": original,
"classification": classification
}),
data_content_type="application/json"
)
print(f"Published ticket {ticket_id} - Category: {classification.get('category')}")

async def classify_ticket(ticket_text: str) -> dict:
"""Run the OpenAI agent to classify the ticket."""
result = await Runner.run(triage_agent, ticket_text)
try:
return json.loads(result.final_output)
except json.JSONDecodeError:
return {"category": "general", "priority": "medium", "summary": result.final_output}

@app.route("/triage", methods=["POST"])
def triage():
"""Endpoint to submit a support ticket for triage."""
data = request.json
ticket_id = data.get("ticket_id", "T-001")
message = data.get("message", "I need help")

# Classify the ticket using OpenAI agent
classification = asyncio.run(classify_ticket(message))

# Publish to Dapr pub/sub
publish_ticket(ticket_id, message, classification)

return jsonify({
"status": "published",
"ticket_id": ticket_id,
"classification": classification,
"message": "Ticket classified and sent to response agent"
})

@app.route("/health", methods=["GET"])
def health():
return jsonify({"status": "healthy"})

if __name__ == "__main__":
app.run(port=5001)
2

Create the Response Agent

Create response_agent.py:

import json
import asyncio
from flask import Flask, request, jsonify
from agents import Agent, Runner

app = Flask(__name__)

# Store drafted responses
responses = []

# Create the OpenAI Response Agent
response_agent = Agent(
name="ResponseDrafter",
instructions="""You are a customer support response specialist.
Given a classified support ticket, draft a helpful, professional response.

Guidelines:
- Be empathetic and professional
- Address the specific issue mentioned
- Provide clear next steps
- Keep responses concise but complete

Format your response as a ready-to-send email reply.""",
model="gpt-4o-mini"
)

async def draft_response(ticket: dict) -> str:
"""Run the OpenAI agent to draft a response."""
prompt = f"""Draft a response for this support ticket:

Category: {ticket['classification']['category']}
Priority: {ticket['classification']['priority']}
Summary: {ticket['classification']['summary']}

Original message:
{ticket['original_message']}"""

result = await Runner.run(response_agent, prompt)
return result.final_output

@app.route("/dapr/subscribe", methods=["GET"])
def subscribe():
"""Tell Dapr which topics this service subscribes to."""
subscriptions = [
{
"pubsubname": "pubsub",
"topic": "classified-tickets",
"route": "/receive-ticket"
}
]
return jsonify(subscriptions)

@app.route("/receive-ticket", methods=["POST"])
def receive_ticket():
"""Handle incoming classified tickets from pub/sub."""
event = request.json
data = event.get("data", {})

if isinstance(data, str):
data = json.loads(data)

ticket_id = data.get("ticket_id", "Unknown")

print(f"\n{'='*50}")
print(f"Received ticket: {ticket_id}")
print(f"Category: {data['classification']['category']}")
print(f"Priority: {data['classification']['priority']}")
print(f"{'='*50}\n")

# Draft response using OpenAI agent
draft = asyncio.run(draft_response(data))

response_record = {
"ticket_id": ticket_id,
"category": data["classification"]["category"],
"priority": data["classification"]["priority"],
"original_message": data["original_message"],
"drafted_response": draft
}
responses.append(response_record)

print(f"\n{'='*50}")
print(f"Response drafted for ticket: {ticket_id}")
print(f"{'='*50}\n")

return jsonify({"status": "SUCCESS"})

@app.route("/responses", methods=["GET"])
def get_responses():
"""Retrieve all drafted responses."""
return jsonify(responses)

@app.route("/health", methods=["GET"])
def health():
return jsonify({"status": "healthy"})

if __name__ == "__main__":
app.run(port=5002)

Run the Agents

1

Start the Response Agent

Open a terminal and run:

dapr run --app-id response-agent \
--app-port 5002 \
--dapr-http-port 3501 \
--resources-path ./components \
-- python response_agent.py

This agent subscribes to the classified-tickets topic and waits for messages.

2

Start the Triage Agent

Open a second terminal, activate the virtual environment, and run:

dapr run --app-id triage-agent \
--app-port 5001 \
--dapr-http-port 3500 \
--resources-path ./components \
-- python triage_agent.py
3

Submit a Support Ticket

Open a third terminal and submit a ticket:

curl -X POST http://localhost:5001/triage \
-H "Content-Type: application/json" \
-d '{
"ticket_id": "T-12345",
"message": "I was charged twice for my subscription last month. Order #98765. Please refund the duplicate charge."
}'

Watch the terminal outputs:

  1. Triage Agent: Classifies the ticket (should be "billing", "high" priority) and publishes
  2. Response Agent: Receives the classification via pub/sub and drafts a response
4

View Drafted Responses

Retrieve all drafted responses from the Response Agent:

curl http://localhost:5002/responses

Key Concepts

ConceptDescription
Pub/Sub DecouplingAgents don't need to know each other's addresses. They communicate through topics.
Async CommunicationTriage agent publishes and returns immediately. Response agent processes asynchronously.
ScalabilityMultiple response agents can subscribe to the same topic for parallel processing.
ReliabilityDapr handles message delivery, retries, and dead-letter queues.

Next Steps

  • Add specialized response agents for each category (billing, technical, account)
  • Use Dapr state management to track ticket history
  • Deploy to Kubernetes with Catalyst
  • Explore Dapr workflows for orchestrated agent coordination

Clean Up

Stop both Dapr applications:

dapr stop --app-id triage-agent
dapr stop --app-id response-agent