Commit 6c93a52c authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Update event message

parent 40b93481
...@@ -10,5 +10,5 @@ tag: "latest" ...@@ -10,5 +10,5 @@ tag: "latest"
deploymentName: fedora-metadata-extractor deploymentName: fedora-metadata-extractor
kafkaConfigs: prod-kafka-bootstrap-servers kafkaConfigs: prod-kafka-bootstrap-servers
fedoraConfigs: prod-fedora-configs fedoraConfigs: prod-fedora-configs
outputTopic: fedora-metadata-extractor-json-output outputTopic: fedora-output
inputTopic: fedora-events inputTopic: fedora-events
\ No newline at end of file
...@@ -25,7 +25,6 @@ import org.apache.jena.rdf.model.ModelFactory ...@@ -25,7 +25,6 @@ import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Property import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.impl.SelectorImpl import org.apache.jena.rdf.model.impl.SelectorImpl
import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.jena.riot.Lang import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr import org.apache.jena.riot.RDFDataMgr
import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsBuilder
...@@ -37,6 +36,7 @@ import org.memobase.fedora.RdfResponseTypes ...@@ -37,6 +36,7 @@ import org.memobase.fedora.RdfResponseTypes
import org.memobase.model.EventMessage import org.memobase.model.EventMessage
import org.memobase.rdf.* import org.memobase.rdf.*
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
import rdf.MB
import java.io.StringWriter import java.io.StringWriter
import java.net.URI import java.net.URI
import java.nio.charset.StandardCharsets import java.nio.charset.StandardCharsets
...@@ -67,34 +67,41 @@ class KafkaTopology( ...@@ -67,34 +67,41 @@ class KafkaTopology(
.mapValues { value -> parseMessage(value) } .mapValues { value -> parseMessage(value) }
.branch( .branch(
// TODO: Add actual values. // TODO: Add actual values.
Predicate { _, value -> value.objectTypes == "rico:Record" }, Predicate { _, value -> value.objectType == "rico:Record" },
Predicate { _, value -> Predicate { _, value -> value.objectType == "rico:CorporateBody" },
value.objectTypes == "rico:CorporateBody" || value.objectTypes == "rico:RecordSet" Predicate { _, value -> value.objectType == "rico:RecordSet" }
}
) )
objectBranches[0] objectBranches[0]
.mapValues { value -> requestPrimaryResource(value) } .mapValues { value -> requestPrimaryResource(value) }
.mapValues { value -> parseModel(value) } .mapValues { value -> parseModel(value) }
.mapValues { value -> addStatus(value) } .mapValues { value -> addEventType(value) }
.mapValues { value -> requestAdditionalRecordResources(value) } .mapValues { value -> requestAdditionalRecordResources(value) }
.mapValues { value -> filterFedoraProperties(value) } .mapValues { value -> filterFedoraProperties(value) }
.mapValues { value -> writeModel(value) } .mapValues { value -> writeModel(value) }
.to(settings.outputTopic) .to(settings.outputTopic + "-json-records")
objectBranches[1] objectBranches[1]
.mapValues { value -> requestPrimaryResource(value) } .mapValues { value -> requestPrimaryResource(value) }
.mapValues { value -> parseModel(value) } .mapValues { value -> parseModel(value) }
.mapValues { value -> addStatus(value) } .mapValues { value -> addEventType(value) }
.mapValues { value -> filterFedoraProperties(value) } .mapValues { value -> filterFedoraProperties(value) }
.mapValues { value -> writeModel(value) } .mapValues { value -> writeModel(value) }
.to(settings.outputTopic) .to(settings.outputTopic + "-json-institutions")
objectBranches[2]
.mapValues { value -> requestPrimaryResource(value) }
.mapValues { value -> parseModel(value) }
.mapValues { value -> addEventType(value) }
.mapValues { value -> filterFedoraProperties(value) }
.mapValues { value -> writeModel(value) }
.to(settings.outputTopic + "-json-record-sets")
return builder.build() return builder.build()
} }
private fun parseMessage(data: String): EventMessage { private fun parseMessage(data: String): EventMessage {
// can we assume that this is always correct? or should we handle parse errors?
return objectMapper.readValue(data, EventMessage::class.java) return objectMapper.readValue(data, EventMessage::class.java)
} }
...@@ -110,15 +117,15 @@ class KafkaTopology( ...@@ -110,15 +117,15 @@ class KafkaTopology(
} }
/** /**
* Adds the status of the event message to the core resources. * Adds the type of the event message to the core resources.
* Core resources are rico:Record, rico:CorporateBody (for institutions) and rico:RecordSet. * Core resources are rico:Record, rico:CorporateBody (for institutions), rico:RecordSet and rico:Instantiations.
* Downstream services should remove this property before publishing the data to the outside world! * Downstream services should remove this property before publishing the data to the outside world!
*/ */
private fun addStatus(input: Pair<EventMessage, Model>): Model { private fun addEventType(input: Pair<EventMessage, Model>): Model {
listOf(RICO.Record, RICO.CorporateBody, RICO.RecordSet).forEach { listOf(RICO.Record, RICO.CorporateBody, RICO.RecordSet, RICO.Instantiation).forEach {
val record = input.second.listSubjectsWithProperty(RDF.type, it).toList() val record = input.second.listSubjectsWithProperty(RDF.type, it).toList()
if (record.isNotEmpty()) { if (record.isNotEmpty()) {
record[0].addProperty(NS.hasStatus, input.first.eventType) record[0].addProperty(MB.eventType, input.first.eventType)
} }
} }
return input.second return input.second
......
...@@ -5,6 +5,6 @@ data class EventMessage( ...@@ -5,6 +5,6 @@ data class EventMessage(
val eventTimestamp: String, val eventTimestamp: String,
val eventType: String, val eventType: String,
val objectPath: String, val objectPath: String,
val objectId: String, val objectType: String,
val objectTypes: String val objectVersion: String? = null
) )
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment