Commit cb9e0f57 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Improve error reporting on failed input

parent 2b6d62e5
Pipeline #10641 passed with stages
in 6 minutes and 40 seconds
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.memobase package org.memobase
import com.beust.klaxon.Klaxon import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology import org.apache.kafka.streams.Topology
...@@ -26,8 +27,10 @@ import org.apache.kafka.streams.kstream.KStream ...@@ -26,8 +27,10 @@ import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.builder.ResourceBuilder import org.memobase.builder.ResourceBuilder
import org.memobase.helpers.ReportStatus
import org.memobase.mapping.MappingConfig import org.memobase.mapping.MappingConfig
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
import javax.management.remote.rmi._RMIConnectionImpl_Tie
class KafkaTopology( class KafkaTopology(
private val settings: SettingsLoader private val settings: SettingsLoader
...@@ -41,7 +44,27 @@ class KafkaTopology( ...@@ -41,7 +44,27 @@ class KafkaTopology(
val stream = builder.stream<String, String>(settings.inputTopic) val stream = builder.stream<String, String>(settings.inputTopic)
val extractedRecordIdStream = stream.flatMapValues { value -> parseJsonFile(value) } val jsonStream = stream.flatMapValues { value -> parseJsonFile(value) }
.branch(
Predicate { _, value -> value.containsKey("ERROR") },
Predicate { _, _ -> true }
)
jsonStream[0]
.mapValues { value -> value["ERROR"] as String }
.to(settings.outputTopic)
jsonStream[0]
.mapValues { key, value ->
Report(
key,
ReportStatus.failure,
"Failed to parse json input: $value."
).toJson()
}
.to(reportTopic)
val extractedRecordIdStream = jsonStream[1]
.mapValues { value -> buildResources(value) } .mapValues { value -> buildResources(value) }
.mapValues { value -> value.extractRecordId() } .mapValues { value -> value.extractRecordId() }
...@@ -137,14 +160,27 @@ class KafkaTopology( ...@@ -137,14 +160,27 @@ class KafkaTopology(
} }
private fun parseJsonFile(message: String): List<Map<String, String>> { private fun parseJsonFile(message: String): List<Map<String, String>> {
Klaxon().parse<Map<String, String>>(message).let { return if (message.contains("ERROR")) {
return if (it != null) { listOf(mapOf(Pair("ERROR", message)))
listOf(it) } else {
} else { try {
emptyList() Klaxon().parse<Map<String, String>>(message).let {
return if (it != null) {
listOf(it)
} else {
listOf(mapOf(Pair("ERROR", "ERROR: Could not parse message: $message.")))
}
}
} catch (ex: KlaxonException) {
listOf(
mapOf(
Pair(
"ERROR",
"ERROR: Could not parse message: $message. Because of ${ex.localizedMessage}."
)
)
)
} }
} }
} }
} }
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment