/*
 * mapper-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
package org.memobase

import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.kafka.utils.models.ValueWithException
import ch.memobase.mapping.MapperConfiguration
import ch.memobase.mapping.MappingConfigurationParser
import ch.memobase.exceptions.InvalidMappingException
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.HeaderExtractionTransformSupplier
import ch.memobase.settings.HeaderMetadata
import ch.memobase.settings.SettingsLoader
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager

class KafkaTopology(
    private val settings: SettingsLoader
) {
    private val log = LogManager.getLogger("MappingProcessor")
    private val reportTopic = settings.processReportTopic
    private val klaxon = Klaxon()

    // Joins each incoming record with the mapping configuration registered for its import
    // process. The configuration is forwarded as raw bytes and parsed later in parse().
    private val configJoiner = ConfigJoiner(
        ImportService.Mapping,
        Serdes.String(),
        Serdes.serdeFrom<ByteArray>(
            { _, data -> data },
            { _, data -> data }
        ),
        this::parseConfig
    )

    /** Assembles the Kafka Streams topology of the mapper service. */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        // Mapping configurations arrive on a dedicated topic and are forwarded as raw bytes.
        val configStream = builder.stream<String, String>("import-process-config")
            .map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }
        val stream = builder.stream<String, String>(settings.inputTopic)
        val joinedStream = configJoiner.join(stream, configStream)

        // Branch 0: the join failed (non-empty error message); branch 1: continue processing.
        val handledStream = joinedStream
            .mapValues { value -> handleExceptions(value) }
            .branch(
                Predicate { _, value -> value.third != "" },
                Predicate { _, _ -> true }
            )

        handledStream[0]
            .mapValues { readOnlyKey, value ->
                Report(readOnlyKey, ReportStatus.fatal, value.third, Service.step).toJson()
            }
            .to(reportTopic)

        // Parse the source record and the mapping configuration.
        // Branch 0: parsing produced a report (failure); branch 1: continue processing.
        val parsedStream = handledStream[1]
            .mapValues { value -> Pair(value.first, value.second) }
            .mapValues { readOnlyKey, value -> parse(readOnlyKey, value) }
            .branch(
                Predicate { _, value -> value.third != null },
                Predicate { _, _ -> true }
            )

        parsedStream[0]
            .mapValues { key, value ->
                value.third?.toJson() ?: Report(
                    key,
                    ReportStatus.fatal,
                    "Caught an error, but no report was created.",
                    Service.step
                ).toJson()
            }
            .to(reportTopic)

        val extractedRecordIdStream = parsedStream[1]
            .mapValues { value -> Pair(value.first!!, value.second!!) }
            .transformValues(HeaderExtractionTransformSupplier<Pair<Map<String, Any>, MapperConfiguration>>())
            .mapValues { value -> buildResources(value) }
            .mapValues { value -> value.extractRecordId() }

        val extractedRecordTypeValueStream = extractRecordId(extractedRecordIdStream)
        val hasIdAndTypeStream = extractRecordTypeValue(extractedRecordTypeValueStream)

        // Generate the RDF resources for records that have both a record id and a type.
        val completedMappingStream = hasIdAndTypeStream
            .mapValues { value -> value.generateRecord() }
            .mapValues { value -> value.generatePhysicalObject() }
            .mapValues { value -> value.generateDigitalObject() }
            .mapValues { value -> value.addDerivedConnection() }

        val recordStream = completedMappingStream
            .map { _, value -> writeRecord(value) }

        objectOutput(recordStream)
        return builder
    }

    // Unwraps the join result into (source record, mapping configuration, error message).
    private fun handleExceptions(value: ValueWithException<JoinedValues<String, ByteArray>>): Triple<String, ByteArray, String> {
        return when {
            value.hasException() -> {
                Triple("", ByteArray(0), value.exception.localizedMessage)
            }
            value.hasValue() -> {
                Triple(value.value.left, value.value.right, "")
            }
            else -> {
                Triple("", ByteArray(0), "Could not handle error in kafka utils library.")
            }
        }
    }

    // Writes the serialized record to the output topic and its report to the report topic.
    private fun objectOutput(stream: KStream<String, Pair<String, Report>>) {
        stream
            .mapValues { _, value -> value.first }
            .to(settings.outputTopic)
        stream
            .mapValues { _, value -> value.second.toJson() }
            .to(reportTopic)
    }

    private fun buildResources(value: Pair<Pair<Map<String, Any>, MapperConfiguration>, HeaderMetadata>): ResourceBuilder {
        return ResourceBuilder(
            value.first.first,
            value.first.second,
            value.second.institutionId,
            value.second.recordSetId,
            value.second.isPublished
        )
    }

    private fun extractRecordId(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordId = stream.branch(
            Predicate { _, value -> value.hasRecordId() },
            Predicate { _, _ -> true }
        )

        // early termination if there is no record id!
        val noRecordId = hasRecordId[1]
        noRecordId
            .mapValues { key, value -> Report(key, ReportStatus.fatal, value.errorMessage, Service.step).toJson() }
            .to(reportTopic)

        return hasRecordId[0]
            .mapValues { value -> value.extractRecordTypeValue() }
    }

    private fun extractRecordTypeValue(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordTypeValue = stream
            .branch(
                Predicate { _, value -> value.hasRecordType() },
                Predicate { _, _ -> true }
            )

        val noRecordTypeValue = hasRecordTypeValue[1]
        noRecordTypeValue
            .mapValues { key, value ->
                Report(
                    key,
                    ReportStatus.fatal,
                    value.errorMessage,
                    Service.step
                ).toJson()
            }
            .to(reportTopic)

        return hasRecordTypeValue[0]
    }

    // Serializes the mapped record as N-Triples and pairs it with a success report.
    private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> {
        val result = builder.writeRecord(RDFFormat.NTRIPLES_UTF8)
        return KeyValue(
            result.first,
            Pair(
                result.second,
                Report(result.first, ReportStatus.success, "", Service.step)
            )
        )
    }

    // The mapping configuration bytes are forwarded unchanged; they are parsed in parse().
    private fun parseConfig(data: ByteArray): ByteArray {
        return data
    }

    private fun parse(
        key: String,
        value: Pair<String, ByteArray>
    ): Triple<Map<String, Any>?, MapperConfiguration?, Report?> {
        return try {
            val mapperConfiguration = MappingConfigurationParser(value.second)
            val parsedSource = klaxon.parse<Map<String, Any>>(value.first)
            if (parsedSource != null) {
                log.info("Successfully parsed source & mapping configuration.")
                Triple(parsedSource, mapperConfiguration.get(), null)
            } else {
                log.error("Parsed source is empty: ${value.first}.")
                Triple(null, null, Report(key, ReportStatus.fatal, "Could not parse source document.", Service.step))
            }
        } catch (ex: InvalidMappingException) {
            log.error(ex.localizedMessage)
            Triple(null, null, Report(key, ReportStatus.fatal, ex.localizedMessage, Service.step))
        } catch (ex: KlaxonException) {
            log.error(ex.localizedMessage)
            Triple(null, null, Report(key, ReportStatus.fatal, ex.localizedMessage, Service.step))
        } catch (ex: NullPointerException) {
            log.error(ex.localizedMessage)
            Triple(
                null,
                null,
                Report(key, ReportStatus.fatal, "There's no data to be processed: ${ex.localizedMessage}", Service.step)
            )
        }
    }
}