Zum Inhalt

Kafka — Architektur, Topics & HowTo

Sovereign MoE — Kafka-Dokumentation
Stand: 2026-03-29


1. Überblick & Architekturentscheidung

Warum Kafka?

Vor Kafka erledigten alle Hintergrundoperationen (GraphRAG-Ingest, Audit-Logging) als asyncio.create_task() im Orchestrator-Prozess:

HTTP Response
    └── asyncio.create_task(graph_manager.extract_and_ingest(...))  # fire & forget

Probleme dieses Ansatzes: - Bei Neustart des Containers gehen laufende Ingest-Tasks verloren - Kein Retry bei Fehlern (Neo4j kurz nicht erreichbar → Daten verloren) - Keine Entkopplung: Fehler im Ingest können die Haupt-Eventloop beeinflussen - Keine Möglichkeit, externe Consumer (Analytics, Monitoring) anzubinden

Mit Kafka:

HTTP Response
    └── kafka_producer.send("moe.ingest", payload)   # persistent, entkoppelt
         └── [Kafka] ──► Consumer: extract_and_ingest(...)
  • Persistenz: Nachrichten überleben Container-Neustarts (7 Tage Retention)
  • Entkopplung: Ingest-Fehler beeinflussen nicht den HTTP-Response-Pfad
  • Erweiterbarkeit: Externe Consumer können jederzeit Topics abonnieren
  • Observierbarkeit: Alle Events sind in Topics nachverfolgbar

Architektur im System

graph LR
    subgraph ORCH["langgraph-orchestrator"]
        MERGER_NODE["merger_node"]
        FB_EP["/v1/feedback"]
        CONSUMER["_kafka_consumer_loop"]
    end

    subgraph KAFKA_BROKER["moe-kafka  ·  Port 9092\nKRaft Mode · Retention 7d / 512 MB"]
        INGEST["moe.ingest"]
        REQUESTS["moe.requests"]
        FEEDBACK["moe.feedback"]
    end

    NEO4J[("Neo4j\nKnowledge Graph")]

    MERGER_NODE -->|"_kafka_publish"| INGEST
    MERGER_NODE -->|"_kafka_publish"| REQUESTS
    FB_EP -->|"_kafka_publish"| FEEDBACK
    INGEST --> CONSUMER
    REQUESTS --> CONSUMER
    CONSUMER -->|"GraphRAG Ingest"| NEO4J

2. Setup & Konfiguration

Docker Service

# docker-compose.yml
moe-kafka:
  image: confluentinc/cp-kafka:7.7.0
  container_name: moe-kafka
  restart: always
  ports:
    - "9092:9092"
  environment:
    CLUSTER_ID: "moe-kafka-cluster-01"
    KAFKA_NODE_ID: 1
    KAFKA_PROCESS_ROLES: "broker,controller"
    KAFKA_CONTROLLER_QUORUM_VOTERS: "1@moe-kafka:9093"
    KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093"
    KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://moe-kafka:9092"
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_LOG_RETENTION_HOURS: 168
    KAFKA_LOG_RETENTION_BYTES: 536870912
    KAFKA_LOG_DIRS: "/var/lib/kafka/data"
  volumes:
    - /opt/moe-infra/kafka-data:/var/lib/kafka/data

Umgebungsvariable

# In .env oder docker-compose environment des Orchestrators
KAFKA_URL=kafka://moe-kafka:9092

Der Orchestrator parst daraus den Bootstrap-Server: moe-kafka:9092

Python-Abhängigkeit

aiokafka>=0.11.0   # requirements.txt

3. Topics

Topic Erzeuger Consumer Zweck
moe.ingest merger_node _kafka_consumer_loop GraphRAG Tripel-Extraktion aus Antworten
moe.requests merger_node _kafka_consumer_loop (Logging) Vollständiger Audit-Log aller Anfragen
moe.feedback POST /v1/feedback — (noch kein aktiver Consumer) Feedback-Events für externe Auswertung

