Commit 3967501d authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Added exception handling to fedora client.

parent 8b957776
Pipeline #11789 passed with stages
in 4 minutes and 45 seconds
......@@ -33,6 +33,7 @@ import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClientImpl
import org.memobase.fedora.RdfResponseTypes
import org.memobase.model.EventMessage
......@@ -68,7 +69,7 @@ class KafkaTopology(
)
objectBranches[0]
.mapValues { value -> requestPrimaryResource(value) }
.flatMapValues { value -> requestPrimaryResource(value) }
.mapValues { value -> parseModel(value) }
.mapValues { value -> addEventType(value) }
.mapValues { value -> requestAdditionalRecordResources(value) }
......@@ -77,7 +78,7 @@ class KafkaTopology(
.to(settings.outputTopic + "-json-records")
objectBranches[1]
.mapValues { value -> requestPrimaryResource(value) }
.flatMapValues { value -> requestPrimaryResource(value) }
.mapValues { value -> parseModel(value) }
.mapValues { value -> addEventType(value) }
.mapValues { value -> filterFedoraProperties(value) }
......@@ -85,7 +86,7 @@ class KafkaTopology(
.to(settings.outputTopic + "-json-institutions")
objectBranches[2]
.mapValues { value -> requestPrimaryResource(value) }
.flatMapValues { value -> requestPrimaryResource(value) }
.mapValues { value -> parseModel(value) }
.mapValues { value -> addEventType(value) }
.mapValues { value -> filterFedoraProperties(value) }
......@@ -108,8 +109,17 @@ class KafkaTopology(
}
}
private fun requestPrimaryResource(message: EventMessage): Pair<EventMessage, String> {
return Pair(message, fedora.fetchRdfResourceIntoString(URI.create(message.objectPath), RdfResponseTypes.NTRIPLES))
private fun requestPrimaryResource(message: EventMessage): List<Pair<EventMessage, String>> {
return try {
listOf(Pair(message, fedora.fetchRdfResourceIntoString(URI.create(message.objectPath), RdfResponseTypes.NTRIPLES)))
} catch (ex: FcrepoOperationFailedException) {
when (ex.statusCode) {
410 -> log.error("Status: ${ex.statusCode} -> The resource at path ${message.objectPath} has been deleted.")
404 -> log.error("Status: ${ex.statusCode} -> No resource found at path ${message.objectPath}.")
else -> log.error("Status: ${ex.statusCode} -> Unknown Error: ${ex.localizedMessage}.")
}
emptyList()
}
}
private fun parseModel(input: Pair<EventMessage, String>): Pair<EventMessage, Model> {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment