Commit f0aafd7b authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Add error handling for missing xslt configuration.

parent 7c57956b
Pipeline #16093 passed with stages
in 5 minutes and 42 seconds
......@@ -20,12 +20,14 @@ package org.memobase
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import com.beust.klaxon.Klaxon
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.models.*
import org.memobase.settings.SettingsLoader
......@@ -67,10 +69,26 @@ class KafkaTopology(private val settings: SettingsLoader) {
val joinedStream = configJoiner.join(dataStream, configStream)
val saxHandlerStream = joinedStream
val configNullBranch = joinedStream
.branch(
Predicate { _, value -> value.right == null },
Predicate { _, _ -> true }
)
configNullBranch[0]
.transformValues(HeaderExtractionTransformSupplier<JoinedValues<Message, ByteArray>>())
.mapValues { key, value ->
Report(
key,
ReportStatus.failure,
"Could not find a matching xslt configuration for record set ${value.second.recordSetId}."
)
}
val saxHandlerStream = configNullBranch[1]
.mapValues { value ->
log.debug("Combine joined inputs.")
Input(value.left, value.right) }
Input(value.left, value.right)
}
.transformValues(HeaderExtractionTransformSupplier<Input>())
.mapValues { value ->
Content(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment