Generische Kafka-Applikation für Metafacture-Transformationen

Ausgangslage

Für die Erstellung der verschiedenen Entitäten-Kategorien in linked-swissbib verwenden wir bisher ein einzelnes Fluxskript, d.h. die "Baseline" wird von einer einzelnen Applikation abgedeckt. Um den Prozess besser parallelisieren zu können, macht es Sinn, diese Transformationen in Kafka in einzelne Microservices zu packen (im Moment wären das also fünf distinkte Microservices, für bibliographicResource, person, organisation, item und document). Diese Microservices sind in den Grundzügen identisch: Sie empfangen Nachrichten aus einem Topic, leiten jede Nachricht einzeln durch den Metafacture-Workflow und schicken sie prozessiert in ein neues Kafka-Topic. Anders ausgedrückt: Sie unterscheiden sich nur in ihrem jeweiligen Metafacture-Workflow, die KafkaStreams-spezfischen Teile bleiben aber gleich.

Natürlich ist es möglich, für jeden einzelnen Use case ein eigener Microservice zu erstellen, in dem der Metafacture-Workflow hart codiert its. Doch wären diese Microservices in grossen Teilen redundant, und der Unterhalt wäre dementsprechend aufwendig.

Es liegt daher nahe, eine generische Applikation zu entwickeln, welche beliebige Flux-Skripte verarbeiten kann.

Ansätze

Schnelle Lösung ohne Kafka Streams

Im Rahmen von linked-swissbib habe ich bereits zwei Metafacture-commands entwickelt, welche als Kafka-Consumer bzw. Kafka-Producer fungieren. Wir haben sie nie produktiv eingesetzt, und die verwendete Kafka-API ist veraltet, doch wäre der Aufwand vergleichsweise gering, sie wieder flott zu kriegen. Anschliessend liesse sich ein Transformations-Workflow einfach mit metafacture-runner-Prozess starten. Ein Docker-Image für metafacture-runner steht ebenfalls bereits zur Verfügung.

Nachteile:

  • Schlechtere Integration ins Kafka-Ökosystem als eine KafkaStreams-Applikation
  • Zumindest für die Generierung der work-Ressourcen müssten wir auf eine andere Lösung zurückgreifen

Komplexe Lösung mit Kafka Streams

metafacture-runner parst Flux-Dateien und erstellt daraus dynamisch einen Metafacture-Workflow. Es ist denkbar, diesen Mechanismus in einer Kafka Streams-Applikation nachzubauen. Das Szenario wäre folgendes:

  1. AnwenderIn bindet Flux-Datei, Metamorph-Definitionen und allfällige weitere von Metamorph verwendete Dateien (Konkordanzen etc.) in einen Docker-Container ein.
  2. Kafka Streams-Applikation liest Flux-Datei aus und parst diese mit den Mechanismen, wie sie im Modul metafacture-flux definiert werden.
  3. Die Applikation bindet die Funktionalität der geparsten Flux-Datei in den Streams-Workflow ein (vgl. bisheriger Stand)
  4. Applikation beginnt mit der Prozessierung des Streams

Der Parsing-Prozess von metafacture-runner läuft folgendermassen ab (in groben Zügen):

  • Die Klasse org.metafacture.runner.Flux ruft die statische Methode compile von org.metafacture.flux.FluxCompiler auf und übergibt ihr den Inhalt der Flux-Datei als Stream
  • compile erstellt mithilfe einer ANTLR-Grammatik (Flux.g) ein FluxParser-Objekt (ein AST)
  • Anschliessend wird das FluxParser-Objekt basierend auf einer weiteren Grammatik (FlowBuilder.g) der AST zu einem FlowBuilder-Objekt zusammengestellt.
  • FlowBuilder gibt durch die Methode flux() ein FluxProgram zurück
  • Mit dessen Methode start() wird der Workflow gestartet

Eine direkte Verwendung der Mechanismen in metafacture-flux ist allerdings aus verschiedenen Gründen nicht möglich:

  • Viele der Methoden zur Erstellung bzw. Manipulation des AST von Flux sind protected oder private, die Klassen selber final.
  • Es gibt im beschriebenen Prozess direkt keine Möglichkeit, eine Source bzw. einen Sink zu definieren, der nicht bereits in der Flux-Datei definiert ist. Ein Eintritts- und Austrittspunkt für die Daten (Nachrichten) ist aber natürlich notwendig. Bei der Verwendung der Metafacture-API wird dies mit der Methode process() bzw. read() auf dem ersten ObjectReceiver ermöglicht (vgl. Anleitung).

Folglich müssten die Funktionalitäten in metafacture-flux nachgebaut werden.