/*
 * mapper-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
package org.memobase

import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.kafka.utils.models.ValueWithException
import ch.memobase.mapping.MappingConfigurationParser
import ch.memobase.mapping.exceptions.InvalidMappingException
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import mapping.MapperConfiguration
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.ReportStatus
import org.memobase.settings.SettingsLoader
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata

/**
 * Assembles the Kafka Streams topology of the mapper service.
 *
 * Input records are joined with their mapping configuration, parsed, turned into a
 * [ResourceBuilder] and written to the configured output topic. Every failure on the way, as
 * well as every successfully mapped record, produces a [Report] on the process report topic.
 *
 * @param settings provides the input, output and process report topic names.
 */
class KafkaTopology(
    private val settings: SettingsLoader
) {
    private val log = LogManager.getLogger("KafkaTopology")
    private val reportTopic = settings.processReportTopic
    private val klaxon = Klaxon()

    // NOTE(review): the explicit type arguments are inferred from how the joined values are
    // consumed below (String source document, ByteArray mapping configuration) - confirm.
    private val configJoiner = ConfigJoiner<String, ByteArray>(
        ImportService.Mapping,
        Serdes.String(),
        // Pass-through serde: the configuration already is a byte array.
        Serdes.serdeFrom(
            { _, data -> data },
            { _, data -> data }
        ),
        this::parseConfig
    )

    /**
     * Wires up the complete topology.
     *
     * @return the [StreamsBuilder] holding all sources, processing steps and sinks.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val configStream = builder.stream<String, String>(CONFIG_TOPIC)
            .map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }
        val inputStream = builder.stream<String, String>(settings.inputTopic)

        // Branch 0: the join failed (non-empty error message). Branch 1: everything else.
        val handledStream = configJoiner.join(inputStream, configStream)
            .mapValues { _, value -> handleExceptions(value) }
            .branch(
                Predicate { _, value -> value.third != "" },
                Predicate { _, _ -> true }
            )

        handledStream[0]
            .mapValues { key, value -> Report(key, ReportStatus.failure, value.third).toJson() }
            .to(reportTopic)

        // Branch 0: parsing produced a failure report. Branch 1: source and mapping are available.
        val parsedStream = handledStream[1]
            .mapValues { value -> Pair(value.first, value.second) }
            .mapValues { key, value -> parse(key, value) }
            .branch(
                Predicate { _, value -> value.third != null },
                Predicate { _, _ -> true }
            )

        parsedStream[0]
            .mapValues { key, value ->
                // The fallback report is serialised as well, so that the report topic only ever
                // receives JSON strings.
                val report = value.third ?: Report(
                    key,
                    ReportStatus.failure,
                    "Caught an error, but not report was created."
                )
                report.toJson()
            }
            .to(reportTopic)

        // NOTE(review): the element type Map<String, Any> is inferred, not visible here - confirm.
        val resourceStream = parsedStream[1]
            .mapValues { value ->
                Pair(
                    checkNotNull(value.first) { "Parsed source is missing although no report was created." },
                    checkNotNull(value.second) { "Mapping configuration is missing although no report was created." }
                )
            }
            .transformValues(HeaderExtractionTransformSupplier<Pair<Map<String, Any>, MapperConfiguration>>())
            .mapValues { value -> buildResources(value) }
            .mapValues { value -> value.extractRecordId() }

        val withRecordIdStream = extractRecordId(resourceStream)
        val hasIdAndTypeStream = extractRecordTypeValue(withRecordIdStream)

        val completedMappingStream = hasIdAndTypeStream
            .mapValues { value -> value.generateRecord() }
            .mapValues { value -> value.generatePhysicalObject() }
            .mapValues { value -> value.generateDigitalObject() }
            .mapValues { value -> value.addDerivedConnection() }

        val recordStream = completedMappingStream
            .map { _, value -> writeRecord(value) }

        objectOutput(recordStream)
        return builder
    }

    /**
     * Flattens the join result into `(source, mapping configuration, error message)`.
     *
     * An empty error message signals success. A failure therefore always carries a non-empty
     * message: when the exception has no (or an empty) localized message, its `toString()` is
     * used instead, so the failure cannot be mistaken for a success downstream.
     */
    private fun handleExceptions(
        value: ValueWithException<JoinedValues<String, ByteArray>>
    ): Triple<String, ByteArray, String> =
        when {
            value.hasException() -> {
                val exception = value.exception
                val message = exception.localizedMessage
                Triple("", ByteArray(0), if (message.isNullOrEmpty()) exception.toString() else message)
            }
            value.hasValue() -> Triple(value.value.left, value.value.right, "")
            else -> Triple("", ByteArray(0), "Could not handle error in kafka utils library.")
        }

    /**
     * Sends the first pair component to the output topic and the JSON form of the accompanying
     * [Report] to the report topic.
     */
    private fun objectOutput(stream: KStream<String, Pair<String, Report>>) {
        stream
            .mapValues { _, value -> value.first }
            .to(settings.outputTopic)

        stream
            .mapValues { _, value -> value.second.toJson() }
            .to(reportTopic)
    }

    /**
     * Creates a [ResourceBuilder] from the parsed source, its mapping configuration and the
     * institution id, record set id and published flag taken from the header metadata.
     */
    private fun buildResources(
        value: Pair<Pair<Map<String, Any>, MapperConfiguration>, HeaderMetadata>
    ): ResourceBuilder {
        val (sourceAndMapping, header) = value
        return ResourceBuilder(
            sourceAndMapping.first,
            sourceAndMapping.second,
            header.institutionId,
            header.recordSetId,
            header.isPublished
        )
    }

    /**
     * Reports records without a record id as failures and returns the remaining records with
     * `extractRecordTypeValue()` applied to each builder.
     */
    private fun extractRecordId(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordId = stream.branch(
            Predicate { _, value -> value.hasRecordId() },
            Predicate { _, _ -> true }
        )

        // early termination if there is no record id!
        val noRecordId = hasRecordId[1]
        noRecordId
            .mapValues { key, _ ->
                Report(key, ReportStatus.failure, "No record id found for record $key.").toJson()
            }
            .to(reportTopic)

        return hasRecordId[0]
            .mapValues { value -> value.extractRecordTypeValue() }
    }

    /**
     * Reports records for which `hasRecordType()` is false as failures and returns the
     * remaining records unchanged.
     */
    private fun extractRecordTypeValue(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordTypeValue = stream.branch(
            Predicate { _, value -> value.hasRecordType() },
            Predicate { _, _ -> true }
        )

        val noRecordTypeValue = hasRecordTypeValue[1]
        noRecordTypeValue
            .mapValues { key, _ ->
                Report(
                    key,
                    ReportStatus.failure,
                    "No correct record type found for record $key."
                ).toJson()
            }
            .to(reportTopic)

        return hasRecordTypeValue[0]
    }

    /**
     * Writes the record held by [builder] and pairs the written output with a success [Report].
     * The first component returned by the builder becomes the new message key.
     */
    private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> {
        val (recordId, output) = builder.writeRecord()
        val report = Report(recordId, ReportStatus.success, "Successfully mapped record with id $recordId.")
        return KeyValue(recordId, Pair(output, report))
    }

    /** Hands the raw configuration bytes through unchanged; used as parse function of the config joiner. */
    private fun parseConfig(data: ByteArray): ByteArray = data

    /**
     * Parses the mapping configuration (second pair component) and the JSON source document
     * (first pair component).
     *
     * @return `(source, configuration, null)` on success, `(null, null, report)` with a failure
     * report otherwise.
     */
    private fun parse(
        key: String,
        value: Pair<String, ByteArray>
    ): Triple<Map<String, Any>?, MapperConfiguration?, Report?> {
        return try {
            val configurationParser = MappingConfigurationParser(value.second)
            val parsedSource = klaxon.parse<Map<String, Any>>(value.first)
            if (parsedSource != null) {
                log.info("Successfully parsed source & mapping configuration.")
                Triple(parsedSource, configurationParser.get(), null)
            } else {
                log.error("Parsed source is empty: ${value.first}.")
                Triple(null, null, Report(key, ReportStatus.failure, "Found empty source document."))
            }
        } catch (ex: InvalidMappingException) {
            log.error(ex.localizedMessage)
            Triple(null, null, Report(key, ReportStatus.failure, ex.localizedMessage))
        } catch (ex: KlaxonException) {
            log.error(ex.localizedMessage)
            Triple(null, null, Report(key, ReportStatus.failure, ex.localizedMessage))
        } catch (ex: NullPointerException) {
            log.error(ex.localizedMessage)
            Triple(
                null,
                null,
                Report(key, ReportStatus.failure, "There's no data to be processed: ${ex.localizedMessage}")
            )
        }
    }

    private companion object {
        /** Topic carrying the per-import process configuration that is joined with the input. */
        const val CONFIG_TOPIC = "import-process-config"
    }
}