/*
 * Table Data Import Service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.io.StringWriter
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.Default
import org.memobase.helpers.JSON
import org.memobase.helpers.KEYS
import org.memobase.model.DocumentsSearchDoc
import org.memobase.model.Schema

/**
 * Builds the Kafka Streams topology that turns incoming JSON messages into search documents.
 *
 * Each input message is parsed, unpacked and routed by its content. Records and institutions
 * are transformed into search docs, which are written to the settings' output topic, and each
 * such message also yields one processing report on the process report topic. Messages that
 * match none of the known keys only yield a fatal report.
 *
 * @param settings loaded service settings; supplies the input/output/report topics and the
 *   app properties used to configure the search doc builders.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("SearchDocService")
    private val appSettings = settings.appSettings
    private val reportTopic = settings.processReportTopic
    private val searchDocTransform =
        DocumentsSearchDocBuilder(appSettings.getProperty(KEYS.SettingsProps.mediaUrl))
    private val institutionSearchDoc = InstitutionSearchDocBuilder(
        appSettings.getProperty(KEYS.SettingsProps.institutionTypeLabelsPath),
        appSettings
    )
    private val jsonWriter = ObjectMapper().registerKotlinModule().writer()

    /**
     * Assembles and returns the topology.
     *
     * Messages are split with `branch`, which assigns each message to the FIRST predicate that
     * matches. A message containing both a record and an institution key is therefore handled
     * as a record.
     */
    fun build(): Topology {
        val builder = StreamsBuilder()
        val stream = builder.stream<String, String>(settings.inputTopic)

        val branchedStream = stream
            .mapValues { value -> JSON.parse(value) }
            // NOTE(review): messages whose parse result is empty are dropped here without a report.
            .filter { _, value -> value.isNotEmpty() }
            .mapValues { value -> JSON.unpack(value) }
            .branch(
                Predicate { _, value -> value.containsKey(JSON.record) },
                Predicate { _, value -> value.containsKey(JSON.institution) },
                Predicate { _, value -> value.containsKey(JSON.recordSet) },
                Predicate { _, _ -> true }
            )

        val recordStream = branchedStream[0]
            .mapValues { readOnlyKey, value ->
                transformWithReport(readOnlyKey, DocumentsSearchDoc.DEFAULT) {
                    searchDocTransform.transform(value)
                }
            }
        outputStreams(recordStream)

        val institutionStream = branchedStream[1]
            .mapValues { readOnlyKey, value ->
                transformWithReport(readOnlyKey, Default.institutionSearchDoc) {
                    institutionSearchDoc.transform(readOnlyKey, value)
                }
            }
        outputStreams(institutionStream)

        // NOTE(review): branchedStream[2] (record sets) is never consumed, so record-set messages
        // produce neither a search doc nor a report. Confirm this is intended.

        branchedStream[3]
            .mapValues { readOnlyKey, value ->
                // Serialize like the reports written in outputStreams. A raw Report object would
                // not match the value serde used for the report topic.
                Report(
                    readOnlyKey,
                    ReportStatus.fatal,
                    "No record, memobase institution or record set present in input data: $value.",
                    Service.name
                ).toJson()
            }
            .to(reportTopic)

        return builder.build()
    }

    /**
     * Runs [transform] and pairs its result with a success report.
     *
     * If [transform] throws [InvalidInputException], [fallback] is paired with a warning report
     * that carries the exception's message instead. Other exceptions propagate.
     *
     * @param key message key, used as the report id.
     * @param fallback placeholder payload returned when the input is invalid.
     */
    private fun <T> transformWithReport(key: String, fallback: T, transform: () -> T): Pair<T, Report> =
        try {
            Pair(
                transform(),
                Report(key, ReportStatus.success, "Transformed message into search doc.", Service.name)
            )
        } catch (ex: InvalidInputException) {
            Pair(fallback, Report(key, ReportStatus.warning, ex.localizedMessage, Service.name))
        }

    /**
     * Fans a stream of (search doc, report) pairs out to the report and output topics.
     *
     * Every report is serialized with [Report.toJson] and written to the report topic. Search
     * docs whose report is not fatal are serialized with Jackson and written to the output topic.
     */
    private fun <T : Schema> outputStreams(stream: KStream<String, Pair<T, Report>>) {
        stream
            .mapValues { value -> value.second.toJson() }
            .to(reportTopic)

        stream
            // NOTE(review): a failed transformation carries a *warning* report together with a
            // placeholder doc (see transformWithReport). Such placeholders are still published
            // here. Confirm whether only successful results should be forwarded.
            .filterNot { _, value -> value.second.status == ReportStatus.fatal }
            .mapValues { value ->
                StringWriter().also { out -> jsonWriter.writeValue(out, value.first) }.toString()
            }
            .to(settings.outputTopic)
    }
}