Phase 3: Production Patterns · Step 9 of 14 · Advanced · 2 weeks · Checkpoint

Orchestration & Workflows

Build reliable, observable agent pipelines with production-grade error handling.

DAG design · Error handling · Retries · Conditional logic · State management · Event triggers

Getting Started

Orchestration is about turning a fragile chain of agent steps into a reliable production pipeline. When your agent calls three APIs, parses results, and makes decisions, any step can fail. Orchestration gives you the patterns to handle those failures gracefully and keep your system observable.

The core idea is treating your agent workflow as a Directed Acyclic Graph (DAG) where each node is an idempotent operation with defined inputs, outputs, and failure modes. This structure lets you retry individual steps, resume from checkpoints, and reason about system behavior.
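The idea can be sketched with the standard library's `graphlib`: declare each step's dependencies, then execute in topological order so a step runs only after its inputs exist. The three step functions here are hypothetical stand-ins, not a prescribed API.

```python
# Minimal DAG sketch: steps declare dependencies, execution follows a
# topological order. Step bodies are illustrative placeholders.
from graphlib import TopologicalSorter

def ingest(state: dict) -> dict:
    state["doc"] = "raw document text"
    return state

def enrich(state: dict) -> dict:
    state["entities"] = ["acme", "widget"]
    return state

def summarize(state: dict) -> dict:
    state["summary"] = f"Found {len(state['entities'])} entities"
    return state

STEPS = {"ingest": ingest, "enrich": enrich, "summarize": summarize}
# node -> set of upstream dependencies
DEPENDENCIES = {"ingest": set(), "enrich": {"ingest"}, "summarize": {"enrich"}}

def run_pipeline() -> dict:
    state: dict = {}
    for name in TopologicalSorter(DEPENDENCIES).static_order():
        state = STEPS[name](state)  # dependencies are guaranteed to have run
    return state
```

Because each step takes and returns plain state, any node can be wrapped with retry or checkpoint logic without changing the graph.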

Key Concepts

DAG-based workflows decompose your agent into discrete steps with explicit dependencies. Each step should be idempotent — running it twice with the same input produces the same result. This property is what makes retries safe:

import httpx
from tenacity import retry, wait_exponential, stop_after_attempt

@retry(wait=wait_exponential(multiplier=1, max=60), stop=stop_after_attempt(5))
def enrich_entity(entity: dict) -> dict:
    """Call an external API, retrying with exponential backoff on failure."""
    response = httpx.post("https://api.example.com/enrich", json=entity, timeout=10.0)
    response.raise_for_status()
    return response.json()

Circuit breakers prevent your workflow from hammering a failing service. After N consecutive failures, the circuit "opens" and subsequent calls fail immediately without making the request:

import time

class CircuitOpenError(Exception):
    """Raised when the circuit is open and the call is short-circuited."""

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 60.0):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure_time = 0.0

    def call(self, func, *args, **kwargs):
        if self.failures >= self.threshold:
            elapsed = time.time() - self.last_failure_time
            if elapsed < self.reset_timeout:
                raise CircuitOpenError("Circuit is open")
            self.failures = 0  # half-open: let a trial call through

        try:
            result = func(*args, **kwargs)
            self.failures = 0  # success closes the circuit
            return result
        except Exception:
            self.failures += 1
            self.last_failure_time = time.time()
            raise

Event-driven triggers let workflows start in response to external events — a new file in S3, a webhook, a cron schedule, or a message on a queue. This decouples your agent from the triggering system.
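A minimal sketch of that decoupling: a registry maps event types to workflow entry points, so the triggering system only needs to emit an event dict. The event name and handler below are hypothetical; in production the events would arrive via S3 notifications, webhooks, or a queue consumer.

```python
# Sketch of an event-driven trigger registry. Event names are illustrative.
from typing import Any, Callable

HANDLERS: dict[str, Callable[[dict], Any]] = {}

def on_event(event_type: str):
    """Register a workflow entry point for an event type."""
    def decorator(func: Callable[[dict], Any]) -> Callable[[dict], Any]:
        HANDLERS[event_type] = func
        return func
    return decorator

@on_event("document.uploaded")
def start_document_pipeline(payload: dict) -> str:
    # In a real system this would kick off the DAG run for the document.
    return f"pipeline started for {payload['key']}"

def dispatch(event: dict):
    handler = HANDLERS.get(event["type"])
    if handler is None:
        return None  # unknown events are ignored, not errors
    return handler(event["payload"])
```

The agent code never learns where events come from; swapping a webhook for a queue only changes what calls `dispatch`.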

State management across long-running workflows means persisting intermediate results. Save checkpoints after each successful step so you can resume from the point of failure rather than restarting the entire pipeline.

Hands-On Practice

Build a three-stage document processing pipeline: ingest, enrich, summarize. Add retry logic to the enrich stage, a circuit breaker for the external API, and checkpoint state to a JSON file between stages. Test it by simulating API failures and verifying that your pipeline resumes correctly from the last checkpoint.

Exercises

Build a Production Workflow with Error Recovery

Create a DAG-based workflow that ingests a document, extracts key entities, enriches them via an external API, and generates a summary. Implement retry logic with exponential backoff and a circuit breaker that stops calling a failing API after 3 consecutive errors.

Knowledge Check

What is the primary advantage of a circuit breaker pattern in agent workflows?

Milestone Project

Production workflow with error handling, logging, and automatic recovery