Commit aa7c87d8 authored by Jonas Waeber

Add error handling for invalid n-triples

parent 071bcb21
Pipeline #16063 passed with stages
in 6 minutes and 1 second
......@@ -25,13 +25,17 @@ import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.rdf.MemobaseModel
import org.apache.jena.rdf.model.Model
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RiotException
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.memobase.settings.SettingsLoader
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
import java.io.StringReader
import java.io.ByteArrayInputStream
import java.io.StringWriter
class KafkaTopology(
......@@ -61,10 +65,20 @@ class KafkaTopology(
configJoiner.join(stream, configStream)
val finishedStream = joinedStream
val parsedModelStream = joinedStream
.transformValues(HeaderExtractionTransformSupplier<JoinedValues<String, LocalTransformsLoader>>())
.mapValues { value -> createModel(value) }
.mapValues { key, value -> transformations(key, value) }
.branch(
Predicate { _, value -> value != null },
Predicate { _, _ -> true }
)
parsedModelStream[1]
.mapValues { key, _ -> Report(key, ReportStatus.failure, "Could not parse message. Found invalid input data (RiotException).").toJson() }
.to(reportTopic)
val finishedStream = parsedModelStream[0]
.mapValues { key, value -> transformations(key, value!!) }
.mapValues { value -> Pair(value.first, writeModel(value.second)) }
finishedStream
......@@ -78,9 +92,13 @@ class KafkaTopology(
return builder
}
/**
 * Reads the N-Triples text held in `input.first.left` into a new [MemobaseModel] and returns it
 * wrapped in a [PreparedInput] together with `input.first.right` and `input.second`.
 *
 * NOTE(review): this span is a rendered diff without +/- markers, so two versions are interleaved.
 * The first signature (returning `PreparedInput`) and the `model.read(StringReader(...))` call
 * appear to be the pre-change lines; the second signature (returning `PreparedInput?`) and the
 * `try`/`catch` around `RDFDataMgr.read` appear to be the post-change lines. Confirm against the
 * actual file before relying on this reading.
 *
 * @param input pair of the joined values (left: text to parse, right: transforms loader) and the
 *   header metadata extracted upstream.
 * @return the prepared input; in the nullable version, `null` when parsing throws a [RiotException].
 */
private fun createModel(input: Pair<JoinedValues<String, LocalTransformsLoader>, HeaderMetadata>): PreparedInput {
private fun createModel(input: Pair<JoinedValues<String, LocalTransformsLoader>, HeaderMetadata>): PreparedInput? {
val model = MemobaseModel()
// Reader-based read with an empty base URI and the "NTRIPLES" syntax name.
model.read(StringReader(input.first.left), "", "NTRIPLES")
try {
// Stream-based read: the text is converted to bytes and parsed as Lang.NTRIPLES.
RDFDataMgr.read(model, ByteArrayInputStream(input.first.left.toByteArray()), Lang.NTRIPLES)
} catch (ex: RiotException) {
// The exception is not logged or rethrown; null is the only failure signal. The topology's
// `value != null` branch predicate sends such records to the failure report on reportTopic.
return null
}
return PreparedInput(model, input.first.right, input.second)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment