/*
 * XML Data Import Service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import com.beust.klaxon.Klaxon
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
import java.io.File
import java.io.InputStream
import java.io.StringReader

/**
 * Assembles the Kafka Streams topology of this service.
 *
 * Messages read from `settings.inputTopic` are parsed from JSON and split into two branches:
 * messages whose format is [Formats.error], and everything else. The error branch is reported
 * as a failure; the other branch is opened over SFTP, run through the XSLT transformer and
 * written to the output, reporting and process-report topics.
 *
 * NOTE(review): the generic type arguments of `KStream`, `List` and `KeyValue` in the private
 * function signatures below (and of the Klaxon `parse` result) are not present in the reviewed
 * copy of this file. The element types are declared elsewhere, so they are left as found and
 * must be confirmed against the project sources.
 *
 * @param settings provides the topic names as well as the SFTP and application settings.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val sftpClient: SftpClient = SftpClient(settings.sftpSettings)
    private val xmlTransformer = XMLTransformer(settings.appSettings)
    private val reportingTopic = settings.outputTopic + "-reporting"

    /**
     * Wires all processing steps into a [Topology].
     *
     * @return the topology built from a fresh [StreamsBuilder].
     */
    fun build(): Topology {
        val builder = StreamsBuilder()

        // Key and value are both strings: the key is handed to transformXml(key: String, ...)
        // and the value to parseMessage(value: String).
        val branches = builder
            .stream<String, String>(settings.inputTopic)
            .flatMapValues { _, value -> parseMessage(value) }
            .branch(
                Predicate { _, value -> value.format == Formats.error },
                Predicate { _, _ -> true }
            )
        val failedInput = branches[0]
        val validInput = branches[1]

        // Report the filtered error message from the previous job.
        failedInput
            .mapValues { key, _ ->
                Report(
                    key,
                    ReportStatus.failure,
                    ReportMessages.processFailure(key, "The input file is invalid.")
                ).toJson()
            }
            .to(reportingTopic)

        // The filtered result simply sends ERROR along.
        failedInput
            .mapValues { _ -> "ERROR" }
            .to(settings.outputTopic)

        // Report the full process as a failure.
        failedInput
            .mapValues { _ ->
                ProcessReport(
                    "xml-data-transform",
                    ReportStatus.failure,
                    1,
                    0,
                    1
                ).toJson()
            }
            .to(settings.processReportTopic)

        val transformedValue = validInput
            .mapValues { value -> sftpClient.open(File(value.path)) }
            // Close the remote stream once the transformation has consumed it;
            // nothing else in this class releases it.
            .map { key, stream -> stream.use { transformXml(key, it) } }

        streamOutput(transformedValue)

        return builder.build()
    }

    /**
     * Fans a stream of transformation results out to three topics: the transformed output to
     * `settings.outputTopic`, the per-record report to the reporting topic, and a
     * [ProcessReport] to `settings.processReportTopic`.
     *
     * @param stream the transformation results produced in [build].
     */
    private fun streamOutput(stream: KStream) {
        stream
            .mapValues { value -> value.output.toString() }
            .to(settings.outputTopic)

        stream
            .mapValues { value -> value.getReport().toJson() }
            .to(reportingTopic)

        stream
            .mapValues { value ->
                val report = value.getReport()
                // Any status other than success is recorded as a failure.
                // NOTE(review): the three integers are presumably total / succeeded / failed
                // counts -- confirm against ProcessReport.
                if (report.status == ReportStatus.success) {
                    ProcessReport(report.id, ReportStatus.success, 1, 1, 0)
                } else {
                    ProcessReport(report.id, ReportStatus.failure, 1, 0, 1)
                }
            }
            .mapValues { value -> value.toJson() }
            .to(settings.processReportTopic)
    }

    /**
     * Parses one JSON message with Klaxon.
     *
     * @param value the raw JSON text of the incoming record.
     * @return a single-element list holding the parsed message, or an empty list when Klaxon
     * yields `null`, so that `flatMapValues` drops the record.
     */
    private fun parseMessage(value: String): List {
        return listOfNotNull(Klaxon().parse(StringReader(value)))
    }

    /**
     * Delegates to the XSLT transformer.
     *
     * @param key the record key.
     * @param data the content of the file to transform; the caller remains responsible for closing it.
     * @return the key/value pair produced by `xmlTransformer.applyXSLT`.
     */
    private fun transformXml(key: String, data: InputStream): KeyValue {
        return xmlTransformer.applyXSLT(key, data)
    }
}