Agent Orchestration & Workflows
Orchestrate multiple agents to build sophisticated automation workflows.
What is Agent Orchestration?
Orchestration is coordinating multiple agents to work together on complex tasks:
User Input
↓
[Orchestrator Agent]
├─→ [Document Analysis Agent] → Extract key info
├─→ [Meeting Summary Agent] → Summarize meetings
└─→ [Task Creation Agent] → Create follow-ups
↓
Coordinated Results
Workflow Architecture
Sequential Workflow
Execute agents one after another:
Step 1: Analyze Document
↓
Step 2: Extract Action Items
↓
Step 3: Create Tasks
↓
Complete
Example:
python
# Sequential workflow: each step consumes the output of the step before it.

# Step 1: Analyze the document.
analysis = client.documents.analyze(
    document_url='https://example.com/contract.pdf',
    analysis_type='contract_review',
)

# Step 2: Ask a follow-up question about the analyzed document.
obligations = client.documents.chat(
    document_id=analysis.id,
    question='What are the main obligations?',
)

# Step 3: Create one high-priority task per obligation.
# The response is read through its 'items' entry, matching how
# WorkflowOrchestrator.process_contract reads the same kind of response.
# NOTE(review): confirm the chat response shape against the API reference.
for obligation in obligations['items']:
    client.tasks.create(
        title=f'Fulfill: {obligation}',
        description=obligation,
        priority='high',
    )

# Parallel Workflow
Execute multiple agents simultaneously:
Analysis Agent (Process document)
↓
[Orchestrator]
↓
Summary Agent (Generate summary)
↓ ↓
Agent 1 Agent 2 Agent 3
Example:
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def parallel_analysis(document_url):
    """Run three analyses of one document concurrently.

    Each blocking ``client.documents.analyze`` call is handed to a worker
    thread so the three requests overlap instead of running back to back.

    Args:
        document_url: URL of the document; passed unchanged to every call.

    Returns:
        dict mapping each analysis type ('contract_review',
        'financial_analysis', 'legal_review') to the result of its call.

    Raises:
        Whatever ``client.documents.analyze`` raises; the first failure
        propagates out of ``asyncio.gather``.
    """
    analysis_types = ('contract_review', 'financial_analysis', 'legal_review')

    # Inside a coroutine the loop is always running, so ask for it directly.
    loop = asyncio.get_running_loop()

    # The context manager shuts the pool down on exit, so worker threads are
    # not leaked on every call.
    with ThreadPoolExecutor(max_workers=len(analysis_types)) as executor:
        results = await asyncio.gather(*(
            loop.run_in_executor(
                executor,
                # Bind the type as a default so each lambda keeps its own value.
                lambda analysis_type=analysis_type: client.documents.analyze(
                    document_url=document_url,
                    analysis_type=analysis_type,
                ),
            )
            for analysis_type in analysis_types
        ))

    # gather() preserves submission order, so results line up with the types.
    return dict(zip(analysis_types, results))
# Usage: a plain script has no running event loop, so start one with
# asyncio.run(). From inside another coroutine, use
# `await parallel_analysis(...)` instead.
analysis = asyncio.run(parallel_analysis('https://example.com/contract.pdf'))

# Conditional Workflow
Branch based on conditions:
Analyze Document
↓
[Is it a contract?]
/ \
YES NO
↓ ↓
Extract Extract
Legal Financial
Terms Info
Example:
python
# Run a general review first; the follow-up depends on its document_type.
analysis = client.documents.analyze(
    analysis_type='document_review',
    document_url=document_url,
)

# Ask the question that fits the kind of document that was detected.
if analysis.document_type == 'contract':
    # Contract: pull out the legal obligations.
    obligations = client.documents.chat(
        question='What are the legal obligations?',
        document_id=analysis.id,
    )
elif analysis.document_type == 'financial':
    # Financial document: pull out the key metrics.
    summary = client.documents.chat(
        question='What are the key financial metrics?',
        document_id=analysis.id,
    )

# Building Orchestrators
Basic Orchestrator
python
class WorkflowOrchestrator:
    """Chain three agents: analyze a contract, extract obligations, create tasks."""

    def __init__(self, client):
        self.client = client

    def process_contract(self, document_url):
        """Run the contract at ``document_url`` through all three agents.

        Returns:
            dict with the 'analysis' result, the 'obligations' chat response
            and the list of created 'tasks'.
        """
        api = self.client

        # First agent: contract review.
        print("Analyzing contract...")
        analysis = api.documents.analyze(
            analysis_type='contract_review',
            document_url=document_url,
        )

        # Second agent: question the analyzed document.
        print("Extracting obligations...")
        obligations = api.documents.chat(
            question='What are the key obligations?',
            document_id=analysis.id,
        )

        # Third agent: one high-priority task per obligation, linked back to
        # the analyzed document.
        print("Creating tasks...")
        tasks = [
            api.tasks.create(
                title=item,
                priority='high',
                related_document_id=analysis.id,
            )
            for item in obligations['items']
        ]

        return {'analysis': analysis, 'obligations': obligations, 'tasks': tasks}
# Usage: wrap an API client in the orchestrator, then run one contract.
orchestrator = WorkflowOrchestrator(client=client)
result = orchestrator.process_contract(
    document_url='https://example.com/contract.pdf'
)

# Advanced Orchestrator with State Management
python
class AdvancedOrchestrator:
    """Turn a meeting recording into Jira tickets, tracking progress in ``state``.

    ``state`` collects each step's output ('transcript_id', 'actions',
    'validated_actions', 'jira_issues') so that a failed run can report how
    far it got.
    """

    # Action items must score strictly above this confidence to be kept.
    MIN_CONFIDENCE = 0.8

    def __init__(self, client, logger=None):
        """Store the API client and the logger used for error reporting.

        Args:
            client: API client exposing ``transcribe``, ``meetings`` and
                ``integrations``.
            logger: Optional logger; defaults to this module's logger.
        """
        self.client = client
        if logger is None:
            # Local import keeps the example self-contained.
            import logging
            logger = logging.getLogger(__name__)
        self.logger = logger
        self.state = {}

    def process_meeting_to_jira(self, audio_url):
        """Process a meeting recording and create Jira tickets from it.

        Args:
            audio_url: URL of the meeting audio to transcribe.

        Returns:
            On success: dict with 'status' ('success'), 'transcript_id',
            'action_count' and 'jira_issues'.
            On any exception: dict with 'status' ('failed'), 'error' (the
            exception text) and 'state' (results gathered before the failure).
        """
        # Start clean so a failure report never contains results left over
        # from an earlier run on the same instance.
        self.state = {}
        try:
            # Step 1: Transcribe meeting
            print("Step 1: Transcribing meeting...")
            transcript = self._transcribe(audio_url)
            self.state['transcript_id'] = transcript.id

            # Step 2: Extract action items
            print("Step 2: Extracting action items...")
            actions = self._extract_actions(transcript.id)
            self.state['actions'] = actions

            # Step 3: Validate actions
            print("Step 3: Validating action items...")
            validated = self._validate(actions)
            self.state['validated_actions'] = validated

            # Step 4: Create Jira issues
            print("Step 4: Creating Jira tickets...")
            issues = self._create_jira_issues(validated)
            self.state['jira_issues'] = issues

            # Step 5: Send summary
            print("Step 5: Sending summary...")
            self._send_summary(issues)

            return {
                'status': 'success',
                'transcript_id': transcript.id,
                'action_count': len(validated),
                'jira_issues': issues
            }
        except Exception as e:
            print(f"Error: {e}")
            self._on_error(e)
            return {
                'status': 'failed',
                'error': str(e),
                'state': self.state
            }

    def _transcribe(self, audio_url):
        """Request an English transcript of the audio at ``audio_url``."""
        return self.client.transcribe(
            audio_url=audio_url,
            language='en'
        )

    def _extract_actions(self, transcript_id):
        """Fetch the transcript's action items, assigned to speakers."""
        return self.client.meetings.action_items(
            transcript_id=transcript_id,
            assign_to_speakers=True
        )

    def _validate(self, actions):
        """Keep only actions whose 'confidence' exceeds ``MIN_CONFIDENCE``."""
        return [a for a in actions if a['confidence'] > self.MIN_CONFIDENCE]

    def _create_jira_issues(self, actions):
        """Create one Jira issue per action (requires the Jira integration).

        'assigned_to' and 'due_date' are optional on each action and are
        passed as None when absent.
        """
        return [
            self.client.integrations.jira.create_issue(
                summary=action['description'],
                assignee=action.get('assigned_to'),
                due_date=action.get('due_date')
            )
            for action in actions
        ]

    def _send_summary(self, issues):
        """Post the number of created tickets to '#team-updates' on Slack."""
        summary = f"Created {len(issues)} Jira tickets from meeting"
        self.client.integrations.slack.send_message(
            channel='#team-updates',
            message=summary
        )

    def _on_error(self, error):
        """Log the failure and post it to the '#errors' Slack channel.

        The Slack alert is best effort: if sending it fails, that failure is
        logged and swallowed so the caller can still return its failure report.
        """
        self.logger.error("Orchestration error: %s", error)
        try:
            self.client.integrations.slack.send_message(
                channel='#errors',
                message=f"Workflow failed: {error}"
            )
        except Exception as notify_error:
            self.logger.error("Could not send failure alert: %s", notify_error)

# Real-World Examples
Example 1: Contract Lifecycle
python
class ContractLifecycleOrchestrator:
    """Drive a contract through review, analysis, risk scoring and follow-up."""

    def process(self, contract_url):
        """Run every lifecycle stage for the contract at ``contract_url``.

        Returns:
            dict with the 'extracted' fields, 'legal_review', 'financial'
            analysis, 'risks' and the created 'actions'.
        """
        report = {}

        # Stage 1: the base analysis every later stage builds on.
        analysis = self.analyze(contract_url)

        # Stages 2-5: independent views of the same analysis.
        report['extracted'] = self.extract_fields(analysis)
        report['legal_review'] = self.legal_review(analysis)
        report['financial'] = self.financial_analysis(analysis)
        report['risks'] = self.assess_risks(analysis)

        # Stage 6: follow-up actions derived from the fields and the risks.
        report['actions'] = self.create_actions(report['extracted'], report['risks'])

        # Stage 7: tell stakeholders about the actions and the legal review.
        self.notify_stakeholders(report['actions'], report['legal_review'])

        return report

# Example 2: Meeting-to-Project Workflow
python
class MeetingProjectOrchestrator:
    """Turn a recorded meeting into assigned, announced project tasks."""

    def process(self, meeting_url, project_id):
        """Create tasks in ``project_id`` from the meeting at ``meeting_url``.

        Returns:
            The tasks produced by ``create_tasks``.
        """
        # Everything downstream works from the transcript.
        meeting_transcript = self.transcribe(meeting_url)

        # The summary step runs for every meeting; its result is not passed
        # to the later steps or returned.
        self.summarize(meeting_transcript)

        # Action items become tasks inside the target project.
        action_items = self.extract_actions(meeting_transcript)
        project_tasks = self.create_tasks(action_items, project_id)

        # Assign owners first, then notify the team about the same tasks.
        self.assign_tasks(project_tasks)
        self.notify_team(project_tasks)

        return project_tasks

# Example 3: Document Processing Pipeline
python
class DocumentProcessingPipeline:
    """Classify, extract, summarize and store a batch of documents."""

    def process(self, documents):
        """Run every document through the pipeline, in input order.

        Returns:
            list with one dict per document holding the 'document', its
            'type', the 'extracted' content and the 'summary'.
        """
        return [self._process_one(document) for document in documents]

    def _process_one(self, doc):
        """Run one document through all four stages and return its record."""
        # Stage 1: the detected type selects the extractor.
        doc_type = self.classify(doc)

        # Stage 2: contracts and reports have dedicated extractors; any other
        # type falls back to the generic one.
        extracted = (
            self.extract_contract(doc) if doc_type == 'contract'
            else self.extract_report(doc) if doc_type == 'report'
            else self.extract_generic(doc)
        )

        # Stage 3: summarize the document itself, not the extracted content.
        summary = self.summarize(doc)

        # Stage 4: persist before reporting the record back.
        self.store(doc, extracted, summary)

        return {
            'document': doc,
            'type': doc_type,
            'extracted': extracted,
            'summary': summary,
        }

# Error Handling & Retry Logic
python
class RobustOrchestrator:
    """Run tasks with retries on rate-limit and server errors."""

    def __init__(self, client, max_retries=3):
        """Store the API client and the maximum number of attempts per task."""
        self.client = client
        self.max_retries = max_retries

    def execute_with_retry(self, task, *args, **kwargs):
        """Call ``task(*args, **kwargs)``, retrying on transient errors.

        A ``RateLimitError`` waits for the error's ``retry_after`` seconds,
        falling back to the exponential delay when that value is missing or
        None. A ``ServerError`` waits 1, 2, 4, ... seconds. Any other
        exception is printed and re-raised immediately.

        The task is always attempted at least once, even when ``max_retries``
        is zero or negative; previously such a value skipped the task and
        returned None.

        Returns:
            Whatever ``task`` returns.

        Raises:
            The last ``RateLimitError`` / ``ServerError`` once all attempts
            are used up; any other exception on its first occurrence.
        """
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            is_last_attempt = attempt == attempts - 1
            try:
                return task(*args, **kwargs)
            except RateLimitError as e:
                if is_last_attempt:
                    raise
                # Prefer the server's hint; without one, back off exponentially
                # instead of passing None to time.sleep().
                retry_after = getattr(e, 'retry_after', None)
                wait_time = (2 ** attempt) if retry_after is None else retry_after
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
            except ServerError as e:
                if is_last_attempt:
                    raise
                wait_time = (2 ** attempt)  # Exponential backoff
                print(f"Server error. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            except Exception as e:
                print(f"Error: {e}")
                raise

# Monitoring & Logging
python
class MonitoredOrchestrator:
    """Wrap a workflow run with timing and start/success/failure logging.

    The work itself is delegated to ``self.do_work(input_data)``, which this
    class does not define.
    """

    def __init__(self, client, logger):
        """Store the API client and the logger that receives workflow events."""
        self.client = client
        self.logger = logger

    def process(self, input_data):
        """Run ``do_work(input_data)`` and log how it went and how long it took.

        Structured fields ('status', 'elapsed_ms' and, on failure, 'error')
        are attached through ``extra=`` so they appear as attributes on the
        log record. Passed as a positional argument, the dict was treated as
        %-format arguments and the fields were lost.

        Returns:
            Whatever ``do_work`` returns.

        Raises:
            Any exception from ``do_work``, re-raised after it is logged.
        """
        # perf_counter() is meant for measuring durations; time.time()
        # follows the wall clock, which can be adjusted mid-run.
        start_time = time.perf_counter()

        # Values are passed as logging arguments rather than pre-formatted, so
        # a '%' inside them cannot be mistaken for a format directive.
        self.logger.info("Starting workflow: %s", input_data)
        try:
            # Process
            result = self.do_work(input_data)
        except Exception as e:
            elapsed = time.perf_counter() - start_time
            self.logger.error("Workflow failed: %s", e, extra={
                'status': 'failed',
                'elapsed_ms': elapsed * 1000,
                'error': str(e)
            })
            raise
        else:
            elapsed = time.perf_counter() - start_time
            self.logger.info("Workflow completed in %ss", elapsed, extra={
                'status': 'success',
                'elapsed_ms': elapsed * 1000
            })
            return result

# Best Practices
✅ Do
Design for failure
- Add retry logic
- Handle timeouts
- Rollback on errors
Monitor progress
- Log each step
- Track metrics
- Alert on failures
Optimize parallelization
- Run independent tasks in parallel
- Limit concurrent requests
- Respect rate limits
Manage state
- Store intermediate results
- Enable resume on failure
- Track workflow progress
❌ Don't
Don't cascade failures
- Handle errors locally
- Don't let one failure break everything
- Implement fallbacks
Don't ignore rate limits
- Add delays between parallel tasks
- Respect API rate limits
- Implement backoff
Don't hardcode configuration
- Use environment variables
- Make workflows configurable
- Support different scenarios