/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.io.StringWriter
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.DocumentTypeMapper
import org.memobase.helpers.ElasticSearchWrapper
import org.memobase.helpers.InstitutionTypeMapper
import org.memobase.helpers.JSON
import org.memobase.helpers.KEYS.SettingsProps
import org.memobase.model.DocumentsSearchDoc
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.RecordSetSearchDoc
import org.memobase.model.Schema
/**
 * Builds the Kafka Streams topology of the search doc service.
 *
 * Messages read from the input topic are parsed and unpacked, then routed by the tag
 * they contain (record, institution or record set). Every route produces a search
 * document, which is written to the output topic, and a [Report], which is written to
 * the process report topic. Messages containing none of the known tags only produce a
 * fatal report.
 *
 * @param settings loaded service settings providing the topic names and the
 * application properties used to configure the builders.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("SearchDocService")

    private val appSettings = settings.appSettings
    private val mediaUrl = appSettings.getProperty(SettingsProps.mediaUrl)
    private val documentMapperPath = appSettings.getProperty(SettingsProps.documentTypeLabelsPath)
    private val institutionMapperPath = appSettings.getProperty(SettingsProps.institutionTypeLabelsPath)
    private val reportTopic = settings.processReportTopic

    private val documentTypeMapper = DocumentTypeMapper(documentMapperPath)
    private val institutionTypeMapper = InstitutionTypeMapper(institutionMapperPath)
    private val documentSearchDocBuilder = DocumentsSearchDocBuilder(documentTypeMapper, mediaUrl)
    private val institutionSearchDoc = InstitutionSearchDocBuilder(institutionTypeMapper, appSettings)
    private val elasticSearchWrapper = ElasticSearchWrapper(settings.appSettings)
    private val recordSetSearchDocBuilder =
        RecordSetSearchDocBuilder(elasticSearchWrapper)
    private val jsonWriter = ObjectMapper().registerKotlinModule().writer()

    /**
     * Assembles the complete stream topology.
     *
     * @return the topology reading from the configured input topic and writing to the
     * configured output and report topics.
     */
    fun build(): Topology {
        val builder = StreamsBuilder()
        // Key and value types cannot be inferred from the topic name alone.
        val stream = builder.stream<String, String>(settings.inputTopic)

        // Branch order matters: a message is routed to the first predicate it satisfies,
        // the last predicate catches everything that carries none of the known tags.
        val branchedStream = stream
            .mapValues { value -> JSON.parse(value) }
            .filter { _, value -> value.isNotEmpty() }
            .mapValues { value -> JSON.unpack(value) }
            .branch(
                Predicate { _, value -> value.containsKey(JSON.recordTag) },
                Predicate { _, value -> value.containsKey(JSON.institutionTag) },
                Predicate { _, value -> value.containsKey(JSON.recordSetTag) },
                Predicate { _, _ -> true }
            )

        val recordStream = toSearchDocs(branchedStream[0], DocumentsSearchDoc.DEFAULT) { key, value ->
            documentSearchDocBuilder.transform(key, value)
        }
        outputStreams(recordStream)

        val institutionStream = toSearchDocs(branchedStream[1], InstitutionSearchDoc.DEFAULT) { key, value ->
            institutionSearchDoc.transform(key, value)
        }
        outputStreams(institutionStream)

        val recordSetStream = toSearchDocs(branchedStream[2], RecordSetSearchDoc.DEFAULT) { key, value ->
            recordSetSearchDocBuilder.transform(key, value)
        }
        outputStreams(recordSetStream)

        branchedStream[3]
            .mapValues { readOnlyKey, value ->
                // Serialise the report the same way outputStreams does, so that every
                // message on the report topic has the same (JSON) representation.
                Report(
                    readOnlyKey,
                    ReportStatus.fatal,
                    "No record, memobase institution or record set present in input data: $value.",
                    Service.name
                ).toJson()
            }
            .to(reportTopic)

        return builder.build()
    }

    /**
     * Maps every message of [source] to a pair of search document and processing report.
     *
     * When [transform] throws an [InvalidInputException] the message is mapped to
     * [default] together with a warning report carrying the exception message.
     *
     * @param source stream of unpacked input messages.
     * @param default placeholder document used when the transformation fails.
     * @param transform creates the search document from the message key and value.
     * @return stream of (search document, report) pairs.
     */
    private fun <V> toSearchDocs(
        source: KStream<String, V>,
        default: Schema,
        transform: (String, V) -> Schema
    ): KStream<String, Pair<Schema, Report>> =
        source.mapValues { readOnlyKey, value ->
            try {
                Pair<Schema, Report>(
                    transform(readOnlyKey, value),
                    Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
                )
            } catch (ex: InvalidInputException) {
                Pair<Schema, Report>(
                    default,
                    Report(readOnlyKey, ReportStatus.warning, ex.localizedMessage, Service.name)
                )
            }
        }

    /**
     * Writes the reports of [stream] to the report topic and the search documents,
     * serialised as JSON, to the output topic.
     *
     * @param stream stream of (search document, report) pairs.
     */
    private fun outputStreams(stream: KStream<String, Pair<Schema, Report>>) {
        stream
            .mapValues { value -> value.second.toJson() }
            .to(reportTopic)

        // NOTE(review): the branches in build() only emit success or warning reports, so
        // this filter never drops anything and the DEFAULT placeholder documents of failed
        // transformations are forwarded to the output topic. Confirm this is intended.
        stream
            .filterNot { _, value -> value.second.status == ReportStatus.fatal }
            .mapValues { value -> value.first }
            .mapValues { value ->
                val out = StringWriter()
                jsonWriter.writeValue(out, value)
                out.toString()
            }
            .to(settings.outputTopic)
    }
}