AI Agent Orchestration System Development
An agent orchestrator is a management system that coordinates the work of multiple specialized AI agents: distributes tasks, manages data flow between agents, tracks execution status, handles errors, and ensures consistency of results. This is the top level of multi-agent architecture.
Orchestrator Responsibilities
Task Decomposition: breaking down complex tasks into subtasks for specialized agents.
Agent Selection: selecting the appropriate agent for each subtask based on capabilities and current load.
State Management: tracking the state of each agent and overall progress.
Error Handling: handling agent failures, retry logic, fallback strategies.
Result Aggregation: assembling results from different agents into a single response.
Implementing an Orchestrator with LangGraph
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated
from langchain_openai import ChatOpenAI
import operator
import json
class OrchestratorState(TypedDict):
    """Shared state threaded through every node of the orchestration graph."""

    # Original, undecomposed request from the user.
    user_request: str
    # Plan entries: [{task_id, description, agent, status, result}]
    # (plus "dependencies" as produced by plan_tasks).
    task_plan: list[dict]
    # Bookkeeping index; execute_next_task sets it to -1 when no task is runnable.
    current_task_index: int
    # Per-task results keyed by task_id; the lambda reducer merges partial
    # updates from successive node runs via dict union (later keys win).
    agent_results: Annotated[dict, lambda a, b: {**a, **b}]
    # Aggregated answer produced by aggregate_results.
    final_response: str
    # Failure counter consulted by should_continue's abort threshold.
    error_count: int
# Deterministic (temperature=0) LLM used for both planning and final aggregation.
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Agent registry: maps the agent names the planner is allowed to emit to the
# concrete agent instances that execute subtasks. The agent classes are
# presumably defined/imported elsewhere in the project — confirm before reuse.
AGENT_REGISTRY = {
    "researcher": ResearcherAgent(),
    "analyst": AnalystAgent(),
    "writer": WriterAgent(),
    "sql_agent": SQLAgent(),
    "code_interpreter": CodeInterpreterAgent(),
    "file_processor": FileProcessorAgent(),
}
def _extract_json(text: str) -> str:
    """Strip optional Markdown code fences (e.g. ```json ... ```) from an LLM reply."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line, which may carry a language tag ("json").
        cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
        cleaned = cleaned.rstrip()
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
    return cleaned.strip()


def plan_tasks(state: OrchestratorState) -> OrchestratorState:
    """Orchestrator breaks down the request into subtasks and assigns agents.

    Asks the LLM for a JSON task plan and parses it. The raw reply is passed
    through _extract_json first: models frequently wrap JSON in Markdown
    fences, and a bare json.loads(response.content) would raise on those.

    Returns the state with "task_plan" populated (every task initialized to
    status="pending", result=None) and "current_task_index" reset to 0.

    Raises:
        ValueError: if the planner's JSON is not a list.
        json.JSONDecodeError: if the reply is not valid JSON at all.
    """
    available_agents = list(AGENT_REGISTRY.keys())
    response = llm.invoke(f"""
Break down the following task into subtasks and assign an agent to each.
Available agents: {available_agents}
Task: {state["user_request"]}
Return a JSON list:
[{{"task_id": "t1", "description": "...", "agent": "researcher", "dependencies": []}}]
Dependencies: list of task_ids that must complete before this task.
""")
    task_plan = json.loads(_extract_json(response.content))
    if not isinstance(task_plan, list):
        raise ValueError(f"Planner returned non-list JSON: {type(task_plan).__name__}")
    for task in task_plan:
        # Every task starts out pending with no result yet.
        task["status"] = "pending"
        task["result"] = None
    return {**state, "task_plan": task_plan, "current_task_index": 0}
def execute_next_task(state: OrchestratorState) -> OrchestratorState:
    """Executes the next ready task.

    Picks the first pending task whose dependencies are all completed, runs
    it through the registered agent, and returns the updated state. When no
    task is runnable, returns with current_task_index set to -1 as a sentinel.

    Fixes over the naive version:
      * state["task_plan"].copy() was a SHALLOW copy — mutating the chosen
        task dict mutated the caller's state in place. Each task dict is
        copied so the node stays a pure state transformation.
      * error_count is now incremented on every failure; previously it never
        changed, so should_continue's abort threshold could never trigger.
    """
    # Copy each task dict so in-place status/result updates never leak into
    # the incoming state.
    task_plan = [dict(task) for task in state["task_plan"]]
    completed_ids = {t["task_id"] for t in task_plan if t["status"] == "completed"}

    # Find the first pending task with all dependencies completed.
    next_task = None
    for task in task_plan:
        if task["status"] == "pending" and all(
            dep in completed_ids for dep in task.get("dependencies", [])
        ):
            next_task = task
            break

    if next_task is None:
        return {**state, "current_task_index": -1}  # No runnable task left.

    error_count = state["error_count"]
    agent = AGENT_REGISTRY.get(next_task["agent"])
    if not agent:
        next_task["status"] = "failed"
        next_task["result"] = f"Agent {next_task['agent']} not found"
        error_count += 1
    else:
        # Pass dependency results as context for the agent call.
        dependency_results = {
            dep: state["agent_results"].get(dep)
            for dep in next_task.get("dependencies", [])
        }
        try:
            result = agent.execute(
                task=next_task["description"],
                context=dependency_results,
            )
            next_task["status"] = "completed"
            next_task["result"] = result
        except Exception as e:
            next_task["status"] = "failed"
            next_task["result"] = str(e)
            error_count += 1

    # task_plan already holds the mutated copy of next_task — no rebuild needed.
    return {
        **state,
        "task_plan": task_plan,
        "agent_results": {next_task["task_id"]: next_task["result"]},
        "error_count": error_count,
    }
def should_continue(state: OrchestratorState) -> str:
    """Determines the next orchestrator step (router for conditional edges).

    Returns:
        "finalize_with_errors": the error budget is exhausted, OR no pending
            task can ever become runnable (a dependency failed) — without
            this deadlock guard the graph looped on "execute_next" forever,
            since execute_next_task finds nothing runnable but the original
            router kept returning "execute_next" while error_count < 3.
        "aggregate_results": every task finished.
        "execute_next": at least one pending task is runnable.
    """
    plan = state["task_plan"]
    pending = [t for t in plan if t["status"] == "pending"]
    failed = [t for t in plan if t["status"] == "failed"]

    if failed and state["error_count"] >= 3:
        return "finalize_with_errors"
    if not pending:
        return "aggregate_results"

    # Deadlock guard: if none of the pending tasks has all dependencies
    # completed, looping back to execute_next would spin indefinitely.
    completed_ids = {t["task_id"] for t in plan if t["status"] == "completed"}
    runnable = any(
        all(dep in completed_ids for dep in t.get("dependencies", []))
        for t in pending
    )
    if not runnable:
        return "finalize_with_errors"
    return "execute_next"
def aggregate_results(state: OrchestratorState) -> OrchestratorState:
    """Aggregates results from all agents into the final response.

    Collects every task's result (failed tasks included — their "result"
    holds the error string) and asks the LLM to compose one answer, stored
    in state["final_response"].

    NOTE(review): json.dumps assumes each task "result" is JSON-serializable;
    agents returning custom objects would raise here — confirm agent contract.
    """
    all_results = {t["task_id"]: t["result"] for t in state["task_plan"]}
    final = llm.invoke(f"""
Based on results from different agents, compose the final response.
Original request: {state["user_request"]}
Results: {json.dumps(all_results, ensure_ascii=False)}
""").content
    return {**state, "final_response": final}
# Build the orchestration graph: plan once, then loop on execute_next until
# should_continue routes execution into aggregation.
graph = StateGraph(OrchestratorState)
graph.add_node("plan", plan_tasks)
graph.add_node("execute_next", execute_next_task)
graph.add_node("aggregate_results", aggregate_results)
graph.set_entry_point("plan")
graph.add_edge("plan", "execute_next")
# should_continue's return value selects the next node via this mapping.
graph.add_conditional_edges("execute_next", should_continue, {
    "execute_next": "execute_next",
    "aggregate_results": "aggregate_results",
    # Error termination still flows into aggregation so the user gets a
    # response built from whatever partial results exist.
    "finalize_with_errors": "aggregate_results",
})
graph.add_edge("aggregate_results", END)
# MemorySaver checkpointing allows resuming/replaying a run per thread_id.
orchestrator = graph.compile(checkpointer=MemorySaver())
Parallel Task Execution
import asyncio
async def execute_parallel_tasks(tasks_batch: list[dict]) -> list[dict]:
    """Parallel execution of independent tasks.

    Runs each task's agent in a worker thread (agent.execute is assumed
    blocking) and gathers all results concurrently. Task dicts are mutated
    in place with "status"/"result"; the same batch list is returned.

    Bug fixed: tasks whose agent is missing from AGENT_REGISTRY used to be
    silently skipped when building the coroutine list, but the subsequent
    zip(tasks_batch, results) still paired results positionally with ALL
    tasks — attaching results to the wrong tasks. We now track the runnable
    subset separately and mark missing-agent tasks failed up front.
    """
    runnable: list[dict] = []
    coroutines = []
    for task in tasks_batch:
        agent = AGENT_REGISTRY.get(task["agent"])
        if agent is None:
            # Fail explicitly instead of skipping, keeping zip() aligned below.
            task["status"] = "failed"
            task["result"] = f"Agent {task['agent']} not found"
            continue
        runnable.append(task)
        # to_thread keeps the blocking agent call off the event loop.
        coroutines.append(asyncio.to_thread(agent.execute, task=task["description"]))
    # return_exceptions=True turns per-task failures into values instead of
    # cancelling the whole gather.
    results = await asyncio.gather(*coroutines, return_exceptions=True)
    for task, result in zip(runnable, results):
        if isinstance(result, Exception):
            task["status"] = "failed"
            task["result"] = str(result)
        else:
            task["status"] = "completed"
            task["result"] = result
    return tasks_batch
Practical Case Study: Due Diligence Orchestrator
Task: automated company verification (due diligence) for M&A, performed by five specialized agents working in parallel:
- Financial Agent: analysis of 3 years of financial reports
- Legal Agent: verification of lawsuits, restrictions
- HR Agent: personnel structure, turnover
- Market Agent: market position, competitors
- Risk Agent: synthesis of risks from all sources
Execution graph:
- t1 (financial), t2 (legal), t3 (hr), t4 (market) — in parallel
- t5 (risk) — depends on t1, t2, t3, t4
- t6 (final_report) — depends on t5
Results:
- Due diligence time: 4 weeks → 3 days
- Coverage of aspects: 78% → 94%
- Cost per due diligence: -71%
Monitoring and Tracing
import mlflow
def log_orchestration_run(state: OrchestratorState):
    """Log run-level metrics and the full task plan to MLflow.

    Records task counts (total / completed / failed) as metrics and persists
    the entire task plan — including per-task results — as a JSON artifact
    named task_execution_log.json.

    NOTE(review): json.dumps assumes every task "result" is serializable;
    confirm the agents' return types before relying on this in production.
    """
    with mlflow.start_run():
        mlflow.log_metrics({
            "total_tasks": len(state["task_plan"]),
            "completed_tasks": sum(1 for t in state["task_plan"] if t["status"] == "completed"),
            "failed_tasks": sum(1 for t in state["task_plan"] if t["status"] == "failed"),
        })
        # Full plan snapshot doubles as an execution trace for the run.
        mlflow.log_text(json.dumps(state["task_plan"], indent=2), "task_execution_log.json")
Timeline
- Orchestrator design: 1–2 weeks
- Implementation of basic agents (3–5): 3–5 weeks
- Parallel execution integration: 1 week
- Error handling and monitoring: 1–2 weeks
- Total: 6–10 weeks







