/*
* Table Data Import Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.io.StringWriter
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.Default
import org.memobase.helpers.JSON
import org.memobase.helpers.KEYS
import org.memobase.model.DocumentsSearchDoc
import org.memobase.model.Schema
/**
 * Assembles the Kafka Streams topology of this service.
 *
 * Messages are read from `settings.inputTopic`, parsed with [JSON.parse], and dropped when the parse
 * result is empty. The result of [JSON.unpack] is then routed by the key it contains:
 *
 * - [JSON.record]: transformed with [DocumentsSearchDocBuilder],
 * - [JSON.institution]: transformed with [InstitutionSearchDocBuilder],
 * - [JSON.recordSet]: branched off, but not consumed by this topology,
 * - anything else: reported as fatal.
 *
 * Every processed message yields a [Report] on `settings.processReportTopic`; transformed documents are
 * written as JSON to `settings.outputTopic`.
 *
 * @param settings provides the topic names and the application properties used to configure the builders.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("SearchDocService")
    private val appSettings = settings.appSettings
    private val reportTopic = settings.processReportTopic
    private val searchDocTransform = DocumentsSearchDocBuilder(appSettings.getProperty(KEYS.SettingsProps.mediaUrl))
    private val institutionSearchDoc =
        InstitutionSearchDocBuilder(appSettings.getProperty(KEYS.SettingsProps.institutionTypeLabelsPath), appSettings)
    private val jsonWriter = ObjectMapper().registerKotlinModule().writer()

    /**
     * Builds the topology described on the class.
     *
     * @return the finished [Topology], ready to be handed to a Kafka Streams instance.
     */
    fun build(): Topology {
        val builder = StreamsBuilder()

        // NOTE(review): key and value types are assumed to be String - confirm against the configured serdes.
        val stream = builder.stream<String, String>(settings.inputTopic)

        // A message is routed to the first branch whose predicate matches; the last predicate is the catch-all.
        val branchedStream = stream
            .mapValues { value -> JSON.parse(value) }
            .filter { _, value -> value.isNotEmpty() }
            .mapValues { value -> JSON.unpack(value) }
            .branch(
                Predicate { _, value -> value.containsKey(JSON.record) },
                Predicate { _, value -> value.containsKey(JSON.institution) },
                Predicate { _, value -> value.containsKey(JSON.recordSet) },
                Predicate { _, _ -> true }
            )

        // Records: a failed transformation is replaced by a default document plus a warning report.
        val recordStream = branchedStream[0]
            .mapValues { readOnlyKey, value ->
                try {
                    Pair(
                        searchDocTransform.transform(value),
                        Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
                    )
                } catch (ex: InvalidInputException) {
                    Pair(
                        DocumentsSearchDoc.DEFAULT,
                        Report(readOnlyKey, ReportStatus.warning, ex.localizedMessage, Service.name)
                    )
                }
            }
        outputStreams(recordStream)

        // Institutions: same pattern as records, with the institution-specific builder and default.
        val institutionStream = branchedStream[1]
            .mapValues { readOnlyKey, value ->
                try {
                    Pair(
                        institutionSearchDoc.transform(readOnlyKey, value),
                        Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
                    )
                } catch (ex: InvalidInputException) {
                    Pair(
                        Default.institutionSearchDoc,
                        Report(readOnlyKey, ReportStatus.warning, ex.localizedMessage, Service.name)
                    )
                }
            }
        outputStreams(institutionStream)

        // NOTE(review): branchedStream[2] (record sets) is never consumed, so such messages produce
        // neither a document nor a report - confirm this is intended.

        // Unmatched messages only produce a fatal report. The report is serialised with toJson(),
        // exactly as in outputStreams, so that the report topic always receives JSON strings.
        branchedStream[3]
            .mapValues { readOnlyKey, value ->
                Report(
                    readOnlyKey,
                    ReportStatus.fatal,
                    "No record, memobase institution or record set present in input data: $value.",
                    Service.name
                ).toJson()
            }
            .to(reportTopic)

        return builder.build()
    }

    /**
     * Writes the report of every message to the report topic and the JSON-serialised document to the
     * output topic.
     *
     * @param stream pairs of (search document, processing report), keyed by the message key.
     */
    // NOTE(review): the value type Pair<Schema, Report> is assumed from the file's imports - confirm.
    private fun outputStreams(stream: KStream<String, Pair<Schema, Report>>) {
        stream
            .mapValues { value -> value.second.toJson() }
            .to(reportTopic)

        stream
            // NOTE(review): the reports created in this class use ReportStatus.success / warning / fatal;
            // verify that "FAILURE" can actually occur, otherwise the default documents produced on
            // InvalidInputException are forwarded to the output topic as well.
            .filterNot { _, value -> value.second.status == "FAILURE" }
            .mapValues { value ->
                val out = StringWriter()
                jsonWriter.writeValue(out, value.first)
                out.toString()
            }
            .to(settings.outputTopic)
    }
}