/* * Table Data Import Service * Copyright (C) 2020 Memoriav * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package org.memobase import com.beust.klaxon.JsonArray import com.beust.klaxon.JsonObject import com.beust.klaxon.Klaxon import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import java.io.StringReader import java.io.StringWriter import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.Topology import org.apache.logging.log4j.LogManager import org.memobase.model.SearchDoc import org.memobase.rdf.NS import org.memobase.settings.SettingsLoader class KafkaTopology(private val settings: SettingsLoader) { private val log = LogManager.getLogger("StreamsProcessing") private val searchDocTransform = SearchDocTransform() fun build(): Topology { val builder = StreamsBuilder() val stream = builder.stream(settings.inputTopic) stream .flatMapValues { value -> parseJson(value) } .mapValues { value -> unpackJson(value) } .mapValues { value -> transformJson(value) } .map { _, value -> KeyValue(value.id, value) } .mapValues { value -> val writer = StringWriter() ObjectMapper().registerKotlinModule().writeValue(writer, value) writer.toString() } .to(settings.outputTopic) return builder.build() } private fun parseJson(data: String): List { val result = Klaxon().parseJsonObject(StringReader(data)) return listOf(result) } private fun unpackJson(input: JsonObject): Map { val graph = input["@graph"] as JsonArray return graph.map { if (it["@type"] == NS.rico + "Record") { Pair("record", it) } else { Pair(it["@id"] as String, it) } }.toMap() } private fun transformJson(input: Map): SearchDoc { return searchDocTransform.transform(input) } }