Zum Inhalt springen
>_<
AI EngineeringWiki

Task Delegation Pattern

Wie ein Orchestrator Tasks an spezialisierte Agenten verteilt.

📋 Auf einen Blick

Task Delegation trennt Orchestrierung von Ausführung. Ein Orchestrator klassifiziert Anfragen (Intent), routet sie an spezialisierte Agenten (Coder, Researcher, Reviewer) und aggregiert die Ergebnisse. Kernkonzepte: Intent Classification, Routing Matrix, Priority Queue mit Deadlines, Result Aggregation. Ohne Delegation wird jeder Agent zum Flaschenhals.

Das Problem

Ein einzelner Agent kann nicht alles besser als spezialisierte Tools. Du brauchst ein System, das den richtigen Agenten für die richtige Aufgabe auswählt.

Task Delegation Pattern — Orchestrator verteilt an spezialisierte Agenten
Task Delegation: Wie der Orchestrator Aufgaben an spezialisierte Agenten verteilt
Diagramm wird geladen...

Architektur

User Request
     |
     v
[Orchestrator Agent]
     |
     +---> [Research Agent] ----> Web Search, Docs
     |
     +---> [Coder Agent] ----> Code Generation
     |
     +---> [Review Agent] ----> PR Review, Tests
     |
     v
  Final Response

Implementierung

1. Intent Classification

Der Orchestrator klassifiziert die Benutzeranfrage und leitet sie an den passenden Agenten weiter.

# Intent Classification mit Ollama
from langchain.llms import Ollama

llm = Ollama(model="llama3.2")

def classify_intent(user_input: str) -> str:
    prompt = f"""
    Classify this request into one of: code, research, review, deploy, qa
    
    Request: {user_input}
    
    Return only the category, nothing else.
    """
    return llm(prompt).strip().lower()

2. Routing Matrix

const routes = {
  'code-generation': coderAgent,
  'research': researchAgent,
  'review': reviewAgent,
  'deployment': deployAgent,
  'question': qaAgent,
}

# Mit Ollama:
function route(task) {
  const intent = ollama.chat('llama3.2', task.prompt)
  return routes[intent] || qaAgent
}

3. Priority Queue

Für mehrere gleichzeitige Tasks: Prioritäten setzen (1 = highest). Deadline-Tracking verhindert, dass Tasks ewig warten.

# Priority Queue mit Python
import heapq
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class Task:
    priority: int  # 1 = highest
    deadline: datetime
    agent: str
    payload: dict
    
    def __lt__(self, other):
        return (self.priority, self.deadline) < (other.priority, other.deadline)

task_queue = []

def enqueue(task):
    heapq.heappush(task_queue, task)
    
def dequeue():
    task = heapq.heappop(task_queue)
    if datetime.now() > task.deadline:
        raise TimeoutError(f"Task {task.agent} timed out")
    return task

# Beispiel: Tasks einreihen
enqueue(Task(1, datetime.now() + timedelta(minutes=5), "coder", {"code": "..."}))
enqueue(Task(2, datetime.now() + timedelta(minutes=10), "reviewer", {"pr": "..."}))

4. Result Aggregation

Der Orchestrator sammelt die Ergebnisse aller Sub-Agenten und synthetisiert eine finale Antwort.

# Result Aggregation
async def aggregate_results(sub_tasks: list) -> str:
    results = await asyncio.gather(
        *[execute_agent(task) for task in sub_tasks]
    )
    
    # Finale Synthese
    synthesis_prompt = f"""
    Du hast folgende Teilergebnisse:
    {chr(10).join(results)}
    
    Erstelle eine kohärente finale Antwort.
    """
    return llama3.2(synthesis_prompt)

Wichtige Aspekte

  • Timeout: Jeder Sub-Agent braucht ein Max-Timeout
  • Retry: Bei Fehlern max. 2x wiederholen
  • Fallback: Was tun, wenn alle Agenten fehlschlagen?
  • Cost Control: Budget-Limits pro Task

Fallback-Strategie

Wenn ein Agent fehlschlägt, braucht der Orchestrator einen Plan B. Hier ein bewährtes Pattern mit Retry und Graceful Degradation:

# Fallback mit Retry und Degradation
import asyncio
from typing import Optional

async def execute_with_fallback(
    task: Task,
    max_retries: int = 2,
    timeout: int = 30
) -> Optional[str]:
    """Execute task with retry and fallback logic"""

    for attempt in range(max_retries + 1):
        try:
            result = await asyncio.wait_for(
                execute_agent(task),
                timeout=timeout
            )
            return result

        except asyncio.TimeoutError:
            print(f"Attempt {attempt + 1}: Timeout after {timeout}s")

        except Exception as e:
            print(f"Attempt {attempt + 1}: Error: {e}")

    # Alle Retries fehlgeschlagen — Fallback
    return await fallback_handler(task)

async def fallback_handler(task: Task) -> str:
    """Graceful degradation wenn Agent nicht verfügbar"""
    return f"Task '{task.agent}' konnte nicht ausgeführt werden. " \
           f"Bitte manuell prüfen: {task.payload}"
⚠️ Endlos-Schleifen vermeiden

Ohne Timeout und Retry-Limit kann ein fehlgeschlagener Agent den gesamten Orchestrator blockieren. Setze immer ein Maximum für Retries (2-3) und ein Timeout pro Agent (30-60 Sekunden).

Quellen

War dieser Artikel hilfreich?

Nächster Schritt: vom Wissen in die Umsetzung

Wenn du mehr willst als Theorie: Setups, Workflows und Vorlagen aus dem echten Betrieb für Teams, die lokale und dokumentierte AI-Systeme wollen.

Warum AI Engineering
  • Lokal und self-hosted gedacht
  • Dokumentiert und auditierbar
  • Aus eigener Runtime entwickelt
  • Made in Austria
Kein Ersatz für Rechtsberatung.