/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
package org.memobase
import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.kafka.utils.models.ValueWithException
import ch.memobase.mapping.MappingConfigurationParser
import ch.memobase.mapping.exceptions.InvalidMappingException
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import mapping.MapperConfiguration
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.ReportStatus
import org.memobase.settings.SettingsLoader
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
class KafkaTopology(
private val settings: SettingsLoader
) {
/** Logger of this topology, registered under the name "KafkaTopology". */
private val log = LogManager.getLogger("KafkaTopology")

/** Topic that receives every success and failure report emitted by this topology. */
private val reportTopic = settings.processReportTopic

/** Klaxon JSON instance held by this topology. */
private val klaxon = Klaxon()

/**
 * Joins incoming records with the mapping configuration of their import process.
 *
 * The byte-array serde and the config parser handed to the joiner are both identity functions,
 * so the configuration travels through the join as raw bytes.
 */
private val configJoiner = ConfigJoiner(
    ImportService.Mapping,
    Serdes.String(),
    Serdes.serdeFrom({ _, bytes -> bytes }, { _, bytes -> bytes }),
    this::parseConfig
)
/**
 * Assembles the mapping topology on a fresh [StreamsBuilder].
 *
 * Each record from the input topic goes through the following steps:
 * - It is joined with the configuration published on `import-process-config`.
 * - It is parsed.
 * - It is enriched with [HeaderMetadata] by [HeaderExtractionTransformSupplier].
 * - It is turned into a record by a [ResourceBuilder].
 *
 * Every failure along the way (join error, parse error, missing record id, missing record type)
 * is written as a failure [Report] to the report topic. Successfully mapped records are written
 * to the output topic together with a success report.
 *
 * @return the builder with all processors of the topology registered.
 */
fun prepare(): StreamsBuilder {
    val builder = StreamsBuilder()
    // Key and value of the config topic are converted to byte arrays before they reach the joiner.
    val configStream = builder.stream("import-process-config")
        .map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }
    val stream = builder.stream(settings.inputTopic)
    val joinedStream =
        configJoiner.join(stream, configStream)

    // handleExceptions puts an error message into `third`; "" marks a successful join.
    val handledStream = joinedStream
        .mapValues { value -> handleExceptions(value) }
        .branch(
            Predicate { _, value -> value.third != "" },
            Predicate { _, _ -> true }
        )
    handledStream[0]
        .mapValues { readOnlyKey, value ->
            Report(readOnlyKey, ReportStatus.failure, value.third).toJson()
        }
        .to(reportTopic)

    // A parse result goes to the failure branch when it carries a report, or when it lacks the
    // parsed value or its configuration. The success branch can therefore rely on both being present
    // instead of crashing the stream thread on a null.
    val parsedStream = handledStream[1]
        .mapValues { value -> Pair(value.first, value.second) }
        .mapValues { readOnlyKey, value ->
            parse(readOnlyKey, value)
        }
        .branch(
            Predicate { _, value -> value.third != null || value.first == null || value.second == null },
            Predicate { _, _ -> true }
        )
    parsedStream[0]
        .mapValues { key, value ->
            // Parsing may fail without producing a report of its own; emit a generic one then.
            // Both alternatives are serialized, so the report topic always receives JSON strings.
            (value.third ?: Report(
                key,
                ReportStatus.failure,
                "Caught an error, but no report was created."
            )).toJson()
        }
        .to(reportTopic)

    val extractedRecordIdStream = parsedStream[1]
        // Non-null is guaranteed by the failure-branch predicate above.
        .mapValues { value -> Pair(checkNotNull(value.first), checkNotNull(value.second)) }
        .transformValues(HeaderExtractionTransformSupplier, MapperConfiguration>>())
        .mapValues { value -> buildResources(value) }
        .mapValues { value -> value.extractRecordId() }
    val extractedRecordTypeValueStream = extractRecordId(extractedRecordIdStream)
    val hasIdAndTypeStream = extractRecordTypeValue(extractedRecordTypeValueStream)
    val completedMappingStream = hasIdAndTypeStream
        .mapValues { value -> value.generateRecord() }
        .mapValues { value -> value.generatePhysicalObject() }
        .mapValues { value -> value.generateDigitalObject() }
        .mapValues { value -> value.addDerivedConnection() }
    val recordStream = completedMappingStream
        .map { _, value -> writeRecord(value) }
    objectOutput(recordStream)
    return builder
}
/**
 * Flattens the joiner's result for one record into a `(left, right, error)` triple.
 *
 * An empty error string marks success. Any other value is a failure message; in that case the
 * first two components are `""` and an empty byte array.
 *
 * @param value the result of [ConfigJoiner.join] for a single record.
 * @return `Triple(left, right, "")` when the join produced a value, otherwise
 *   `Triple("", ByteArray(0), message)` with a non-empty `message`.
 */
private fun handleExceptions(value: ValueWithException>): Triple {
    return when {
        value.hasException() -> {
            // getLocalizedMessage() may be null or empty. Either value would be mistaken for the
            // success sentinel "" (or push null into the report), so fall back to toString().
            // toString() always contains at least the exception class name.
            val message = value.exception.localizedMessage?.takeUnless { it.isEmpty() }
                ?: value.exception.toString()
            Triple("", ByteArray(0), message)
        }
        value.hasValue() -> Triple(value.value.left, value.value.right, "")
        else -> Triple("", ByteArray(0), "Could not handle error in kafka utils library.")
    }
}
/**
 * Fans the mapped records out to their two sinks.
 *
 * The first pair component goes to the output topic. The second component, a success [Report],
 * is serialized to JSON and goes to the report topic.
 */
private fun objectOutput(stream: KStream>) {
    stream.mapValues { record -> record.first }.to(settings.outputTopic)
    stream.mapValues { record -> record.second.toJson() }.to(reportTopic)
}
/**
 * Creates the [ResourceBuilder] for one record from the following inputs:
 * - its parsed value,
 * - its [MapperConfiguration],
 * - the [HeaderMetadata] produced by [HeaderExtractionTransformSupplier] (institution id,
 *   record set id and publication flag).
 */
private fun buildResources(value: Pair, MapperConfiguration>, HeaderMetadata>): ResourceBuilder {
    val (parsed, header) = value
    val (source, configuration) = parsed
    return ResourceBuilder(
        source,
        configuration,
        header.institutionId,
        header.recordSetId,
        header.isPublished
    )
}
/**
 * Drops every record without a record id from the stream and reports it as a failure.
 *
 * @return the records that do have a record id, after `extractRecordTypeValue()` has been
 *   applied to each of them.
 */
private fun extractRecordId(stream: KStream): KStream {
    val (withRecordId, withoutRecordId) = stream.branch(
        Predicate { _, builder -> builder.hasRecordId() },
        Predicate { _, _ -> true }
    )
    // Early termination: a record without an id is reported and not mapped any further.
    withoutRecordId
        .mapValues { key, _ -> Report(key, ReportStatus.failure, "No record id found for record $key.").toJson() }
        .to(reportTopic)
    return withRecordId.mapValues { builder -> builder.extractRecordTypeValue() }
}
/**
 * Drops every record without a record type from the stream and reports it as a failure.
 *
 * @return the records for which `hasRecordType()` holds, unchanged.
 */
private fun extractRecordTypeValue(stream: KStream): KStream {
    val (withRecordType, withoutRecordType) = stream.branch(
        Predicate { _, builder -> builder.hasRecordType() },
        Predicate { _, _ -> true }
    )
    // Records lacking a usable record type end here with a failure report.
    withoutRecordType
        .mapValues { key, _ ->
            Report(key, ReportStatus.failure, "No correct record type found for record $key.").toJson()
        }
        .to(reportTopic)
    return withRecordType
}
/**
 * Writes the finished record and pairs it with a success report.
 *
 * @return a [KeyValue] keyed by the record id. Its value holds the written record and a
 *   success [Report] for that id.
 */
private fun writeRecord(builder: ResourceBuilder): KeyValue> {
    val written = builder.writeRecord()
    val recordId = written.first
    val report = Report(recordId, ReportStatus.success, "Successfully mapped record with id $recordId.")
    return KeyValue(recordId, written.second to report)
}
/** Identity config parser handed to [configJoiner]: the raw configuration bytes are passed through untouched. */
private fun parseConfig(data: ByteArray): ByteArray = data
private fun parse(
key: String,
value: Pair
): Triple