/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.kafka.utils.models.ValueWithException
import ch.memobase.mapping.MapperConfiguration
import ch.memobase.mapping.MappingConfigurationParser
import ch.memobase.exceptions.InvalidMappingException
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.HeaderExtractionTransformSupplier
import ch.memobase.settings.HeaderMetadata
import ch.memobase.settings.SettingsLoader
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
/**
 * Builds the Kafka Streams topology for the mapper service: joins incoming
 * records with their mapping configuration, maps them to RDF resources, and
 * emits results plus per-record reports.
 *
 * NOTE(review): generic type parameters appear to have been stripped from this
 * file (e.g. on the KStream/Pair/Triple signatures below) — restore them
 * against the original source before compiling.
 */
class KafkaTopology(
    private val settings: SettingsLoader
) {
    // NOTE(review): logger name "MappingProcessor" does not match the class name — confirm intended.
    private val log = LogManager.getLogger("MappingProcessor")
    // Topic receiving per-record processing reports (success and fatal).
    private val reportTopic = settings.processReportTopic
    private val klaxon = Klaxon()
    // Joins each incoming record with its mapping configuration. Both serde
    // lambdas are identity functions: config payloads pass through as raw bytes,
    // and parseConfig (below) is likewise a pass-through.
    private val configJoiner = ConfigJoiner(
        ImportService.Mapping,
        Serdes.String(),
        Serdes.serdeFrom(
            { _, data ->
                data
            },
            { _, data ->
                data
            }
        ),
        this::parseConfig
    )
fun prepare(): StreamsBuilder {
val builder = StreamsBuilder()
val configStream = builder.stream("import-process-config")
.map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }
val stream = builder.stream(settings.inputTopic)
val joinedStream =
configJoiner.join(stream, configStream)
val handledStream = joinedStream
.mapValues { value -> handleExceptions(value) }
.branch(
Predicate { _, value -> value.third != "" },
Predicate { _, _ -> true }
)
handledStream[0]
.mapValues { readOnlyKey, value ->
Report(readOnlyKey, ReportStatus.fatal, value.third, Service.step).toJson()
}
.to(reportTopic)
val parsedStream = handledStream[1]
.mapValues { value -> Pair(value.first, value.second) }
.mapValues { readOnlyKey, value ->
parse(readOnlyKey, value)
}
.branch(
Predicate { _, value -> value.third != null },
Predicate { _, _ -> true }
)
parsedStream[0]
.mapValues { key, value ->
value.third?.toJson() ?: Report(
key,
ReportStatus.fatal,
"Caught an error, but no report was created.",
Service.step
)
}
.to(reportTopic)
val extractedRecordIdStream = parsedStream[1]
.mapValues { value -> Pair(value.first!!, value.second!!) }
.transformValues(HeaderExtractionTransformSupplier, MapperConfiguration>>())
.mapValues { value -> buildResources(value) }
.mapValues { value -> value.extractRecordId() }
val extractedRecordTypeValueStream = extractRecordId(extractedRecordIdStream)
val hasIdAndTypeStream = extractRecordTypeValue(extractedRecordTypeValueStream)
val completedMappingStream = hasIdAndTypeStream
.mapValues { value -> value.generateRecord() }
.mapValues { value -> value.generatePhysicalObject() }
.mapValues { value -> value.generateDigitalObject() }
.mapValues { value -> value.addDerivedConnection() }
val recordStream = completedMappingStream
.map { _, value -> writeRecord(value) }
objectOutput(recordStream)
return builder
}
private fun handleExceptions(value: ValueWithException>): Triple {
return when {
value.hasException() -> {
Triple("", ByteArray(0), value.exception.localizedMessage)
}
value.hasValue() -> {
Triple(value.value.left, value.value.right, "")
}
else -> {
Triple("", ByteArray(0), "Could not handle error in kafka utils library.")
}
}
}
private fun objectOutput(stream: KStream>) {
stream
.mapValues { _, value -> value.first }
.to(settings.outputTopic)
stream
.mapValues { _, value -> value.second.toJson() }
.to(reportTopic)
}
private fun buildResources(value: Pair, MapperConfiguration>, HeaderMetadata>): ResourceBuilder {
return ResourceBuilder(
value.first.first,
value.first.second,
value.second.institutionId,
value.second.recordSetId,
value.second.isPublished
)
}
private fun extractRecordId(stream: KStream): KStream {
val hasRecordId = stream.branch(
Predicate { _, value -> value.hasRecordId() },
Predicate { _, _ -> true }
)
// early termination if there is no record id!
val noRecordId = hasRecordId[1]
noRecordId
.mapValues { key, value -> Report(key, ReportStatus.fatal, value.errorMessage, Service.step).toJson() }
.to(reportTopic)
return hasRecordId[0]
.mapValues { value -> value.extractRecordTypeValue() }
}
private fun extractRecordTypeValue(stream: KStream): KStream {
val hasRecordTypeValue = stream
.branch(
Predicate { _, value -> value.hasRecordType() },
Predicate { _, _ -> true }
)
val noRecordTypeValue = hasRecordTypeValue[1]
noRecordTypeValue
.mapValues { key, value ->
Report(
key,
ReportStatus.fatal,
value.errorMessage,
Service.step
).toJson()
}
.to(reportTopic)
return hasRecordTypeValue[0]
}
private fun writeRecord(builder: ResourceBuilder): KeyValue> {
val result = builder.writeRecord(RDFFormat.NTRIPLES_UTF8)
return KeyValue(
result.first,
Pair(
result.second,
Report(result.first, ReportStatus.success, "", Service.step)
)
)
}
private fun parseConfig(data: ByteArray): ByteArray {
return data
}
private fun parse(
key: String,
value: Pair
): Triple