Unverified Commit 16bc20f9 authored by Sebastian Schüpbach

don't write header if no respective value present


Signed-off-by: Sebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent 0620a17e
Pipeline #19745 passed with stages in 3 minutes and 59 seconds
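
The intent of the commit is that a record header is only written to the outgoing Kafka message when the extracted metadata actually contains a value for it; before this change, headers such as institutionId were added unconditionally, with a null value when the key was missing. A minimal, self-contained sketch of the pattern (topic, key and helper names are illustrative, not taken from the repository):

import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical helper mirroring the commit's intent: add a header only
// when the metadata map holds a value for the given key.
fun addHeaderIfPresent(
    record: ProducerRecord<String, String>,
    headerData: Map<String, String>,
    key: String
) {
    headerData[key]?.let { value ->
        record.headers().add(key, value.toByteArray())
    }
}

fun main() {
    val record = ProducerRecord<String, String>("output-topic", "event-1", "payload")
    val headerData = mapOf("institutionId" to "inst-1") // "recordSetId" deliberately absent
    addHeaderIfPresent(record, headerData, "institutionId") // header is written
    addHeaderIfPresent(record, headerData, "recordSetId")   // nothing is added
}
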
@@ -18,41 +18,44 @@
package org.memobase
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.util.StdDateFormat
import ch.memobase.rdf.FEDORA
import ch.memobase.rdf.LDP
import ch.memobase.rdf.MB
import ch.memobase.rdf.RDF
import ch.memobase.rdf.RICO
import ch.memobase.settings.SettingsLoader
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.JsonMappingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.util.StdDateFormat
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.net.URI
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.Properties
import kotlin.collections.HashMap
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.ResourceFactory
import org.apache.jena.rdf.model.impl.SelectorImpl
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClientImpl
import org.memobase.fedora.RdfResponseTypes
import org.memobase.model.EventMessage
import ch.memobase.rdf.*
import ch.memobase.settings.SettingsLoader
import org.apache.jena.rdf.model.*
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.header.internals.RecordHeader
import java.net.URI
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.*
import java.lang.*
import kotlin.collections.HashMap
class KafkaTopology(
private val settings: SettingsLoader
@@ -89,7 +92,8 @@ class KafkaTopology(
var event = jsonMapper.readValue(eventJson, org.memobase.model.EventMessage::class.java)
var eventRdf: List<Pair<EventMessage, String>> = requestPrimaryResource(event)
var eventModel: Pair<EventMessage, Model> = parseModel(eventRdf[0])
val valueWithHeader: Triple<EventMessage, Model, HashMap<String, String>> = extractHeaderMetadata(eventModel)
val valueWithHeader: Triple<EventMessage, Model, HashMap<String, String>> =
extractHeaderMetadata(eventModel)
val headerData: HashMap<String, String> = valueWithHeader.third
var model: Model = addEventType(eventModel)
if (headerData["objectType"] == "rico:Record") {
@@ -107,14 +111,25 @@
var producerRecord: ProducerRecord<String, String> =
ProducerRecord(outputTopic, event.eventId, modelString)
producerRecord.headers().add("institutionId", headerData["institutionId"]?.toByteArray())
producerRecord.headers().add("recordSetId", headerData["recordSetId"]?.toByteArray())
producerRecord.headers().add("recordId", headerData["recordId"]?.toByteArray())
producerRecord.headers().add("isPublished", headerData["isPublished"]?.toByteArray())
if (headerData.containsKey("institutionId")) {
producerRecord.headers().add("institutionId", headerData["institutionId"]?.toByteArray())
}
if (headerData.containsKey("recordSetId")) {
producerRecord.headers().add("recordSetId", headerData["recordSetId"]?.toByteArray())
}
if (headerData.containsKey("recordId")) {
producerRecord.headers().add("recordId", headerData["recordId"]?.toByteArray())
}
if (headerData.containsKey("isPublished")) {
producerRecord.headers().add("isPublished", headerData["isPublished"]?.toByteArray())
}
producer.send(producerRecord)
log.debug("FedoraEvent " + event.eventId +
" written to topic '" + outputTopic + "'.")
log.debug(
"FedoraEvent " + event.eventId +
" written to topic '" + outputTopic + "'."
)
}
} catch (ex: Exception) {
log.error("Exception during processing eventMessages: " + ex.toString())
@@ -185,16 +200,16 @@ class KafkaTopology(
private fun getPropertyAsString(subjectUri: String, model: Model, property: Property): String {
val subject = model.getResource(subjectUri)
val objects = subject.listProperties(property).mapWith {
val o = it.`object`
when {
o.isURIResource -> o.asResource().uri
o.isLiteral -> o.asLiteral().value.toString()
else -> o.toString()
}
}.toList()
val o = it.`object`
when {
o.isURIResource -> o.asResource().uri
o.isLiteral -> o.asLiteral().value.toString()
else -> o.toString()
}
}.toList()
return when {
objects.size <= 0 -> throw Exception("No property ${property.toString()} found.")
objects.size > 1 -> throw Exception("More than 1 property ${property.toString()} found")
objects.size <= 0 -> throw Exception("No property $property found.")
objects.size > 1 -> throw Exception("More than 1 property $property found")
else -> objects[0]
}
}
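
getPropertyAsString expects exactly one value for the given property on the subject and throws otherwise. A hypothetical call against a small in-memory Jena model (the URIs and the property are made up for illustration, not taken from the repository):

// Illustrative only: one matching statement returns its value as a string,
// zero or several matching statements make getPropertyAsString throw.
val model = ModelFactory.createDefaultModel()
val title = ResourceFactory.createProperty("http://example.org/ns#title")
model.createResource("http://example.org/record/1").addProperty(title, "Some record title")
val value = getPropertyAsString("http://example.org/record/1", model, title) // "Some record title"
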
@@ -213,7 +228,12 @@ class KafkaTopology(
private fun requestPrimaryResource(message: EventMessage): List<Pair<EventMessage, String>> {
return try {
listOf(Pair(message, fedora.fetchRdfResourceIntoString(URI.create(message.objectPath), RdfResponseTypes.NTRIPLES)))
listOf(
Pair(
message,
fedora.fetchRdfResourceIntoString(URI.create(message.objectPath), RdfResponseTypes.NTRIPLES)
)
)
} catch (ex: FcrepoOperationFailedException) {
when (ex.statusCode) {
410 -> log.error("Status: ${ex.statusCode} -> The resource at path ${message.objectPath} has been deleted.")
@@ -332,5 +352,4 @@ class KafkaTopology(
props["value.serializer"] = StringSerializer::class.java
return KafkaProducer<String, String>(props)
}
}
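
For context, the producer assembled at the end of the class needs at least bootstrap servers and both serializers configured; a minimal, self-contained sketch under the assumption of a local broker (the address is a placeholder, not repository configuration):

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.StringSerializer

// Sketch of the producer configuration the hunk above hints at;
// the broker address is a placeholder.
fun buildProducer(): KafkaProducer<String, String> {
    val props = Properties()
    props["bootstrap.servers"] = "localhost:9092"
    props["key.serializer"] = StringSerializer::class.java
    props["value.serializer"] = StringSerializer::class.java
    return KafkaProducer<String, String>(props)
}
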