Unverified Commit 684c0a96 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

catch model build exceptions

parent a098d053
Pipeline #16014 passed with stages
in 4 minutes and 58 seconds
......@@ -24,6 +24,7 @@ import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.ResourceFactory
import org.apache.jena.riot.RiotException
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.memobase.rdf.EBUCORE
......@@ -50,10 +51,26 @@ class KafkaTopology(private val settings: SettingsLoader) {
val stream = builder.stream<String, String>(settings.inputTopic)
val instantiationBranch = stream
val model = stream
.transformValues(HeaderExtractionTransformSupplier<String>())
.mapValues { value -> createModel(value) }
.mapValues { value -> extractSubjects(value) }
.branch(
Predicate { _, value -> value != null },
Predicate { _, _ -> true }
)
model[1]
.mapValues { key, _ ->
Report(
key,
ReportStatus.failure,
"Can't create model. Is there anything wrong with the data?"
)
}
.to(reportingTopic)
val instantiationBranch = model[0]
.mapValues { value -> extractSubjects(value!!) }
.mapValues { readOnlyKey, value ->
enrichSftpLocator(
readOnlyKey,
......@@ -170,9 +187,13 @@ class KafkaTopology(private val settings: SettingsLoader) {
}
}
private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata> {
private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata>? {
val model = ModelFactory.createDefaultModel()
model.read(StringReader(data.first), "", Constant.rdfParserLang)
try {
model.read(StringReader(data.first), "", Constant.rdfParserLang)
} catch (ex: RiotException) {
return null
}
return Pair(model, data.second)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment