Due to a scheduled upgrade to version 14.10, GitLab will be unavailabe on Monday 30.05., from 19:00 until 20:00.

Commit cfb13a9a authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

Merge branch 'fix-casting-error' into 'master'

print value when having casting exception

See merge request !1
parents 61ec3a33 0cc3c37e
Pipeline #16370 passed with stage
in 1 minute and 15 seconds
...@@ -20,20 +20,31 @@ package ch.memobase ...@@ -20,20 +20,31 @@ package ch.memobase
import com.beust.klaxon.Klaxon import com.beust.klaxon.Klaxon
import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsBuilder
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
import settings.HeaderExtractionTransformSupplier import settings.HeaderExtractionTransformSupplier
import java.io.StringReader import java.io.StringReader
import java.lang.ClassCastException
class KafkaTopology( class KafkaTopology(
private val settings: SettingsLoader private val settings: SettingsLoader
) { ) {
private val klaxon = Klaxon() private val klaxon = Klaxon()
private val log = LogManager.getLogger("ReportProcessorApp")
fun prepare(): StreamsBuilder { fun prepare(): StreamsBuilder {
val builder = StreamsBuilder() val builder = StreamsBuilder()
builder.stream<String, String>(settings.inputTopic) builder.stream<String, String>(settings.inputTopic)
.transformValues(HeaderExtractionTransformSupplier<String>()) .transformValues(HeaderExtractionTransformSupplier<String>())
.mapValues { value -> klaxon.parse<Report>(StringReader(value.first))?.let { IndexReport(it, value.second) } } .mapValues { value ->
try {
klaxon.parse<Report>(StringReader(value.first))?.let { IndexReport(it, value.second) }
} catch (ex: ClassCastException) {
log.info(value.first)
throw ClassCastException(ex.message)
} finally {
}
}
.map { key, value -> KeyValue("$key-${value?.report?.step}", klaxon.toJsonString(value)) } .map { key, value -> KeyValue("$key-${value?.report?.step}", klaxon.toJsonString(value)) }
.to(settings.outputTopic) .to(settings.outputTopic)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment