/*
* Table Data Import Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.JsonArray
import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.io.StringReader
import java.io.StringWriter
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager
import org.memobase.model.SearchDoc
import org.memobase.rdf.NS
import org.memobase.settings.SettingsLoader
/**
 * Builds the Kafka Streams topology that converts incoming JSON messages into
 * serialized [SearchDoc] records.
 *
 * Every value read from `settings.inputTopic` is parsed as a JSON object, the
 * entries of its `@graph` array are indexed into a map, the map is passed to
 * [SearchDocTransform], and the resulting document is re-keyed by its `id`,
 * serialized with Jackson and written to `settings.outputTopic`.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("StreamsProcessing")
    private val searchDocTransform = SearchDocTransform(settings.appSettings.getProperty(KEYS.mediaUrlPropName))

    // Created once and reused for every record; building and configuring a new
    // mapper per message is needless work on the hot path.
    private val objectMapper = ObjectMapper().registerKotlinModule()

    /**
     * Assembles the processing pipeline between the input and output topic.
     *
     * @return the topology, ready to be handed to a Kafka Streams instance.
     */
    fun build(): Topology {
        val builder = StreamsBuilder()
        // Key and value types must be spelled out: they cannot be inferred from a topic name.
        val stream = builder.stream<String, String>(settings.inputTopic)

        stream
            // flatMapValues lets parseJson drop a record by returning an empty list.
            .flatMapValues { value -> parseJson(value) }
            .mapValues { value -> unpackJson(value) }
            .mapValues { value -> transformJson(value) }
            // From here on the record key is the id of the produced document.
            .map { _, value -> KeyValue(value.id, value) }
            .mapValues { value -> serialize(value) }
            .to(settings.outputTopic)

        return builder.build()
    }

    /**
     * Parses a raw message value as a JSON object.
     *
     * @param data the message value as received from the input topic.
     * @return a single-element list with the parsed object, or an empty list
     * when Klaxon rejects the payload with a [KlaxonException]. In that case
     * the problem is logged and the record is skipped, so that one malformed
     * message does not take the stream down. Any other failure still propagates.
     */
    private fun parseJson(data: String): List<JsonObject> =
        try {
            listOf(Klaxon().parseJsonObject(StringReader(data)))
        } catch (ex: KlaxonException) {
            log.error("Dropping input record: payload could not be parsed as JSON.", ex)
            emptyList()
        }

    /**
     * Indexes the entries of the message's `@graph` array.
     *
     * The entry whose `@type` equals `NS.rico + "Record"` is stored under the
     * fixed key `"record"`; every other entry is stored under its own `@id`.
     * When several entries map to the same key, the last one wins.
     *
     * @param input the parsed message.
     * @return the graph entries keyed as described above.
     * @throws RuntimeException (a cast failure) if `@graph` is absent or not an
     * array, or if a non-record entry has no string `@id`.
     */
    private fun unpackJson(input: JsonObject): Map<String, JsonObject> {
        // Unchecked cast: Klaxon hands out nested values untyped, so the element
        // type is asserted here and only checked when an element is accessed.
        val graph = input["@graph"] as JsonArray<JsonObject>
        return graph.associateBy { node ->
            if (node["@type"] == NS.rico + "Record") "record" else node["@id"] as String
        }
    }

    /**
     * Delegates to [SearchDocTransform] to build the search document.
     *
     * @param input graph entries as produced by [unpackJson].
     * @return the search document for this message.
     */
    private fun transformJson(input: Map<String, JsonObject>): SearchDoc = searchDocTransform.transform(input)

    /**
     * Serializes a search document with the shared Jackson mapper.
     *
     * @param doc the document to serialize.
     * @return the serialized document as a string.
     */
    private fun serialize(doc: SearchDoc): String {
        val writer = StringWriter()
        objectMapper.writeValue(writer, doc)
        return writer.toString()
    }
}