Getting Started
Orchestration is about turning a fragile chain of agent steps into a reliable production pipeline. When your agent calls three APIs, parses results, and makes decisions, any step can fail. Orchestration gives you the patterns to handle those failures gracefully and keep your system observable.
The core idea is treating your agent workflow as a Directed Acyclic Graph (DAG) where each node is an idempotent operation with defined inputs, outputs, and failure modes. This structure lets you retry individual steps, resume from checkpoints, and reason about system behavior.
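To make the DAG idea concrete, here is a minimal sketch that runs steps in dependency order using the standard library's graphlib module (Python 3.9+). The run_dag helper and the step names fetch, parse, and decide are hypothetical, chosen only for illustration; a production orchestrator would add retries, persistence, and parallelism on top of this.

```python
from graphlib import TopologicalSorter


def run_dag(steps, dependencies):
    """Run every step once, after all of its dependencies have finished.

    steps: maps a step name to a callable that receives a dict of upstream results.
    dependencies: maps a step name to the set of step names it depends on.
    """
    # Include steps without dependencies so they are scheduled too.
    graph = {name: dependencies.get(name, set()) for name in steps}
    results = {}
    # static_order() yields dependencies before dependents and raises
    # graphlib.CycleError if the graph contains a cycle.
    for name in TopologicalSorter(graph).static_order():
        upstream = {dep: results[dep] for dep in graph[name]}
        results[name] = steps[name](upstream)
    return results


# Hypothetical three-step workflow: fetch -> parse -> decide.
steps = {
    "fetch": lambda up: "raw text",
    "parse": lambda up: up["fetch"].split(),
    "decide": lambda up: len(up["parse"]) > 1,
}
dependencies = {"parse": {"fetch"}, "decide": {"parse"}}
results = run_dag(steps, dependencies)
```

Because each step only sees the outputs of its declared dependencies, you can rerun a single node without touching the rest of the graph.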
Key Concepts
DAG-based workflows decompose your agent into discrete steps with explicit dependencies. Each step should be idempotent: running it twice with the same input produces the same result and leaves the system in the same state. This property is what makes retries safe:
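import httpx
from tenacity import retry, stop_after_attempt, wait_exponential


# Retry up to 5 times, with exponential backoff capped at 60 seconds.
@retry(wait=wait_exponential(multiplier=1, max=60), stop=stop_after_attempt(5))
def enrich_entity(entity: dict) -> dict:
    """Call external API with automatic retry on failure."""
    # Retrying this POST is only safe if the endpoint is idempotent.
    response = httpx.post("https://api.example.com/enrich", json=entity)
    response.raise_for_status()  # Turn 4xx/5xx responses into exceptions
    return response.json()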
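Retries are only safe when the step itself is idempotent. One common way to get there is to derive a key from the step name and its input, and skip the work when that key has already been processed. The sketch below assumes an in-memory dictionary as the result store; the IdempotentStep class and idempotency_key helper are hypothetical names, and a real pipeline would back them with durable storage.

```python
import hashlib
import json


def idempotency_key(step_name: str, payload: dict) -> str:
    """Derive a stable key from the step name and its JSON-serializable input."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(f"{step_name}:{canonical}".encode()).hexdigest()


class IdempotentStep:
    """Wrap a function so repeated calls with the same input run it only once."""

    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.executions = 0
        self._results = {}  # Assumption: in-memory store, lost on restart

    def run(self, payload: dict):
        key = idempotency_key(self.name, payload)
        if key not in self._results:
            self.executions += 1
            self._results[key] = self.func(payload)
        return self._results[key]


step = IdempotentStep("enrich", lambda p: {**p, "enriched": True})
first = step.run({"id": 1})
second = step.run({"id": 1})  # Served from the store; func is not called again
```

Sorting the keys before hashing means two payloads with the same content but different key order map to the same idempotency key.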
Circuit breakers prevent your workflow from hammering a failing service. After N consecutive failures, the circuit "opens" and subsequent calls fail immediately without making the request, until a reset timeout elapses and calls are allowed through again:
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 60.0):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure_time = 0.0

    def call(self, func, *args, **kwargs):
        if self.failures >= self.threshold:
            # Monotonic clock: unaffected by wall-clock adjustments
            elapsed = time.monotonic() - self.last_failure_time
            if elapsed < self.reset_timeout:
                raise CircuitOpenError("Circuit is open")
            self.failures = 0  # Timeout elapsed: close the circuit and try again
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure_time = time.monotonic()
            raise
        self.failures = 0  # A success resets the consecutive-failure count
        return result
Event-driven triggers let workflows start in response to external events such as a new file in S3, a webhook, a cron schedule, or a message on a queue. This decouples your agent from the triggering system.
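As a rough illustration of that decoupling, the sketch below routes events from an in-process queue to registered workflow entry points. The TriggerRouter class and the "file.created" event name are assumptions made for this example; in production the queue would typically be a message broker or a cloud notification service rather than queue.Queue.

```python
import queue
from typing import Callable

Handler = Callable[[dict], None]


class TriggerRouter:
    """Map event types to the workflow entry points that should handle them."""

    def __init__(self):
        self._handlers: dict[str, list[Handler]] = {}
        self._events: queue.Queue = queue.Queue()

    def on(self, event_type: str, handler: Handler) -> None:
        """Register a handler to start when event_type arrives."""
        self._handlers.setdefault(event_type, []).append(handler)

    def emit(self, event_type: str, payload: dict) -> None:
        """Producers only enqueue events; they never call workflows directly."""
        self._events.put((event_type, payload))

    def drain(self) -> int:
        """Dispatch all queued events and return how many handlers ran."""
        handled = 0
        while True:
            try:
                event_type, payload = self._events.get_nowait()
            except queue.Empty:
                return handled
            for handler in self._handlers.get(event_type, []):
                handler(payload)
                handled += 1


started = []
router = TriggerRouter()
router.on("file.created", lambda event: started.append(("ingest", event["key"])))
router.emit("file.created", {"key": "docs/a.pdf"})
router.emit("unknown.event", {})  # No handler registered: silently skipped
handled = router.drain()
```

The producer only knows the event name, so you can swap or add workflows without changing the system that emits the event.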
State management across long-running workflows means persisting intermediate results. Save checkpoints after each successful step so you can resume from the point of failure rather than restarting the entire pipeline.
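A minimal sketch of checkpointing, assuming each stage takes and returns JSON-serializable data and that stages have unique names. The helper names (load_checkpoint, save_checkpoint, run_pipeline) and the checkpoint layout are hypothetical, not a prescribed format.

```python
import json
import os
from pathlib import Path


def load_checkpoint(path: Path) -> dict:
    """Return saved state, or a fresh state if no checkpoint exists yet."""
    if path.exists():
        return json.loads(path.read_text())
    return {"completed": [], "data": None}


def save_checkpoint(path: Path, state: dict) -> None:
    """Write to a temp file, then swap it in, so a crash never leaves a partial file."""
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(json.dumps(state))
    os.replace(tmp, path)


def run_pipeline(stages, data, path: Path):
    """Run (name, func) stages in order, skipping those already checkpointed."""
    state = load_checkpoint(path)
    if state["completed"]:
        data = state["data"]  # Resume from the last successful stage's output
    for name, func in stages:
        if name in state["completed"]:
            continue
        data = func(data)  # An exception here leaves the last checkpoint intact
        state["completed"].append(name)
        state["data"] = data
        save_checkpoint(path, state)
    return data
```

If a stage raises, rerunning run_pipeline with the same checkpoint path picks up at the failed stage and reuses the saved output of the stages before it.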
Hands-On Practice
Build a three-stage document processing pipeline: ingest, enrich, summarize. Add retry logic to the enrich stage, a circuit breaker for the external API, and checkpoint state to a JSON file between stages. Test it by simulating API failures and verifying that your pipeline resumes correctly from the last checkpoint.
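For the failure-simulation part of the exercise, one option is a small test double that fails a fixed number of times before succeeding. The FlakyAPI class and call_with_retries helper below are hypothetical stand-ins for your real enrich client and retry logic, shown only to suggest how a test might be shaped.

```python
class FlakyAPI:
    """Test double that raises a fixed number of times, then succeeds."""

    def __init__(self, failures_before_success: int):
        self.remaining_failures = failures_before_success
        self.calls = 0

    def enrich(self, entity: dict) -> dict:
        self.calls += 1
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            raise ConnectionError("simulated outage")
        return {**entity, "enriched": True}


def call_with_retries(func, arg, attempts: int):
    """Call func(arg) up to `attempts` times, re-raising the final failure."""
    for attempt in range(1, attempts + 1):
        try:
            return func(arg)
        except ConnectionError:
            if attempt == attempts:
                raise


api = FlakyAPI(failures_before_success=2)
result = call_with_retries(api.enrich, {"id": 1}, attempts=3)
```

Counting calls on the double lets you assert both that the retry happened and that it stopped as soon as the call succeeded.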