Topic-Parameter (auto-created)

Alle Topics werden automatisch erstellt (KAFKA_AUTO_CREATE_TOPICS_ENABLE=true) mit: - Partitionen: 1 (Single-Broker, kein Clustering nötig) - Replikation: 1 - Retention: 7 Tage oder 512 MB (je nachdem was zuerst erreicht wird)


4. Producer

Datei: main.py_kafka_publish()

async def _kafka_publish(topic: str, payload: dict) -> None:
    if kafka_producer is None:
        return                          # Kafka nicht verfügbar → stille Rückgabe
    data = json.dumps(payload).encode()
    await kafka_producer.send_and_wait(topic, data)

Der Producer wird beim Start in _init_kafka() initialisiert (12 Versuche, Backoff 5–60s):

producer = AIOKafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP,
    value_serializer=lambda v: v,       # bytes, bereits von _kafka_publish kodiert
)

Wann wird publiziert:

Zeitpunkt Topic Auslöser
Nach jeder Merger-Antwort (> 150 Zeichen) moe.ingest merger_node
Nach jeder Merger-Antwort moe.requests merger_node
Bei Cache-Hit moe.requests merger_node
Bei POST /v1/feedback moe.feedback Feedback-Endpoint

5. Consumer

Datei: main.py_kafka_consumer_loop()

Der Consumer läuft als permanenter asyncio-Background-Task, gestartet in lifespan.

consumer = AIOKafkaConsumer(
    "moe.ingest",
    "moe.requests",
    bootstrap_servers=KAFKA_BOOTSTRAP,
    group_id="moe-worker",
    auto_offset_reset="earliest",       # beim ersten Start von Anfang an lesen
    value_deserializer=lambda b: json.loads(b.decode()),
)

Consumer-Logik:

Nachricht auf moe.ingest
    └── graph_manager.extract_and_ingest(input, answer, judge_llm)
         └── LLM extrahiert Tripel → Neo4j speichert sie

Nachricht auf moe.requests
    └── logger.debug(response_id, cache_hit, expert_models_used)
         └── (Erweiterungspunkt: Analytics, Metriken, externe Systeme)

Consumer Group: moe-worker - Offset wird in Kafka persistiert → beim Neustart werden keine Nachrichten doppelt verarbeitet - Bei Container-Neustart setzt der Consumer an der letzten bestätigten Position fort


6. Nachrichten-Schemata

moe.ingest

{
  "response_id": "chatcmpl-550e8400-e29b-41d4-a716-446655440000",
  "input":       "Was behandelt Ibuprofen?",
  "answer":      "Ibuprofen ist ein NSAID und wird zur Behandlung von Schmerzen..."
}
Feld Typ Beschreibung
response_id string Chat-ID aus der Antwort
input string Originale Benutzeranfrage (ungekürzt)
answer string Vollständige Merger-Antwort

moe.requests

{
  "response_id":        "chatcmpl-550e8400-...",
  "input":              "Was behandelt Ibuprofen?",
  "answer":             "Ibuprofen ist ein NSAID...",
  "expert_models_used": ["gemma3:27b::general", "qwen3.5:27b::general"],
  "cache_hit":          false,
  "ts":                 "2026-03-29T11:12:36.399123"
}
Feld Typ Beschreibung
response_id string Chat-ID
input string Anfrage (max. 300 Zeichen)
answer string Antwort (max. 500 Zeichen)
expert_models_used array ["modell::kategorie", ...]
cache_hit bool War ein Cache-Hit vorhanden?
ts string ISO-Timestamp

Bei Cache-Hits fehlen answer und expert_models_used (nicht relevant).

moe.feedback

