Generische Kafka-Applikation für Metafacture-Transformationen
Ausgangslage
Für die Erstellung der verschiedenen Entitäten-Kategorien in linked-swissbib verwenden wir bisher ein einzelnes Fluxskript, d.h. die "Baseline" wird von einer einzelnen Applikation abgedeckt. Um den Prozess besser parallelisieren zu können, macht es Sinn, diese Transformationen in Kafka in einzelne Microservices zu packen (im Moment wären das also fünf distinkte Microservices, für bibliographicResource
, person
, organisation
, item
und document
). Diese Microservices sind in den Grundzügen identisch: Sie empfangen Nachrichten aus einem Topic, leiten jede Nachricht einzeln durch den Metafacture-Workflow und schicken sie prozessiert in ein neues Kafka-Topic. Anders ausgedrückt: Sie unterscheiden sich nur in ihrem jeweiligen Metafacture-Workflow, die KafkaStreams-spezfischen Teile bleiben aber gleich.
Natürlich ist es möglich, für jeden einzelnen Use case ein eigener Microservice zu erstellen, in dem der Metafacture-Workflow hart codiert its. Doch wären diese Microservices in grossen Teilen redundant, und der Unterhalt wäre dementsprechend aufwendig.
Es liegt daher nahe, eine generische Applikation zu entwickeln, welche beliebige Flux-Skripte verarbeiten kann.
Ansätze
Schnelle Lösung ohne Kafka Streams
Im Rahmen von linked-swissbib habe ich bereits zwei Metafacture-commands entwickelt, welche als Kafka-Consumer bzw. Kafka-Producer fungieren. Wir haben sie nie produktiv eingesetzt, und die verwendete Kafka-API ist veraltet, doch wäre der Aufwand vergleichsweise gering, sie wieder flott zu kriegen. Anschliessend liesse sich ein Transformations-Workflow einfach mit metafacture-runner-Prozess starten. Ein Docker-Image für metafacture-runner steht ebenfalls bereits zur Verfügung.
Nachteile:
- Schlechtere Integration ins Kafka-Ökosystem als eine KafkaStreams-Applikation
- Zumindest für die Generierung der
work
-Ressourcen müssten wir auf eine andere Lösung zurückgreifen
Komplexe Lösung mit Kafka Streams
metafacture-runner parst Flux-Dateien und erstellt daraus dynamisch einen Metafacture-Workflow. Es ist denkbar, diesen Mechanismus in einer Kafka Streams-Applikation nachzubauen. Das Szenario wäre folgendes:
- AnwenderIn bindet Flux-Datei, Metamorph-Definitionen und allfällige weitere von Metamorph verwendete Dateien (Konkordanzen etc.) in einen Docker-Container ein.
- Kafka Streams-Applikation liest Flux-Datei aus und parst diese mit den Mechanismen, wie sie im Modul
metafacture-flux
definiert werden. - Die Applikation bindet die Funktionalität der geparsten Flux-Datei in den Streams-Workflow ein (vgl. bisheriger Stand)
- Applikation beginnt mit der Prozessierung des Streams
Der Parsing-Prozess von metafacture-runner
läuft folgendermassen ab (in groben Zügen):
- Die Klasse
org.metafacture.runner.Flux
ruft die statische Methodecompile
vonorg.metafacture.flux.FluxCompiler
auf und übergibt ihr den Inhalt der Flux-Datei als Stream -
compile
erstellt mithilfe einer ANTLR-Grammatik (Flux.g
) einFluxParser
-Objekt (ein AST) - Anschliessend wird das
FluxParser
-Objekt basierend auf einer weiteren Grammatik (FlowBuilder.g
) der AST zu einemFlowBuilder
-Objekt zusammengestellt. -
FlowBuilder
gibt durch die Methodeflux()
einFluxProgram
zurück - Mit dessen Methode
start()
wird der Workflow gestartet
Eine direkte Verwendung der Mechanismen in metafacture-flux
ist allerdings aus verschiedenen Gründen nicht möglich:
- Viele der Methoden zur Erstellung bzw. Manipulation des AST von Flux sind
protected
oderprivate
, die Klassen selberfinal
. - Es gibt im beschriebenen Prozess direkt keine Möglichkeit, eine Source bzw. einen Sink zu definieren, der nicht bereits in der Flux-Datei definiert ist. Ein Eintritts- und Austrittspunkt für die Daten (Nachrichten) ist aber natürlich notwendig. Bei der Verwendung der Metafacture-API wird dies mit der Methode
process()
bzw.read()
auf dem erstenObjectReceiver
ermöglicht (vgl. Anleitung).
Folglich müssten die Funktionalitäten in metafacture-flux
nachgebaut werden.