
Agent Orchestration & Workflows

Orchestrate multiple agents to build sophisticated automation workflows.

What is Agent Orchestration?

Orchestration is the coordination of multiple agents working together on a complex task:

User Input

[Orchestrator Agent]
    ├─→ [Document Analysis Agent] → Extract key info
    ├─→ [Meeting Summary Agent]   → Summarize meetings
    └─→ [Task Creation Agent]     → Create follow-ups

Coordinated Results

Workflow Architecture

Sequential Workflow

Execute agents one after another:

Step 1: Analyze Document

Step 2: Extract Action Items

Step 3: Create Tasks

Complete

Example:

python
# Step 1: Analyze document
analysis = client.documents.analyze(
    document_url='https://example.com/contract.pdf',
    analysis_type='contract_review'
)

# Step 2: Extract obligations
obligations = client.documents.chat(
    document_id=analysis.id,
    question='What are the main obligations?'
)

# Step 3: Create tasks from obligations
for obligation in obligations['items']:
    client.tasks.create(
        title=f'Fulfill: {obligation}',
        description=obligation,
        priority='high'
    )

Parallel Workflow

Execute multiple agents simultaneously:

[Orchestrator]
    ├─→ [Analysis Agent] → Process document
    ├─→ [Summary Agent]  → Generate summary
    └─→ [Agent 3]        → (all agents run simultaneously)

Example:

python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def parallel_analysis(document_url):
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor()

    # Run all analyses in parallel
    results = await asyncio.gather(
        loop.run_in_executor(executor, lambda: client.documents.analyze(
            document_url=document_url,
            analysis_type='contract_review'
        )),
        loop.run_in_executor(executor, lambda: client.documents.analyze(
            document_url=document_url,
            analysis_type='financial_analysis'
        )),
        loop.run_in_executor(executor, lambda: client.documents.analyze(
            document_url=document_url,
            analysis_type='legal_review'
        ))
    )

    return {
        'contract_review': results[0],
        'financial_analysis': results[1],
        'legal_review': results[2]
    }

# Usage (from synchronous code; inside an async function, `await` it directly)
analysis = asyncio.run(parallel_analysis('https://example.com/contract.pdf'))

Conditional Workflow

Branch based on conditions:

Analyze Document

    [Is it a contract?]
       /      \
      YES      NO
      ↓        ↓
 Extract    Extract
 Legal     Financial
 Terms     Info

Example:

python
# Analyze document
analysis = client.documents.analyze(
    document_url=document_url,
    analysis_type='document_review'
)

# Branch based on document type
if analysis.document_type == 'contract':
    # Handle as contract
    obligations = client.documents.chat(
        document_id=analysis.id,
        question='What are the legal obligations?'
    )
elif analysis.document_type == 'financial':
    # Handle as financial document
    summary = client.documents.chat(
        document_id=analysis.id,
        question='What are the key financial metrics?'
    )

Building Orchestrators

Basic Orchestrator

python
class WorkflowOrchestrator:
    def __init__(self, client):
        self.client = client

    def process_contract(self, document_url):
        """Process contract through multiple agents"""

        # Agent 1: Analyze
        print("Analyzing contract...")
        analysis = self.client.documents.analyze(
            document_url=document_url,
            analysis_type='contract_review'
        )

        # Agent 2: Extract obligations
        print("Extracting obligations...")
        obligations = self.client.documents.chat(
            document_id=analysis.id,
            question='What are the key obligations?'
        )

        # Agent 3: Create tasks
        print("Creating tasks...")
        tasks = []
        for obligation in obligations['items']:
            task = self.client.tasks.create(
                title=obligation,
                priority='high',
                related_document_id=analysis.id
            )
            tasks.append(task)

        return {
            'analysis': analysis,
            'obligations': obligations,
            'tasks': tasks
        }

# Usage
orchestrator = WorkflowOrchestrator(client)
result = orchestrator.process_contract('https://example.com/contract.pdf')

Advanced Orchestrator with State Management

python
class AdvancedOrchestrator:
    def __init__(self, client):
        self.client = client
        self.state = {}

    def process_meeting_to_jira(self, audio_url):
        """Process meeting and create Jira tickets"""

        try:
            # Step 1: Transcribe meeting
            print("Step 1: Transcribing meeting...")
            transcript = self._transcribe(audio_url)
            self.state['transcript_id'] = transcript.id

            # Step 2: Extract action items
            print("Step 2: Extracting action items...")
            actions = self._extract_actions(transcript.id)
            self.state['actions'] = actions

            # Step 3: Validate actions
            print("Step 3: Validating action items...")
            validated = self._validate(actions)
            self.state['validated_actions'] = validated

            # Step 4: Create Jira issues
            print("Step 4: Creating Jira tickets...")
            issues = self._create_jira_issues(validated)
            self.state['jira_issues'] = issues

            # Step 5: Send summary
            print("Step 5: Sending summary...")
            self._send_summary(issues)

            return {
                'status': 'success',
                'transcript_id': transcript.id,
                'action_count': len(validated),
                'jira_issues': issues
            }

        except Exception as e:
            print(f"Error: {e}")
            self._on_error(e)
            return {
                'status': 'failed',
                'error': str(e),
                'state': self.state
            }

    def _transcribe(self, audio_url):
        return self.client.transcribe(
            audio_url=audio_url,
            language='en'
        )

    def _extract_actions(self, transcript_id):
        return self.client.meetings.action_items(
            transcript_id=transcript_id,
            assign_to_speakers=True
        )

    def _validate(self, actions):
        # Filter out low-confidence actions
        return [a for a in actions if a['confidence'] > 0.8]

    def _create_jira_issues(self, actions):
        # Create Jira tickets (requires Jira integration)
        issues = []
        for action in actions:
            issue = self.client.integrations.jira.create_issue(
                summary=action['description'],
                assignee=action.get('assigned_to'),
                due_date=action.get('due_date')
            )
            issues.append(issue)
        return issues

    def _send_summary(self, issues):
        summary = f"Created {len(issues)} Jira tickets from meeting"
        self.client.integrations.slack.send_message(
            channel='#team-updates',
            message=summary
        )

    def _on_error(self, error):
        # Error handling logic; assumes a module-level logger,
        # e.g. logger = logging.getLogger(__name__)
        logger.error(f"Orchestration error: {error}")
        self.client.integrations.slack.send_message(
            channel='#errors',
            message=f"Workflow failed: {error}"
        )

Real-World Examples

Example 1: Contract Lifecycle

python
class ContractLifecycleOrchestrator:
    """Orchestrate full contract review and processing"""

    def process(self, contract_url):
        # 1. Analyze contract
        analysis = self.analyze(contract_url)

        # 2. Extract key information
        extracted = self.extract_fields(analysis)

        # 3. Legal review
        legal_review = self.legal_review(analysis)

        # 4. Financial analysis
        financial = self.financial_analysis(analysis)

        # 5. Risk assessment
        risks = self.assess_risks(analysis)

        # 6. Create action items
        actions = self.create_actions(extracted, risks)

        # 7. Notify stakeholders
        self.notify_stakeholders(actions, legal_review)

        return {
            'extracted': extracted,
            'legal_review': legal_review,
            'financial': financial,
            'risks': risks,
            'actions': actions
        }

Example 2: Meeting-to-Project Workflow

python
class MeetingProjectOrchestrator:
    """Convert meetings into project tasks"""

    def process(self, meeting_url, project_id):
        # 1. Transcribe meeting
        transcript = self.transcribe(meeting_url)

        # 2. Generate summary
        summary = self.summarize(transcript)

        # 3. Extract action items
        actions = self.extract_actions(transcript)

        # 4. Create project tasks
        tasks = self.create_tasks(actions, project_id)

        # 5. Assign to team members
        self.assign_tasks(tasks)

        # 6. Send notifications
        self.notify_team(tasks)

        return tasks

Example 3: Document Processing Pipeline

python
class DocumentProcessingPipeline:
    """Process documents through multiple agents"""

    def process(self, documents):
        results = []

        for doc in documents:
            # 1. Classify document
            doc_type = self.classify(doc)

            # 2. Extract content based on type
            if doc_type == 'contract':
                extracted = self.extract_contract(doc)
            elif doc_type == 'report':
                extracted = self.extract_report(doc)
            else:
                extracted = self.extract_generic(doc)

            # 3. Summarize
            summary = self.summarize(doc)

            # 4. Store results
            self.store(doc, extracted, summary)

            results.append({
                'document': doc,
                'type': doc_type,
                'extracted': extracted,
                'summary': summary
            })

        return results

Error Handling & Retry Logic

python
import time  # needed for the sleep-based backoff below

# RateLimitError and ServerError are assumed to be exception types exposed by the client SDK
class RobustOrchestrator:
    def __init__(self, client, max_retries=3):
        self.client = client
        self.max_retries = max_retries

    def execute_with_retry(self, task, *args, **kwargs):
        """Execute task with retry logic"""

        for attempt in range(self.max_retries):
            try:
                return task(*args, **kwargs)

            except RateLimitError as e:
                if attempt == self.max_retries - 1:
                    raise
                wait_time = e.retry_after
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)

            except ServerError as e:
                if attempt == self.max_retries - 1:
                    raise
                wait_time = (2 ** attempt)  # Exponential backoff
                print(f"Server error. Retrying in {wait_time}s...")
                time.sleep(wait_time)

            except Exception as e:
                print(f"Error: {e}")
                raise

Monitoring & Logging

python
import time

class MonitoredOrchestrator:
    def __init__(self, client, logger):
        self.client = client
        self.logger = logger

    def process(self, input_data):
        start_time = time.time()

        try:
            self.logger.info(f"Starting workflow: {input_data}")

            # Process
            result = self.do_work(input_data)

            elapsed = time.time() - start_time
            self.logger.info(f"Workflow completed in {elapsed:.2f}s", extra={
                'status': 'success',
                'elapsed_ms': elapsed * 1000
            })

            return result

        except Exception as e:
            elapsed = time.time() - start_time
            self.logger.error(f"Workflow failed: {e}", extra={
                'status': 'failed',
                'elapsed_ms': elapsed * 1000,
                'error': str(e)
            })
            raise

Best Practices

✅ Do

  1. Design for failure

    • Add retry logic
    • Handle timeouts
    • Rollback on errors
  2. Monitor progress

    • Log each step
    • Track metrics
    • Alert on failures
  3. Optimize parallelization

    • Run independent tasks in parallel
    • Limit concurrent requests
    • Respect rate limits
  4. Manage state

    • Store intermediate results
    • Enable resume on failure
    • Track workflow progress
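The "optimize parallelization" guidance above can be sketched with an `asyncio.Semaphore` that caps how many agent calls run at once. This is a minimal, self-contained sketch: `run_agent` is a placeholder standing in for a real client call, not part of any SDK.

```python
import asyncio

MAX_CONCURRENT = 3  # cap on simultaneous agent calls, e.g. to respect rate limits

async def run_agent(semaphore, name):
    """Placeholder agent call; the semaphore limits concurrency."""
    async with semaphore:
        await asyncio.sleep(0.01)  # simulate API latency
        return f"{name}: done"

async def run_all(agent_names):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    # gather() preserves input order, so results line up with agent_names
    tasks = [run_agent(semaphore, name) for name in agent_names]
    return await asyncio.gather(*tasks)

results = asyncio.run(run_all([f"agent-{i}" for i in range(10)]))
```

Even though all ten coroutines are scheduled at once, at most three are ever inside the semaphore simultaneously, which keeps parallel workflows within API rate limits.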

❌ Don't

  1. Don't cascade failures

    • Handle errors locally
    • Don't let one failure break everything
    • Implement fallbacks
  2. Don't ignore rate limits

    • Add delays between parallel tasks
    • Respect API rate limits
    • Implement backoff
  3. Don't hardcode configuration

    • Use environment variables
    • Make workflows configurable
    • Support different scenarios
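The "don't hardcode configuration" point can be sketched with the stdlib `os.environ` and safe defaults. The `WORKFLOW_*` variable names here are illustrative, not part of any real SDK:

```python
import os

def load_workflow_config():
    """Read workflow settings from the environment, falling back to defaults."""
    return {
        'max_retries': int(os.environ.get('WORKFLOW_MAX_RETRIES', '3')),
        'max_concurrent': int(os.environ.get('WORKFLOW_MAX_CONCURRENT', '5')),
        'notify_channel': os.environ.get('WORKFLOW_NOTIFY_CHANNEL', '#team-updates'),
    }

config = load_workflow_config()
```

An orchestrator can then take `config` in its constructor instead of baked-in literals, so the same workflow runs in development, staging, and production with no code changes.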
