Task Delegation Pattern
Wie ein Orchestrator Tasks an spezialisierte Agenten verteilt.
Task Delegation trennt Orchestrierung von Ausführung. Ein Orchestrator klassifiziert Anfragen (Intent), routet sie an spezialisierte Agenten (Coder, Researcher, Reviewer) und aggregiert die Ergebnisse. Kernkonzepte: Intent Classification, Routing Matrix, Priority Queue mit Deadlines, Result Aggregation. Ohne Delegation wird jeder Agent zum Flaschenhals.
Das Problem
Ein einzelner Agent kann nicht alles besser als spezialisierte Tools. Du brauchst ein System, das den richtigen Agenten für die richtige Aufgabe auswählt.

Architektur
User Request
|
v
[Orchestrator Agent]
|
+---> [Research Agent] ----> Web Search, Docs
|
+---> [Coder Agent] ----> Code Generation
|
+---> [Review Agent] ----> PR Review, Tests
|
v
Final ResponseImplementierung
1. Intent Classification
Der Orchestrator klassifiziert die Benutzeranfrage und leitet sie an den passenden Agenten weiter.
# Intent Classification mit Ollama
from langchain.llms import Ollama
llm = Ollama(model="llama3.2")
def classify_intent(user_input: str) -> str:
prompt = f"""
Classify this request into one of: code, research, review, deploy, qa
Request: {user_input}
Return only the category, nothing else.
"""
return llm(prompt).strip().lower()2. Routing Matrix
const routes = {
'code-generation': coderAgent,
'research': researchAgent,
'review': reviewAgent,
'deployment': deployAgent,
'question': qaAgent,
}
# Mit Ollama:
function route(task) {
const intent = ollama.chat('llama3.2', task.prompt)
return routes[intent] || qaAgent
}3. Priority Queue
Für mehrere gleichzeitige Tasks: Prioritäten setzen (1 = highest). Deadline-Tracking verhindert, dass Tasks ewig warten.
# Priority Queue mit Python
import heapq
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class Task:
priority: int # 1 = highest
deadline: datetime
agent: str
payload: dict
def __lt__(self, other):
return (self.priority, self.deadline) < (other.priority, other.deadline)
task_queue = []
def enqueue(task):
heapq.heappush(task_queue, task)
def dequeue():
task = heapq.heappop(task_queue)
if datetime.now() > task.deadline:
raise TimeoutError(f"Task {task.agent} timed out")
return task
# Beispiel: Tasks einreihen
enqueue(Task(1, datetime.now() + timedelta(minutes=5), "coder", {"code": "..."}))
enqueue(Task(2, datetime.now() + timedelta(minutes=10), "reviewer", {"pr": "..."}))4. Result Aggregation
Der Orchestrator sammelt die Ergebnisse aller Sub-Agenten und synthetisiert eine finale Antwort.
# Result Aggregation
async def aggregate_results(sub_tasks: list) -> str:
results = await asyncio.gather(
*[execute_agent(task) for task in sub_tasks]
)
# Finale Synthese
synthesis_prompt = f"""
Du hast folgende Teilergebnisse:
{chr(10).join(results)}
Erstelle eine kohärente finale Antwort.
"""
return llama3.2(synthesis_prompt)Wichtige Aspekte
- Timeout: Jeder Sub-Agent braucht ein Max-Timeout
- Retry: Bei Fehlern max. 2x wiederholen
- Fallback: Was tun, wenn alle Agenten fehlschlagen?
- Cost Control: Budget-Limits pro Task
Fallback-Strategie
Wenn ein Agent fehlschlägt, braucht der Orchestrator einen Plan B. Hier ein bewährtes Pattern mit Retry und Graceful Degradation:
# Fallback mit Retry und Degradation
import asyncio
from typing import Optional
async def execute_with_fallback(
task: Task,
max_retries: int = 2,
timeout: int = 30
) -> Optional[str]:
"""Execute task with retry and fallback logic"""
for attempt in range(max_retries + 1):
try:
result = await asyncio.wait_for(
execute_agent(task),
timeout=timeout
)
return result
except asyncio.TimeoutError:
print(f"Attempt {attempt + 1}: Timeout after {timeout}s")
except Exception as e:
print(f"Attempt {attempt + 1}: Error: {e}")
# Alle Retries fehlgeschlagen — Fallback
return await fallback_handler(task)
async def fallback_handler(task: Task) -> str:
"""Graceful degradation wenn Agent nicht verfügbar"""
return f"Task '{task.agent}' konnte nicht ausgeführt werden. " \
f"Bitte manuell prüfen: {task.payload}"Ohne Timeout und Retry-Limit kann ein fehlgeschlagener Agent den gesamten Orchestrator blockieren. Setze immer ein Maximum für Retries (2-3) und ein Timeout pro Agent (30-60 Sekunden).
Quellen
- AutoGen: Multi-Agent Conversations (ArXiv) — Microsoft Research zu Multi-Agent Orchestrierung
- Python asyncio Dokumentation — Grundlage für async Task-Verarbeitung
- Python heapq — Priority Queue Implementierung
- Anthropic: Building Effective Agents — Best Practices für Agent-Architekturen
War dieser Artikel hilfreich?
Nächster Schritt: vom Wissen in die Umsetzung
Wenn du mehr willst als Theorie: Setups, Workflows und Vorlagen aus dem echten Betrieb für Teams, die lokale und dokumentierte AI-Systeme wollen.
- Lokal und self-hosted gedacht
- Dokumentiert und auditierbar
- Aus eigener Runtime entwickelt
- Made in Austria