/* * Table Data Import Service * Copyright (C) 2020 Memoriav * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package org.memobase import com.beust.klaxon.JsonObject import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule import java.io.StringWriter import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.Topology import org.apache.logging.log4j.LogManager import org.memobase.helpers.JSON import org.memobase.model.Report import org.memobase.model.SearchDoc import org.memobase.settings.SettingsLoader class KafkaTopology(private val settings: SettingsLoader) { private val log = LogManager.getLogger("SearchDocService") private val reportTopic = settings.processReportTopic private val searchDocTransform = SearchDocTransform(settings.appSettings.getProperty(KEYS.mediaUrlPropName)) fun build(): Topology { val builder = StreamsBuilder() val stream = builder.stream(settings.inputTopic) val transformedStream = stream .flatMapValues { value -> JSON.parse(value) } .mapValues { value -> JSON.unpack(value) } .mapValues { readOnlyKey, value -> try { Pair(transformJson(value), Report(readOnlyKey, "SUCCESS", "Transformed message into search doc.")) } catch (ex: InvalidInputException) { Pair(null, Report(readOnlyKey, "FAILURE", ex.localizedMessage)) } } transformedStream .map { _, value -> KeyValue(value.second.id, value.second.toJson()) } .to(reportTopic) transformedStream .filterNot { _, value -> value.second.status == "FAILURE" } .map { _, value -> KeyValue(value.first?.id, value.first) } .mapValues { value -> val writer = StringWriter() ObjectMapper().registerKotlinModule().writeValue(writer, value) writer.toString() } .to(settings.outputTopic) return builder.build() } private fun transformJson(input: Map): SearchDoc { return searchDocTransform.transform(input) } }