/*
* XML Data Import Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
import java.io.File
import java.io.InputStream
import java.io.StringReader
/**
 * Builds the Kafka Streams topology of the XML data transform step.
 *
 * Every record on the input topic is parsed from JSON into a message.
 *
 * - **Error messages:** messages whose `format` is `Formats.error` were reported as failures by a previous job. They are
 *   reported as failures again and forwarded downstream as the literal value `ERROR`.
 * - **All other messages:** the file at their `path` is opened via SFTP and transformed with [XMLTransformer]. The
 *   transformed output goes to the output topic, and the generated report goes to the reporting and process-report
 *   topics.
 *
 * @param settings provides the topic names and the SFTP and transformer configuration.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val sftpClient: SftpClient = SftpClient(settings.sftpSettings)
    private val xmlTransformer = XMLTransformer(settings.appSettings)
    private val reportingTopic = settings.outputTopic + "-reporting"

    /**
     * Assembles the topology described in the class documentation.
     *
     * @return the built [Topology].
     */
    fun build(): Topology {
        val builder = StreamsBuilder()

        // branch() returns one stream per predicate, in predicate order:
        // the first holds error messages from the previous job, the second everything else.
        val (invalidInput, validInput) = builder
            .stream(settings.inputTopic)
            .flatMapValues { _, value -> parseMessage(value) }
            .branch(
                Predicate { _, value -> value.format == Formats.error },
                Predicate { _, _ -> true }
            )

        // Report the filtered error message from the previous job, identified by the record key.
        invalidInput
            .mapValues { key, _ -> invalidInputReport(reportId = key, recordKey = key) }
            .to(reportingTopic)
        // The filtered record itself is simply forwarded downstream as "ERROR".
        invalidInput
            .mapValues { _ -> "ERROR" }
            .to(settings.outputTopic)
        // Report the whole processing step as failed for this record.
        invalidInput
            .mapValues { key, _ -> invalidInputReport(reportId = STEP_NAME, recordKey = key) }
            .to(settings.processReportTopic)

        // TODO: Implement a splitter for XML documents that contain multiple records.
        val transformed = validInput
            .map { key, value ->
                // Close the SFTP stream once the transformation has returned so that remote file
                // handles are not leaked. NOTE(review): assumes applyXSLT reads the stream completely
                // before returning and does not keep a reference to it in its result — confirm.
                sftpClient.open(File(value.path)).use { data -> transformXml(key, data) }
            }
        streamOutput(transformed)
        return builder.build()
    }

    /**
     * Routes transformation results to their sinks.
     *
     * The string form of each result's `output` is written to the output topic. The result's `report`, serialized to
     * JSON once, is written to both the reporting topic and the process-report topic.
     *
     * @throws IllegalStateException during record processing if a result carries no report.
     */
    private fun streamOutput(stream: KStream) {
        stream
            .mapValues { value -> value.output.toString() }
            .to(settings.outputTopic)

        val reports = stream.mapValues { key, value ->
            checkNotNull(value.report) { "Transformation of record '$key' produced no report." }.toJson()
        }
        reports.to(reportingTopic)
        reports.to(settings.processReportTopic)
    }

    /**
     * Parses a raw record value as JSON.
     *
     * @return a single-element list holding the parsed message, or an empty list when Klaxon yields `null`. The
     *   empty list makes `flatMapValues` drop the record. Exceptions thrown by the parser are not caught here.
     */
    private fun parseMessage(value: String): List =
        listOfNotNull(Klaxon().parse(StringReader(value)))

    /**
     * Applies the configured XSLT transformation to [data], the content of the record with [key].
     */
    private fun transformXml(key: String, data: InputStream): KeyValue {
        return xmlTransformer.applyXSLT(key, data)
    }

    /**
     * Serializes a failure [Report] stating that the input of [recordKey] is invalid.
     *
     * @param reportId identifier stored in the report (the record key, or [STEP_NAME] for the process report).
     * @param recordKey key of the offending record, included in the failure message.
     * @return the report as a JSON string.
     */
    private fun invalidInputReport(reportId: String, recordKey: String): String =
        Klaxon().toJsonString(
            Report(
                reportId,
                ReportStatus.failure,
                ReportMessages.processFailure(recordKey, INVALID_INPUT_MESSAGE)
            )
        )

    private companion object {
        /** Identifier of this processing step in process-level reports. */
        const val STEP_NAME = "xml-data-transform"

        /** Failure message attached to reports about invalid input records. */
        const val INVALID_INPUT_MESSAGE = "The input file is invalid."
    }
}