Commit 9a32fc10 authored by Jonas Waeber
Browse files

[WIP] begin to refactor kafka topology.

parent 54764c26
Pipeline #20353 failed with stages
in 2 minutes and 8 seconds
......@@ -21,17 +21,8 @@ package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.memobase.models.ErrorResult
import org.memobase.models.Formats
import org.memobase.models.Message
import org.memobase.models.ParserResult
import org.memobase.models.ProcessReport
import org.memobase.models.Report
import org.memobase.models.ReportMessages
import org.memobase.models.ReportStatus
import org.memobase.models.*
import org.memobase.settings.SettingsLoader
class KafkaTopology(private val settings: SettingsLoader) {
......@@ -40,67 +31,40 @@ class KafkaTopology(private val settings: SettingsLoader) {
// Name of the reporting topic: the configured output topic name with "-reporting" appended.
private val reportingTopic = settings.outputTopic + "-reporting"
// Klaxon instance used by prepare() to parse incoming message values into Message objects.
private val klaxon = Klaxon()
// Formats that the filter step in prepare() lets through to parser.parseTable.
private val acceptedFormats = listOf(Formats.csv, Formats.xlsx, Formats.tsv, Formats.xls, Formats.ods)
/**
 * Creates a fresh [StreamsBuilder], wires the processing pipeline onto it and returns the builder.
 *
 * NOTE(review): this body is taken from a rendered diff (hunk `-40,67 +31,40`) in which removed and
 * added lines appear side by side without +/- markers. Two variants of the source stream are visible:
 *  - `branchedSource` parses every value into a [Message], drops values that parse to null, and
 *    splits the stream into records whose format is `Formats.error` (index 0) and all others (index 1);
 *  - `parsedTable` parses every value, keeps only records whose format is in `acceptedFormats`, and
 *    passes them to `parser.parseTable`.
 * Presumably the `branch`-based variant is the pre-refactor code; confirm against the repository
 * before relying on either reading.
 *
 * `parser` is declared outside the lines shown here.
 *
 * @return the builder holding the assembled topology.
 */
fun prepare(): StreamsBuilder {
val builder = StreamsBuilder()
// Variant 1: parse, discard unparseable values, then split error-format records from the rest.
val branchedSource = builder
.stream<String, String>(settings.inputTopic)
.flatMapValues { value -> listOfNotNull(klaxon.parse<Message>(value)) }
.branch(
Predicate { _, value -> value.format == Formats.error },
Predicate { _, _ -> true }
)
// Variant 2: parse, keep accepted formats only, then run the table parser.
// NOTE(review): parsedTable is not referenced again in the lines shown here.
val parsedTable = builder
.stream<String, String>(settings.inputTopic)
.mapValues { value -> klaxon.parse<Message>(value) }
.filter { _, value -> acceptedFormats.contains(value?.format) }
// The !! relies on the filter above having removed null values (a null value has a null format) — TODO confirm.
.mapValues { key, value -> parser.parseTable(key, value!!) }
// Error-format records are turned into failure output and reports.
ignoreFaultyInput(branchedSource[0])
// All remaining records are parsed as tables and forwarded.
processValidInput(branchedSource[1]
.mapValues { key, value -> parser.parseTable(key, value) })
return builder
}
/**
 * Fans a stream of parser results out to three topics:
 *  - every parsed message, re-keyed by its own key and serialized with `toJsonString()`, goes to
 *    `settings.outputTopic` (messages with an empty value are dropped);
 *  - every parsed message's report, keyed by the report id and serialized with `toJson()`, goes to
 *    `reportingTopic`;
 *  - one process report per input record, keyed by the process report id, goes to
 *    `settings.processReportTopic`.
 *
 * NOTE(review): the body comes from a rendered diff without +/- markers, so some lines appear twice
 * (old and re-indented new copy). Read literally, the chain would flatten `messages` twice and
 * continue a chain after `.to(...)`; treat the duplicates as diff residue and confirm against the
 * repository.
 *
 * @param stream records whose values are the [ParserResult] of one parsed input table.
 */
private fun processValidInput(stream: KStream<String, ParserResult>) {
// One output record per message contained in the parser result.
val records = stream
.flatMapValues { _, value -> value.messages }
.flatMapValues { _, value -> value.messages }
// Branch 1: the message payloads themselves.
records
.map { _, value -> KeyValue(value.key, value.value) }
.filter { _, value -> value.isNotEmpty() }
.mapValues { value -> value.toJsonString() }
.to(settings.outputTopic)
// NOTE(review): the next four lines repeat the four above — presumably the re-indented copy from the diff.
.map { _, value -> KeyValue(value.key, value.value) }
.filter { _, value -> value.isNotEmpty() }
.mapValues { value -> value.toJsonString() }
.to(settings.outputTopic)
// Branch 2: the per-message reports.
records
.map { _, value -> KeyValue(value.report.id, value.report) }
.mapValues { value -> value.toJson() }
.to(reportingTopic)
// Branch 3: the process report of the whole parser result, taken from the unflattened stream.
stream
.map { _, value -> KeyValue(value.processReport.id, value.processReport.toJson()) }
.to(settings.processReportTopic)
}
/**
 * Handles input records that cannot be processed. For every record it
 *  - writes the JSON string of `ErrorResult.get()` to `settings.outputTopic`;
 *  - writes a failure [Report] for the record key to the "<outputTopic>-reporting" topic;
 *  - writes a failure [ProcessReport] to `settings.processReportTopic`.
 *
 * NOTE(review): the body comes from a rendered diff without +/- markers. The `.map { ... value.report ... }`
 * and `.map { ... value.processReport ... }` fragments after the `.to(...)` calls read properties that the
 * values produced just above them are not shown to have; they look like residue of another hunk —
 * confirm against the repository.
 *
 * @param stream records that were classified as faulty input.
 */
private fun ignoreFaultyInput(stream: KStream<String, Message>) {
// The incoming value is discarded; a fixed error result is emitted in its place.
stream
.mapValues { _ -> ErrorResult.get().toJsonString() }
.to(settings.outputTopic)
// Failure report keyed by the original record key.
stream
.mapValues { key, _ ->
Report(
key,
ReportStatus.failure,
ReportMessages.processFailure(key, "of an invalid input message.")
)
}
.mapValues { value -> value.toJson() }
// This topic name is the same string that the reportingTopic property holds.
.to("${settings.outputTopic}-reporting")
.map { _, value -> KeyValue(value.report.id, value.report) }
.mapValues { value -> value.toJson() }
.to(reportingTopic)
// Failure process report; the literal 1 is the second constructor argument (meaning not shown here).
stream
.mapValues { _ ->
ProcessReport(ReportStatus.failure, 1)
}
.mapValues { value -> value.toJson() }
.to(settings.processReportTopic)
.map { _, value -> KeyValue(value.processReport.id, value.processReport.toJson()) }
.to(settings.processReportTopic)
}
}
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment