Kafka — Architektur, Topics & HowTo¶
Sovereign MoE — Kafka-Dokumentation
Stand: 2026-03-29
1. Überblick & Architekturentscheidung¶
Warum Kafka?¶
Vor Kafka erledigten alle Hintergrundoperationen (GraphRAG-Ingest, Audit-Logging) als asyncio.create_task() im Orchestrator-Prozess:
Probleme dieses Ansatzes: - Bei Neustart des Containers gehen laufende Ingest-Tasks verloren - Kein Retry bei Fehlern (Neo4j kurz nicht erreichbar → Daten verloren) - Keine Entkopplung: Fehler im Ingest können die Haupt-Eventloop beeinflussen - Keine Möglichkeit, externe Consumer (Analytics, Monitoring) anzubinden
Mit Kafka:
HTTP Response
└── kafka_producer.send("moe.ingest", payload) # persistent, entkoppelt
└── [Kafka] ──► Consumer: extract_and_ingest(...)
- Persistenz: Nachrichten überleben Container-Neustarts (7 Tage Retention)
- Entkopplung: Ingest-Fehler beeinflussen nicht den HTTP-Response-Pfad
- Erweiterbarkeit: Externe Consumer können jederzeit Topics abonnieren
- Observierbarkeit: Alle Events sind in Topics nachverfolgbar
Architektur im System¶
graph LR
subgraph ORCH["langgraph-orchestrator"]
MERGER_NODE["merger_node"]
FB_EP["/v1/feedback"]
CONSUMER["_kafka_consumer_loop"]
end
subgraph KAFKA_BROKER["moe-kafka · Port 9092\nKRaft Mode · Retention 7d / 512 MB"]
INGEST["moe.ingest"]
REQUESTS["moe.requests"]
FEEDBACK["moe.feedback"]
end
NEO4J[("Neo4j\nKnowledge Graph")]
MERGER_NODE -->|"_kafka_publish"| INGEST
MERGER_NODE -->|"_kafka_publish"| REQUESTS
FB_EP -->|"_kafka_publish"| FEEDBACK
INGEST --> CONSUMER
REQUESTS --> CONSUMER
CONSUMER -->|"GraphRAG Ingest"| NEO4J
2. Setup & Konfiguration¶
Docker Service¶
# docker-compose.yml
moe-kafka:
image: confluentinc/cp-kafka:7.7.0
container_name: moe-kafka
restart: always
ports:
- "9092:9092"
environment:
CLUSTER_ID: "moe-kafka-cluster-01"
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@moe-kafka:9093"
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://moe-kafka:9092"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_LOG_RETENTION_BYTES: 536870912
KAFKA_LOG_DIRS: "/var/lib/kafka/data"
volumes:
- /opt/moe-infra/kafka-data:/var/lib/kafka/data
Umgebungsvariable¶
Der Orchestrator parst daraus den Bootstrap-Server: moe-kafka:9092
Python-Abhängigkeit¶
3. Topics¶
| Topic | Erzeuger | Consumer | Zweck |
|---|---|---|---|
moe.ingest |
merger_node |
_kafka_consumer_loop |
GraphRAG Tripel-Extraktion aus Antworten |
moe.requests |
merger_node |
_kafka_consumer_loop (Logging) |
Vollständiger Audit-Log aller Anfragen |
moe.feedback |
POST /v1/feedback |
— (noch kein aktiver Consumer) | Feedback-Events für externe Auswertung |
Topic-Parameter (auto-created)¶
Alle Topics werden automatisch erstellt (KAFKA_AUTO_CREATE_TOPICS_ENABLE=true) mit:
- Partitionen: 1 (Single-Broker, kein Clustering nötig)
- Replikation: 1
- Retention: 7 Tage oder 512 MB (je nachdem was zuerst erreicht wird)
4. Producer¶
Datei: main.py — _kafka_publish()
async def _kafka_publish(topic: str, payload: dict) -> None:
if kafka_producer is None:
return # Kafka nicht verfügbar → stille Rückgabe
data = json.dumps(payload).encode()
await kafka_producer.send_and_wait(topic, data)
Der Producer wird beim Start in _init_kafka() initialisiert (12 Versuche, Backoff 5–60s):
producer = AIOKafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP,
value_serializer=lambda v: v, # bytes, bereits von _kafka_publish kodiert
)
Wann wird publiziert:
| Zeitpunkt | Topic | Auslöser |
|---|---|---|
| Nach jeder Merger-Antwort (> 150 Zeichen) | moe.ingest |
merger_node |
| Nach jeder Merger-Antwort | moe.requests |
merger_node |
| Bei Cache-Hit | moe.requests |
merger_node |
| Bei POST /v1/feedback | moe.feedback |
Feedback-Endpoint |
5. Consumer¶
Datei: main.py — _kafka_consumer_loop()
Der Consumer läuft als permanenter asyncio-Background-Task, gestartet in lifespan.
consumer = AIOKafkaConsumer(
"moe.ingest",
"moe.requests",
bootstrap_servers=KAFKA_BOOTSTRAP,
group_id="moe-worker",
auto_offset_reset="earliest", # beim ersten Start von Anfang an lesen
value_deserializer=lambda b: json.loads(b.decode()),
)
Consumer-Logik:
Nachricht auf moe.ingest
└── graph_manager.extract_and_ingest(input, answer, judge_llm)
└── LLM extrahiert Tripel → Neo4j speichert sie
Nachricht auf moe.requests
└── logger.debug(response_id, cache_hit, expert_models_used)
└── (Erweiterungspunkt: Analytics, Metriken, externe Systeme)
Consumer Group: moe-worker
- Offset wird in Kafka persistiert → beim Neustart werden keine Nachrichten doppelt verarbeitet
- Bei Container-Neustart setzt der Consumer an der letzten bestätigten Position fort
6. Nachrichten-Schemata¶
moe.ingest¶
{
"response_id": "chatcmpl-550e8400-e29b-41d4-a716-446655440000",
"input": "Was behandelt Ibuprofen?",
"answer": "Ibuprofen ist ein NSAID und wird zur Behandlung von Schmerzen..."
}
| Feld | Typ | Beschreibung |
|---|---|---|
response_id |
string | Chat-ID aus der Antwort |
input |
string | Originale Benutzeranfrage (ungekürzt) |
answer |
string | Vollständige Merger-Antwort |
moe.requests¶
{
"response_id": "chatcmpl-550e8400-...",
"input": "Was behandelt Ibuprofen?",
"answer": "Ibuprofen ist ein NSAID...",
"expert_models_used": ["gemma3:27b::general", "qwen3.5:27b::general"],
"cache_hit": false,
"ts": "2026-03-29T11:12:36.399123"
}
| Feld | Typ | Beschreibung |
|---|---|---|
response_id |
string | Chat-ID |
input |
string | Anfrage (max. 300 Zeichen) |
answer |
string | Antwort (max. 500 Zeichen) |
expert_models_used |
array | ["modell::kategorie", ...] |
cache_hit |
bool | War ein Cache-Hit vorhanden? |
ts |
string | ISO-Timestamp |
Bei Cache-Hits fehlen answer und expert_models_used (nicht relevant).
moe.feedback¶
{
"response_id": "chatcmpl-550e8400-...",
"rating": 4,
"positive": true,
"ts": "2026-03-29T11:15:00.123456"
}
| Feld | Typ | Beschreibung |
|---|---|---|
response_id |
string | Referenz auf die bewertete Antwort |
rating |
int | 1–5 (1-2=negativ, 3=neutral, 4-5=positiv) |
positive |
bool | rating >= 4 |
ts |
string | ISO-Timestamp |
7. HowTo: Administratoren¶
Kafka-Service verwalten¶
# Status prüfen
sudo docker compose ps moe-kafka
# Logs anzeigen
sudo docker logs moe-kafka --tail=50
# Neustart
sudo docker compose restart moe-kafka
# Topics auflisten
sudo docker exec moe-kafka kafka-topics \
--bootstrap-server localhost:9092 --list
# Topic-Details anzeigen
sudo docker exec moe-kafka kafka-topics \
--bootstrap-server localhost:9092 \
--describe --topic moe.ingest
# Alle Consumer-Groups anzeigen
sudo docker exec moe-kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 --list
# Consumer-Group Offsets und Lag prüfen
sudo docker exec moe-kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe --group moe-worker
Nachrichten live mitlesen (Debugging)¶
# moe.ingest live verfolgen
sudo docker exec moe-kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic moe.ingest \
--from-beginning
# moe.requests live verfolgen (Audit-Log)
sudo docker exec moe-kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic moe.requests \
--from-beginning
# moe.feedback live verfolgen
sudo docker exec moe-kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic moe.feedback \
--from-beginning
# Nur letzte 10 Nachrichten eines Topics
sudo docker exec moe-kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic moe.requests \
--max-messages 10 \
--from-beginning
Nachrichten zählen¶
# Anzahl Nachrichten in einem Topic (End-Offset)
sudo docker exec moe-kafka kafka-run-class kafka.tools.GetOffsetShell \
--bootstrap-server localhost:9092 \
--topic moe.ingest \
--time -1
Topics manuell anlegen (falls auto-create deaktiviert)¶
sudo docker exec moe-kafka kafka-topics \
--bootstrap-server localhost:9092 \
--create --topic moe.ingest \
--partitions 1 \
--replication-factor 1 \
--config retention.ms=604800000 \
--config retention.bytes=536870912
# Analog für moe.requests und moe.feedback
Consumer-Group Offset zurücksetzen (Reprocessing)¶
# Consumer stoppen (Orchestrator herunterfahren)
sudo docker compose stop langgraph-app
# Offset auf Anfang zurücksetzen
sudo docker exec moe-kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group moe-worker \
--reset-offsets \
--to-earliest \
--all-topics \
--execute
# Orchestrator wieder starten — Consumer liest alle Nachrichten erneut
sudo docker compose start langgraph-app
Achtung: Beim Reprocessing von moe.ingest werden alle Tripel erneut in Neo4j geschrieben. Da MERGE verwendet wird, entstehen keine Duplikate.
Disk-Nutzung prüfen¶
# Kafka-Daten
du -sh /opt/moe-infra/kafka-data/
# Log-Segmente im Container
sudo docker exec moe-kafka du -sh /var/lib/kafka/data/
Kafka vollständig zurücksetzen (Datenverlust!)¶
sudo docker compose stop moe-kafka
sudo rm -rf /opt/moe-infra/kafka-data/*
sudo docker compose up -d moe-kafka
# Topics werden bei nächster Nutzung neu erstellt
8. HowTo: Entwickler¶
Neuen Consumer für ein vorhandenes Topic schreiben¶
Beispiel: Einen Consumer für moe.requests der Statistiken in eine Datei schreibt.
from aiokafka import AIOKafkaConsumer
import asyncio, json
async def analytics_consumer():
consumer = AIOKafkaConsumer(
"moe.requests",
bootstrap_servers="moe-kafka:9092", # intern
# bootstrap_servers="localhost:9092", # von außen (Port 9092 exposed)
group_id="moe-analytics", # eigene Group, unabhängig von moe-worker
auto_offset_reset="earliest",
value_deserializer=lambda b: json.loads(b.decode()),
)
await consumer.start()
try:
async for msg in consumer:
payload = msg.value
print(f"[{payload['ts']}] cache_hit={payload.get('cache_hit')} "
f"experts={payload.get('expert_models_used')}")
finally:
await consumer.stop()
asyncio.run(analytics_consumer())
Wichtig: Immer eine eigene group_id verwenden — nie moe-worker teilen, da sonst der interne Consumer Nachrichten verpasst.
Neues Topic hinzufügen¶
- Konstante in
main.pyergänzen:
- An der gewünschten Stelle publishen:
asyncio.create_task(_kafka_publish(KAFKA_TOPIC_MEIN_EVENT, {
"feld1": wert1,
"ts": datetime.now().isoformat(),
}))
- Falls ein Consumer nötig ist, in
_kafka_consumer_loop()das Topic zur Consumer-Subscription hinzufügen:
consumer = AIOKafkaConsumer(
KAFKA_TOPIC_INGEST,
KAFKA_TOPIC_REQUESTS,
KAFKA_TOPIC_MEIN_EVENT, # ← neu
...
)
- Im Consumer-Loop die neue Nachricht behandeln:
Nachrichten von außen senden (Testing)¶
Der Port 9092 ist auf dem Host exposed. Von außen oder aus einem Test-Skript:
from aiokafka import AIOKafkaProducer
import asyncio, json
async def send_test():
producer = AIOKafkaProducer(bootstrap_servers="192.168.155.224:9092")
await producer.start()
payload = json.dumps({"input": "Test", "answer": "Antwort", "response_id": "test-1"}).encode()
await producer.send_and_wait("moe.ingest", payload)
await producer.stop()
asyncio.run(send_test())
Integration testen (End-to-End)¶
# 1. Anfrage senden
RESPONSE=$(curl -s http://localhost:8002/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model":"moe-orchestrator","messages":[{"role":"user","content":"Was ist Docker?"}],"stream":false}')
echo $RESPONSE | python3 -m json.tool
# 2. Response-ID extrahieren
RESPONSE_ID=$(echo $RESPONSE | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")
echo "Response-ID: $RESPONSE_ID"
# 3. Kafka-Nachricht prüfen (sollte moe.ingest und moe.requests je 1 Nachricht enthalten)
sudo docker exec moe-kafka kafka-run-class kafka.tools.GetOffsetShell \
--bootstrap-server localhost:9092 --topic moe.ingest --time -1
# 4. Feedback senden
curl -s http://localhost:8002/v1/feedback \
-H "Content-Type: application/json" \
-d "{\"response_id\":\"$RESPONSE_ID\",\"rating\":5}" | python3 -m json.tool
# 5. Feedback-Topic prüfen
sudo docker exec moe-kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic moe.feedback \
--from-beginning \
--max-messages 1
Consumer-Lag überwachen¶
Der Lag gibt an, wie viele Nachrichten noch nicht verarbeitet wurden.
sudo docker exec moe-kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe --group moe-worker
Beispiel-Output:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
moe-worker moe.ingest 0 42 42 0 ← kein Lag
moe-worker moe.requests 0 87 87 0
Lag > 0 bedeutet, der Consumer kommt nicht nach — typischerweise weil Neo4j-Ingest langsam ist.
9. Monitoring & Debugging¶
Orchestrator-Logs prüfen¶
# Kafka-spezifische Log-Zeilen
sudo docker logs langgraph-orchestrator 2>&1 | grep -i kafka
# Consumer-Joins und Partitionszuweisungen
sudo docker logs langgraph-orchestrator 2>&1 | grep -E "(group|partition|Consumer)"
# Publish-Fehler
sudo docker logs langgraph-orchestrator 2>&1 | grep "Kafka publish"
Typische Log-Muster beim Start¶
# Normalstart (gut)
✅ Kafka Producer verbunden (moe-kafka:9092)
Joined group 'moe-worker' (generation 1) ...
✅ Kafka Consumer gestartet
# Kafka noch nicht bereit (normal, wird retryed)
⚠️ Kafka nicht erreichbar (Versuch 1/12): ... — retry in 5s
Topic moe.ingest not found in cluster metadata ← verschwindet nach Topic-Erstellung
# Kafka dauerhaft nicht erreichbar (Problem)
❌ Kafka nach 12 Versuchen nicht erreichbar — Kafka deaktiviert
Häufige Probleme¶
| Problem | Ursache | Lösung |
|---|---|---|
GroupCoordinatorNotAvailableError beim Start |
Kafka noch hochfahrend | Normal — Consumer retryed automatisch |
Topic X not found in cluster metadata |
Topic noch nicht erstellt | Normal — wird beim ersten Produce erstellt |
| Consumer-Lag wächst ständig | Neo4j-Ingest zu langsam | Neo4j-Performance prüfen, ggf. Heap erhöhen |
Kafka publish [moe.ingest] fehlgeschlagen |
Kafka kurz nicht erreichbar | Nachricht verloren — Container prüfen |
| Orchestrator startet aber Kafka deaktiviert | 12 Verbindungsversuche fehlgeschlagen | sudo docker compose restart moe-kafka |
10. Fehlerbehandlung & Graceful Degradation¶
Das System ist so gebaut, dass es ohne Kafka vollständig funktioniert:
async def _kafka_publish(topic: str, payload: dict) -> None:
if kafka_producer is None:
return # kein Kafka → keine Aktion, kein Fehler
try:
await kafka_producer.send_and_wait(topic, data)
except Exception as e:
logger.warning(f"Kafka publish [{topic}] fehlgeschlagen: {e}")
# Kein raise → Hauptpfad (HTTP Response) nicht beeinflusst
Konsequenzen bei Kafka-Ausfall: - HTTP-Antworten werden weiterhin korrekt geliefert - GraphRAG-Ingest fällt aus (Wissensgraph wächst nicht mehr) - Request-Audit-Log fällt aus - Feedback-Events werden nicht publiziert (aber synchrone Redis/Neo4j-Updates laufen weiter)
Beim Neustart nach Ausfall: - Producer reconnected automatisch - Consumer setzt am letzten bestätigten Offset fort - Nachrichten, die während des Ausfalls produziert wurden, wurden nie gesendet (kein lokaler Buffer) → nicht nachholbar
11. Erweiterungen & Roadmap¶
Geplante Consumer¶
| Consumer | Topic | Zweck | Status |
|---|---|---|---|
| Analytics-Writer | moe.requests |
Statistiken in InfluxDB/Grafana | offen |
| Feedback-Aggregator | moe.feedback |
Wöchentliche Performance-Reports | offen |
| Re-Ranking-Trigger | moe.feedback |
Expert-Modelle bei schlechten Scores automatisch abwählen | offen |
| Dead-Letter-Queue | alle | Fehlgeschlagene Ingest-Nachrichten wiederholen | offen |
Skalierung (Multi-Broker)¶
Aktuell läuft Kafka als Single-Broker (ausreichend für dieses Setup). Für höhere Ausfallsicherheit:
# Mehrere Broker via KRaft
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093"
Topics müssten mit --replication-factor 3 neu erstellt werden.
Schema Registry¶
Für typsichere Nachrichten (Avro/Protobuf) kann die Confluent Schema Registry ergänzt werden:
schema-registry:
image: confluentinc/cp-schema-registry:7.7.0
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "moe-kafka:9092"
ports:
- "8081:8081"
Externe Anbindung¶
Kafka ist über Port 9092 auf dem Host exposed. Externe Services (andere Server im LAN) können direkt Topics konsumieren: