Commit 17e25bde authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Add json parser exception handling.

Drop and log messages which cannot be parsed.
parent 6c8a61bf
Pipeline #11779 passed with stages
in 6 minutes and 16 seconds
......@@ -18,6 +18,8 @@
package org.memobase
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.JsonMappingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import org.apache.jena.rdf.model.Model
......@@ -64,7 +66,7 @@ class KafkaTopology(
val stream = builder.stream<String, String>(settings.inputTopic)
val objectBranches = stream
.mapValues { value -> parseMessage(value) }
.flatMapValues { value -> parseMessage(value) }
.branch(
// TODO: Add actual values.
Predicate { _, value -> value.objectType == "rico:Record" },
......@@ -101,8 +103,16 @@ class KafkaTopology(
return builder.build()
}
private fun parseMessage(data: String): EventMessage {
return objectMapper.readValue(data, EventMessage::class.java)
private fun parseMessage(data: String): List<EventMessage> {
return try {
listOf(objectMapper.readValue(data, EventMessage::class.java))
} catch (ex: JsonParseException) {
log.error("Invalid json found: $data")
emptyList()
} catch (ex: JsonMappingException) {
log.error("Could not parse the json object as an event message: $data")
emptyList()
}
}
private fun requestPrimaryResource(message: EventMessage): Pair<EventMessage, String> {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment