/*
* Table Data Import Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
package org.memobase
import com.beust.klaxon.JsonObject
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.io.StringWriter
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.JSON
import org.memobase.model.Report
import org.memobase.model.SearchDoc
import org.memobase.settings.SettingsLoader
class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("SearchDocService")
private val reportTopic = settings.processReportTopic
private val searchDocTransform = SearchDocTransform(settings.appSettings.getProperty(KEYS.mediaUrlPropName))
fun build(): Topology {
val builder = StreamsBuilder()
val stream = builder.stream(settings.inputTopic)
val transformedStream = stream
.flatMapValues { value -> JSON.parse(value) }
.mapValues { value -> JSON.unpack(value) }
.mapValues { readOnlyKey, value ->
try {
Pair(transformJson(value), Report(readOnlyKey, "SUCCESS", "Transformed message into search doc."))
} catch (ex: InvalidInputException) {
Pair(null, Report(readOnlyKey, "FAILURE", ex.localizedMessage))
}
}
transformedStream
.map { _, value -> KeyValue(value.second.id, value.second.toJson()) }
.to(reportTopic)
transformedStream
.filterNot { _, value -> value.second.status == "FAILURE" }
.map { _, value -> KeyValue(value.first?.id, value.first) }
.mapValues { value ->
val writer = StringWriter()
ObjectMapper().registerKotlinModule().writeValue(writer, value)
writer.toString()
}
.to(settings.outputTopic)
return builder.build()
}
private fun transformJson(input: Map): SearchDoc {
return searchDocTransform.transform(input)
}
}