{
  "response_id": "chatcmpl-550e8400-...",
  "rating":      4,
  "positive":    true,
  "ts":          "2026-03-29T11:15:00.123456"
}
Feld Typ Beschreibung
response_id string Referenz auf die bewertete Antwort
rating int 1–5 (1-2=negativ, 3=neutral, 4-5=positiv)
positive bool rating >= 4
ts string ISO-Timestamp

7. HowTo: Administratoren

Kafka-Service verwalten

# Status prüfen
sudo docker compose ps moe-kafka

# Logs anzeigen
sudo docker logs moe-kafka --tail=50

# Neustart
sudo docker compose restart moe-kafka

# Topics auflisten
sudo docker exec moe-kafka kafka-topics \
  --bootstrap-server localhost:9092 --list

# Topic-Details anzeigen
sudo docker exec moe-kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --describe --topic moe.ingest

# Alle Consumer-Groups anzeigen
sudo docker exec moe-kafka kafka-consumer-groups \
  --bootstrap-server localhost:9092 --list

# Consumer-Group Offsets und Lag prüfen
sudo docker exec moe-kafka kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe --group moe-worker

Nachrichten live mitlesen (Debugging)

# moe.ingest live verfolgen
sudo docker exec moe-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic moe.ingest \
  --from-beginning

# moe.requests live verfolgen (Audit-Log)
sudo docker exec moe-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic moe.requests \
  --from-beginning

# moe.feedback live verfolgen
sudo docker exec moe-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic moe.feedback \
  --from-beginning

# Nur letzte 10 Nachrichten eines Topics
sudo docker exec moe-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic moe.requests \
  --max-messages 10 \
  --from-beginning

Nachrichten zählen

# Anzahl Nachrichten in einem Topic (End-Offset)
sudo docker exec moe-kafka kafka-run-class kafka.tools.GetOffsetShell \
  --bootstrap-server localhost:9092 \
  --topic moe.ingest \
  --time -1

Topics manuell anlegen (falls auto-create deaktiviert)

sudo docker exec moe-kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --create --topic moe.ingest \
  --partitions 1 \
  --replication-factor 1 \
  --config retention.ms=604800000 \
  --config retention.bytes=536870912

# Analog für moe.requests und moe.feedback

Consumer-Group Offset zurücksetzen (Reprocessing)

# Consumer stoppen (Orchestrator herunterfahren)
sudo docker compose stop langgraph-app

# Offset auf Anfang zurücksetzen
sudo docker exec moe-kafka kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group moe-worker \
  --reset-offsets \
  --to-earliest \
  --all-topics \
  --execute

# Orchestrator wieder starten — Consumer liest alle Nachrichten erneut
sudo docker compose start langgraph-app

Achtung: Beim Reprocessing von moe.ingest werden alle Tripel erneut in Neo4j geschrieben. Da MERGE verwendet wird, entstehen keine Duplikate.

Disk-Nutzung prüfen

# Kafka-Daten
du -sh /opt/moe-infra/kafka-data/

# Log-Segmente im Container
sudo docker exec moe-kafka du -sh /var/lib/kafka/data/

Kafka vollständig zurücksetzen (Datenverlust!)

sudo docker compose stop moe-kafka
sudo rm -rf /opt/moe-infra/kafka-data/*
sudo docker compose up -d moe-kafka
# Topics werden bei nächster Nutzung neu erstellt

8. HowTo: Entwickler

Neuen Consumer für ein vorhandenes Topic schreiben

Beispiel: Einen Consumer für moe.requests der Statistiken in eine Datei schreibt.

from aiokafka import AIOKafkaConsumer
import asyncio, json

async def analytics_consumer():
    consumer = AIOKafkaConsumer(
        "moe.requests",
        bootstrap_servers="moe-kafka:9092",     # intern
        # bootstrap_servers="localhost:9092",   # von außen (Port 9092 exposed)
        group_id="moe-analytics",               # eigene Group, unabhängig von moe-worker
        auto_offset_reset="earliest",
        value_deserializer=lambda b: json.loads(b.decode()),
    )
    await consumer.start()
    try:
        async for msg in consumer:
            payload = msg.value
            print(f"[{payload['ts']}] cache_hit={payload.get('cache_hit')} "
                  f"experts={payload.get('expert_models_used')}")
    finally:
        await consumer.stop()

asyncio.run(analytics_consumer())

Wichtig: Immer eine eigene group_id verwenden — nie moe-worker teilen, da sonst der interne Consumer Nachrichten verpasst.

Neues Topic hinzufügen

  1. Konstante in main.py ergänzen:
KAFKA_TOPIC_MEIN_EVENT = "moe.mein-event"
  1. An der gewünschten Stelle publishen:
asyncio.create_task(_kafka_publish(KAFKA_TOPIC_MEIN_EVENT, {
    "feld1": wert1,
    "ts":    datetime.now().isoformat(),
}))
  1. Falls ein Consumer nötig ist, in _kafka_consumer_loop() das Topic zur Consumer-Subscription hinzufügen:
consumer = AIOKafkaConsumer(
    KAFKA_TOPIC_INGEST,
    KAFKA_TOPIC_REQUESTS,
    KAFKA_TOPIC_MEIN_EVENT,    # ← neu
    ...
)
  1. Im Consumer-Loop die neue Nachricht behandeln:
elif msg.topic == KAFKA_TOPIC_MEIN_EVENT:
    # Verarbeitung
    pass

Nachrichten von außen senden (Testing)

Der Port 9092 ist auf dem Host exposed. Von außen oder aus einem Test-Skript:

from aiokafka import AIOKafkaProducer
import asyncio, json

async def send_test():
    producer = AIOKafkaProducer(bootstrap_servers="192.168.155.224:9092")
    await producer.start()
    payload = json.dumps({"input": "Test", "answer": "Antwort", "response_id": "test-1"}).encode()
    await producer.send_and_wait("moe.ingest", payload)
    await producer.stop()

asyncio.run(send_test())

Integration testen (End-to-End)

# 1. Anfrage senden
RESPONSE=$(curl -s http://localhost:8002/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"model":"moe-orchestrator","messages":[{"role":"user","content":"Was ist Docker?"}],"stream":false}')

echo $RESPONSE | python3 -m json.tool

# 2. Response-ID extrahieren
RESPONSE_ID=$(echo $RESPONSE | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")
echo "Response-ID: $RESPONSE_ID"

# 3. Kafka-Nachricht prüfen (sollte moe.ingest und moe.requests je 1 Nachricht enthalten)
sudo docker exec moe-kafka kafka-run-class kafka.tools.GetOffsetShell \
  --bootstrap-server localhost:9092 --topic moe.ingest --time -1

# 4. Feedback senden
curl -s http://localhost:8002/v1/feedback \
  -H "Content-Type: application/json" \
  -d "{\"response_id\":\"$RESPONSE_ID\",\"rating\":5}" | python3 -m json.tool

# 5. Feedback-Topic prüfen
sudo docker exec moe-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic moe.feedback \
  --from-beginning \
  --max-messages 1

Consumer-Lag überwachen

Der Lag gibt an, wie viele Nachrichten noch nicht verarbeitet wurden.

sudo docker exec moe-kafka kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe --group moe-worker

Beispiel-Output:

GROUP       TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
moe-worker  moe.ingest   0          42              42              0    ← kein Lag
moe-worker  moe.requests 0          87              87              0

Lag > 0 bedeutet, der Consumer kommt nicht nach — typischerweise weil Neo4j-Ingest langsam ist.


9. Monitoring & Debugging

Orchestrator-Logs prüfen

# Kafka-spezifische Log-Zeilen
sudo docker logs langgraph-orchestrator 2>&1 | grep -i kafka

# Consumer-Joins und Partitionszuweisungen
sudo docker logs langgraph-orchestrator 2>&1 | grep -E "(group|partition|Consumer)"

# Publish-Fehler
sudo docker logs langgraph-orchestrator 2>&1 | grep "Kafka publish"

Typische Log-Muster beim Start

# Normalstart (gut)
✅ Kafka Producer verbunden (moe-kafka:9092)
Joined group 'moe-worker' (generation 1) ...
✅ Kafka Consumer gestartet

# Kafka noch nicht bereit (normal, wird retryed)
⚠️ Kafka nicht erreichbar (Versuch 1/12): ... — retry in 5s
Topic moe.ingest not found in cluster metadata   ← verschwindet nach Topic-Erstellung

# Kafka dauerhaft nicht erreichbar (Problem)
❌ Kafka nach 12 Versuchen nicht erreichbar — Kafka deaktiviert

Häufige Probleme

Problem Ursache Lösung
GroupCoordinatorNotAvailableError beim Start Kafka noch hochfahrend Normal — Consumer retryed automatisch
Topic X not found in cluster metadata Topic noch nicht erstellt Normal — wird beim ersten Produce erstellt
Consumer-Lag wächst ständig Neo4j-Ingest zu langsam Neo4j-Performance prüfen, ggf. Heap erhöhen
Kafka publish [moe.ingest] fehlgeschlagen Kafka kurz nicht erreichbar Nachricht verloren — Container prüfen
Orchestrator startet aber Kafka deaktiviert 12 Verbindungsversuche fehlgeschlagen sudo docker compose restart moe-kafka

10. Fehlerbehandlung & Graceful Degradation

Das System ist so gebaut, dass es ohne Kafka vollständig funktioniert:

async def _kafka_publish(topic: str, payload: dict) -> None:
    if kafka_producer is None:
        return          # kein Kafka → keine Aktion, kein Fehler
    try:
        await kafka_producer.send_and_wait(topic, data)
    except Exception as e:
        logger.warning(f"Kafka publish [{topic}] fehlgeschlagen: {e}")
        # Kein raise → Hauptpfad (HTTP Response) nicht beeinflusst

Konsequenzen bei Kafka-Ausfall: - HTTP-Antworten werden weiterhin korrekt geliefert - GraphRAG-Ingest fällt aus (Wissensgraph wächst nicht mehr) - Request-Audit-Log fällt aus - Feedback-Events werden nicht publiziert (aber synchrone Redis/Neo4j-Updates laufen weiter)

Beim Neustart nach Ausfall: - Producer reconnected automatisch - Consumer setzt am letzten bestätigten Offset fort - Nachrichten, die während des Ausfalls produziert wurden, wurden nie gesendet (kein lokaler Buffer) → nicht nachholbar


11. Erweiterungen & Roadmap

Geplante Consumer

Consumer Topic Zweck Status
Analytics-Writer moe.requests Statistiken in InfluxDB/Grafana offen
Feedback-Aggregator moe.feedback Wöchentliche Performance-Reports offen
Re-Ranking-Trigger moe.feedback Expert-Modelle bei schlechten Scores automatisch abwählen offen
Dead-Letter-Queue alle Fehlgeschlagene Ingest-Nachrichten wiederholen offen

Skalierung (Multi-Broker)

Aktuell läuft Kafka als Single-Broker (ausreichend für dieses Setup). Für höhere Ausfallsicherheit:

# Mehrere Broker via KRaft
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093"

Topics müssten mit --replication-factor 3 neu erstellt werden.

Schema Registry

Für typsichere Nachrichten (Avro/Protobuf) kann die Confluent Schema Registry ergänzt werden:

schema-registry:
  image: confluentinc/cp-schema-registry:7.7.0
  environment:
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "moe-kafka:9092"
  ports:
    - "8081:8081"

Externe Anbindung

Kafka ist über Port 9092 auf dem Host exposed. Externe Services (andere Server im LAN) können direkt Topics konsumieren:

bootstrap_servers="192.168.155.224:9092"