Commit 97d7e739 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Improve error handling

parent 37e664d9
Pipeline #16461 passed with stages
in 6 minutes and 11 seconds
......@@ -18,6 +18,7 @@
package ch.memobase
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.logging.log4j.LogManager
......@@ -36,13 +37,15 @@ class KafkaTopology(
builder.stream<String, String>(settings.inputTopic)
.transformValues(HeaderExtractionTransformSupplier<String>())
.mapValues { value ->
.flatMapValues { value ->
try {
klaxon.parse<Report>(StringReader(value.first))?.let { IndexReport(it, value.second) }
listOf(klaxon.parse<Report>(StringReader(value.first))?.let { IndexReport(it, value.second) })
} catch (ex: ClassCastException) {
log.info(value.first)
throw ClassCastException(ex.message)
} finally {
log.error(ex.localizedMessage + " in " + value.first)
emptyList<IndexReport>()
} catch (ex: KlaxonException) {
log.error(ex.localizedMessage + " in " + value.first)
emptyList<IndexReport>()
}
}
.map { key, value -> KeyValue("$key-${value?.report?.step}", klaxon.toJsonString(value)) }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment