Unverified Commit 0cc3c37e authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

print value when having casting exception


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent 61ec3a33
Pipeline #16368 passed with stage
in 1 minute and 18 seconds
......@@ -20,20 +20,31 @@ package ch.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import settings.HeaderExtractionTransformSupplier
import java.io.StringReader
import java.lang.ClassCastException
class KafkaTopology(
private val settings: SettingsLoader
) {
private val klaxon = Klaxon()
private val log = LogManager.getLogger("ReportProcessorApp")
fun prepare(): StreamsBuilder {
val builder = StreamsBuilder()
builder.stream<String, String>(settings.inputTopic)
.transformValues(HeaderExtractionTransformSupplier<String>())
.mapValues { value -> klaxon.parse<Report>(StringReader(value.first))?.let { IndexReport(it, value.second) } }
.mapValues { value ->
try {
klaxon.parse<Report>(StringReader(value.first))?.let { IndexReport(it, value.second) }
} catch (ex: ClassCastException) {
log.info(value.first)
throw ClassCastException(ex.message)
} finally {
}
}
.map { key, value -> KeyValue("$key-${value?.report?.step}", klaxon.toJsonString(value)) }
.to(settings.outputTopic)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment