Commit 5cde0cba authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Fix test

Add fedora triples filter
parent 75cd49ee
...@@ -38,6 +38,7 @@ dependencies { ...@@ -38,6 +38,7 @@ dependencies {
// Logging Framework // Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}" implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
implementation "org.apache.logging.log4j:log4j-core:${log4jV}" implementation "org.apache.logging.log4j:log4j-core:${log4jV}"
implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jV}"
// Kafka Imports // Kafka Imports
implementation "org.apache.kafka:kafka-streams:${kafkaV}" implementation "org.apache.kafka:kafka-streams:${kafkaV}"
......
...@@ -24,6 +24,8 @@ import org.apache.jena.rdf.model.Model ...@@ -24,6 +24,8 @@ import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Property import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.impl.SelectorImpl
import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.jena.riot.Lang import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr import org.apache.jena.riot.RDFDataMgr
import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsBuilder
...@@ -32,9 +34,7 @@ import org.apache.kafka.streams.kstream.Predicate ...@@ -32,9 +34,7 @@ import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.fedora.FedoraClientImpl import org.memobase.fedora.FedoraClientImpl
import org.memobase.model.EventMessage import org.memobase.model.EventMessage
import org.memobase.rdf.NS import org.memobase.rdf.*
import org.memobase.rdf.RDF
import org.memobase.rdf.RICO
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
import java.io.StringWriter import java.io.StringWriter
import java.net.URI import java.net.URI
...@@ -57,20 +57,19 @@ class KafkaTopology( ...@@ -57,20 +57,19 @@ class KafkaTopology(
) )
.build() .build()
fun build(): Topology { fun build(): Topology {
val builder = StreamsBuilder() val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic) val stream = builder.stream<String, String>(settings.inputTopic)
val objectBranches = stream val objectBranches = stream
.mapValues { value -> parseMessage(value) } .mapValues { value -> parseMessage(value) }
.branch( .branch(
// TODO: Add actual values. // TODO: Add actual values.
Predicate { _, value -> value.objectTypes == "rico:Record" }, Predicate { _, value -> value.objectTypes == "rico:Record" },
Predicate { _, value -> Predicate { _, value ->
value.objectTypes == "rico:CorporateBody" || value.objectTypes == "rico:RecordSet" } value.objectTypes == "rico:CorporateBody" || value.objectTypes == "rico:RecordSet"
}
) )
objectBranches[0] objectBranches[0]
...@@ -78,6 +77,7 @@ class KafkaTopology( ...@@ -78,6 +77,7 @@ class KafkaTopology(
.mapValues { value -> parseModel(value) } .mapValues { value -> parseModel(value) }
.mapValues { value -> addStatus(value) } .mapValues { value -> addStatus(value) }
.mapValues { value -> requestAdditionalRecordResources(value) } .mapValues { value -> requestAdditionalRecordResources(value) }
.mapValues { value -> filterFedoraProperties(value) }
.mapValues { value -> writeModel(value) } .mapValues { value -> writeModel(value) }
.to(settings.outputTopic) .to(settings.outputTopic)
...@@ -85,20 +85,20 @@ class KafkaTopology( ...@@ -85,20 +85,20 @@ class KafkaTopology(
.mapValues { value -> requestPrimaryResource(value) } .mapValues { value -> requestPrimaryResource(value) }
.mapValues { value -> parseModel(value) } .mapValues { value -> parseModel(value) }
.mapValues { value -> addStatus(value) } .mapValues { value -> addStatus(value) }
.mapValues { value -> filterFedoraProperties(value) }
.mapValues { value -> writeModel(value) } .mapValues { value -> writeModel(value) }
.to(settings.outputTopic) .to(settings.outputTopic)
return builder.build() return builder.build()
} }
private fun parseMessage(data: String): EventMessage { private fun parseMessage(data: String): EventMessage {
// can we assume that this is always correct? or should we handle parse errors? // can we assume that this is always correct? or should we handle parse errors?
return objectMapper.readValue(data, EventMessage::class.java) return objectMapper.readValue(data, EventMessage::class.java)
} }
private fun requestPrimaryResource(message: EventMessage): Pair<EventMessage, String> { private fun requestPrimaryResource(message: EventMessage): Pair<EventMessage, String> {
return Pair(message, fedora.fetchRdfResourceIntoString(URI.create(message.objectPath), "plain/text")) return Pair(message, fedora.fetchRdfResourceIntoString(URI.create(message.objectPath), "text/plain"))
} }
private fun parseModel(input: Pair<EventMessage, String>): Pair<EventMessage, Model> { private fun parseModel(input: Pair<EventMessage, String>): Pair<EventMessage, Model> {
...@@ -142,16 +142,47 @@ class KafkaTopology( ...@@ -142,16 +142,47 @@ class KafkaTopology(
subject.listProperties(property).forEach { subject.listProperties(property).forEach {
if (it.`object`.isResource) { if (it.`object`.isResource) {
val uri = it.`object`.asResource().uri val uri = it.`object`.asResource().uri
val data = fedora.fetchRdfResourceIntoString(URI(uri), "plain/text") val data = fedora.fetchRdfResourceIntoString(URI(uri), "text/plain")
RDFDataMgr.read(model, data.byteInputStream(StandardCharsets.UTF_8), Lang.NT) RDFDataMgr.read(model, data.byteInputStream(StandardCharsets.UTF_8), Lang.NT)
} }
} }
} }
} }
/**
 * Removes repository bookkeeping triples from [model].
 *
 * Two groups of statements are deleted, regardless of their subject:
 *  - every statement whose predicate is one of the Fedora properties
 *    `created`, `createdBy`, `lastModified` or `lastModifiedBy`;
 *  - every `RDF.type` statement whose object is one of the LDP / Fedora
 *    classes listed in the second loop below.
 *
 * The model is modified in place.
 *
 * @param model the model to clean up; it is mutated by this call.
 * @return the same [model] instance that was passed in.
 */
private fun filterFedoraProperties(model: Model): Model {
    // Model.removeAll treats a null subject/object as a wildcard, so no
    // Selector instance (and no typed-null workaround to pick a
    // SelectorImpl constructor overload) is needed.
    listOf(
        FEDORA.created,
        FEDORA.createdBy,
        FEDORA.lastModified,
        FEDORA.lastModifiedBy
    ).forEach { fedoraProperty ->
        model.removeAll(null, fedoraProperty, null)
    }

    listOf(
        LDP.Container,
        LDP.BasicContainer,
        LDP.RDFSource,
        FEDORA.Container,
        FEDORA.Resource
    ).forEach { repositoryType ->
        model.removeAll(null, RDF.type, repositoryType)
    }
    return model
}
private fun writeModel(model: Model): String { private fun writeModel(model: Model): String {
val writer = StringWriter() val writer = StringWriter()
RDFDataMgr.write(writer, model, Lang.JSONLD) RDFDataMgr.write(writer, model, Lang.NT)
return writer.toString() return writer.toString().trim()
} }
} }
package org.memobase.rdf
import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.ResourceFactory
/**
 * Vocabulary terms located in the [NS.fedora] namespace.
 *
 * Every term's URI is the namespace followed by the term's local name.
 */
object FEDORA {
    // Terms created as plain resources.
    val Container = classTerm("Container")
    val Resource = classTerm("Resource")

    // Terms created as properties.
    val created = propertyTerm("created")
    val createdBy = propertyTerm("createdBy")
    val lastModified = propertyTerm("lastModified")
    val lastModifiedBy = propertyTerm("lastModifiedBy")

    /** Creates the property named [localName] inside the [NS.fedora] namespace. */
    private fun propertyTerm(localName: String): Property =
        ResourceFactory.createProperty(NS.fedora, localName)

    /** Creates the resource whose URI is [NS.fedora] followed by [localName]. */
    private fun classTerm(localName: String): Resource =
        ResourceFactory.createResource("${NS.fedora}$localName")
}
\ No newline at end of file
package org.memobase.rdf
import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.ResourceFactory
/**
 * Vocabulary terms located in the [NS.ldp] namespace.
 *
 * Every term's URI is the namespace followed by the term's local name.
 */
object LDP {
    val BasicContainer = classTerm("BasicContainer")
    val Container = classTerm("Container")
    val RDFSource = classTerm("RDFSource")

    /** Creates the property named [localName] inside the [NS.ldp] namespace. */
    private fun propertyTerm(localName: String): Property =
        ResourceFactory.createProperty(NS.ldp, localName)

    /** Creates the resource whose URI is [NS.ldp] followed by [localName]. */
    private fun classTerm(localName: String): Resource =
        ResourceFactory.createResource("${NS.ldp}$localName")
}
\ No newline at end of file
...@@ -47,6 +47,9 @@ object NS { ...@@ -47,6 +47,9 @@ object NS {
val rdau = "http://rdaregistry.info/Elements/u/" val rdau = "http://rdaregistry.info/Elements/u/"
const val ldp = "http://www.w3.org/ns/ldp#"
const val fedora = "http://fedora.info/definitions/v4/repository#"
private const val internal = "http://memobase.ch/internal/" private const val internal = "http://memobase.ch/internal/"
val hasStatus = prop("hasStatus") val hasStatus = prop("hasStatus")
......
...@@ -47,7 +47,7 @@ class Tests { ...@@ -47,7 +47,7 @@ class Tests {
val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings) val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(StringSerializer(), StringSerializer()) val factory = ConsumerRecordFactory(StringSerializer(), StringSerializer())
val input = readFile("${params.count}/input.json") val input = readFile("${params.count}/input.json")
val output = readFile("${params.count}/output.json") val output = readFile("${params.count}/output.nt")
testDriver.pipeInput( testDriver.pipeInput(
factory.create(service.settings.inputTopic, params.key, input) factory.create(service.settings.inputTopic, params.key, input)
) )
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
"eventId": "1", "eventId": "1",
"eventTimestamp": "10:10:10", "eventTimestamp": "10:10:10",
"eventType": "CREATE", "eventType": "CREATE",
"objectPath": "memobase.ch/record/test-id", "objectPath": "https://memobase.ch/record/BAZ-MEI_77466",
"objectId": "test-id", "objectId": "test-id",
"objectTypes": "rico:Record" "objectTypes": "rico:Record"
} }
\ No newline at end of file
<https://memobase.ch/record/BAZ-MEI_77466#genid0524f985-c1d2-4a86-a239-6cdadca53632> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#CreationRelation> .
<https://memobase.ch/record/BAZ-MEI_77466#genid0524f985-c1d2-4a86-a239-6cdadca53632> <https://www.ica.org/standards/RiC/ontology#creationRelationHasSource> <https://memobase.ch/record/BAZ-MEI_77466> .
<https://memobase.ch/record/BAZ-MEI_77466#genid0524f985-c1d2-4a86-a239-6cdadca53632> <https://www.ica.org/standards/RiC/ontology#creationRelationHasTarget> <https://memobase.ch/record/BAZ-MEI_77466#genida8ce0c15-3b0f-46c2-b2a3-64e605bc03d2> .
<https://memobase.ch/record/BAZ-MEI_77466#genid0524f985-c1d2-4a86-a239-6cdadca53632> <https://www.ica.org/standards/RiC/ontology#type> "Auftraggeber" .
<https://memobase.ch/record/BAZ-MEI_77466#genid5e41f3a9-4392-49b0-b10e-464867bf9d45> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Place> .
<https://memobase.ch/record/BAZ-MEI_77466#genid5e41f3a9-4392-49b0-b10e-464867bf9d45> <https://www.ica.org/standards/RiC/ontology#name> "Luzern"@de .
<https://memobase.ch/record/BAZ-MEI_77466#genid98f1658d-4934-4c6b-a991-c81e73affa5e> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Rule> .
<https://memobase.ch/record/BAZ-MEI_77466#genid98f1658d-4934-4c6b-a991-c81e73affa5e> <https://www.ica.org/standards/RiC/ontology#name> "BAZ" .
<https://memobase.ch/record/BAZ-MEI_77466#genid98f1658d-4934-4c6b-a991-c81e73affa5e> <https://www.ica.org/standards/RiC/ontology#regulates> <https://memobase.ch/record/BAZ-MEI_77466> .
<https://memobase.ch/record/BAZ-MEI_77466#genid98f1658d-4934-4c6b-a991-c81e73affa5e> <https://www.ica.org/standards/RiC/ontology#type> "holder" .
<https://memobase.ch/record/BAZ-MEI_77466#genida0b30045-473f-4fb4-8ed0-c41543011eb1> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Identifier> .
<https://memobase.ch/record/BAZ-MEI_77466#genida0b30045-473f-4fb4-8ed0-c41543011eb1> <https://www.ica.org/standards/RiC/ontology#identifier> "MEI_77466" .
<https://memobase.ch/record/BAZ-MEI_77466#genida0b30045-473f-4fb4-8ed0-c41543011eb1> <https://www.ica.org/standards/RiC/ontology#type> "callNumber" .
<https://memobase.ch/record/BAZ-MEI_77466#genida8ce0c15-3b0f-46c2-b2a3-64e605bc03d2> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Agent> .
<https://memobase.ch/record/BAZ-MEI_77466#genida8ce0c15-3b0f-46c2-b2a3-64e605bc03d2> <https://www.ica.org/standards/RiC/ontology#name> "Meyer, Gisbert"@de .
<https://memobase.ch/record/BAZ-MEI_77466#genidbcefc1dd-8e99-4026-8ded-38b5f0814fbd> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Identifier> .
<https://memobase.ch/record/BAZ-MEI_77466#genidbcefc1dd-8e99-4026-8ded-38b5f0814fbd> <https://www.ica.org/standards/RiC/ontology#identifier> "http://127.0.0.1:8080/fcrepo/rest/record/BAZ-MEI_77466" .
<https://memobase.ch/record/BAZ-MEI_77466#genidbcefc1dd-8e99-4026-8ded-38b5f0814fbd> <https://www.ica.org/standards/RiC/ontology#type> "main" .
<https://memobase.ch/record/BAZ-MEI_77466#genidc54a285d-a55d-43b7-9b4e-a5bb59ce0387> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#DateSet> .
<https://memobase.ch/record/BAZ-MEI_77466#genidc54a285d-a55d-43b7-9b4e-a5bb59ce0387> <https://www.ica.org/standards/RiC/ontology#expressedDate> "19420128" .
<https://memobase.ch/record/BAZ-MEI_77466#genidca164cd4-d561-4190-a63b-41fe3614de2c> <http://memobase.ch/internal/hasStatus> "CREATE" .
<https://memobase.ch/record/BAZ-MEI_77466#genidca164cd4-d561-4190-a63b-41fe3614de2c> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#CorporateBody> .
<https://memobase.ch/record/BAZ-MEI_77466#genidca164cd4-d561-4190-a63b-41fe3614de2c> <https://www.ica.org/standards/RiC/ontology#name> "Atelier Meiner"@de .
<https://memobase.ch/record/BAZ-MEI_77466#genidd775a4d4-bc4c-49f0-8da7-1cb08ef98edb> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Title> .
<https://memobase.ch/record/BAZ-MEI_77466#genidd775a4d4-bc4c-49f0-8da7-1cb08ef98edb> <https://www.ica.org/standards/RiC/ontology#title> "Eingangsbereich Landhaus bei Windisch, erbaut von Gisbert Meyer, Architekt"@de .
<https://memobase.ch/record/BAZ-MEI_77466#genidd775a4d4-bc4c-49f0-8da7-1cb08ef98edb> <https://www.ica.org/standards/RiC/ontology#type> "main" .
<https://memobase.ch/record/BAZ-MEI_77466#genide6bd0c30-d198-4437-97e6-ce278946e276> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#CreationRelation> .
<https://memobase.ch/record/BAZ-MEI_77466#genide6bd0c30-d198-4437-97e6-ce278946e276> <https://www.ica.org/standards/RiC/ontology#creationRelationHasSource> <https://memobase.ch/record/BAZ-MEI_77466> .
<https://memobase.ch/record/BAZ-MEI_77466#genide6bd0c30-d198-4437-97e6-ce278946e276> <https://www.ica.org/standards/RiC/ontology#creationRelationHasTarget> <https://memobase.ch/record/BAZ-MEI_77466#genidca164cd4-d561-4190-a63b-41fe3614de2c> .
<https://memobase.ch/record/BAZ-MEI_77466#genide6bd0c30-d198-4437-97e6-ce278946e276> <https://www.ica.org/standards/RiC/ontology#type> "Fotograf" .
<https://memobase.ch/record/BAZ-MEI_77466> <http://memobase.ch/internal/hasStatus> "CREATE" .
<https://memobase.ch/record/BAZ-MEI_77466> <http://purl.org/dc/terms/created> <https://memobase.ch/record/BAZ-MEI_77466#genidc54a285d-a55d-43b7-9b4e-a5bb59ce0387> .
<https://memobase.ch/record/BAZ-MEI_77466> <http://purl.org/dc/terms/relation> "Bezug Findmittel: Auftragsregister Bd. 9; Bildverzeichnis Bd. 13" .
<https://memobase.ch/record/BAZ-MEI_77466> <http://rdaregistry.info/Elements/u/P60451> "Memoriav" .
<https://memobase.ch/record/BAZ-MEI_77466> <http://rdaregistry.info/Elements/u/P60556> <https://memobase.ch/record/BAZ-MEI_77466#genid5e41f3a9-4392-49b0-b10e-464867bf9d45> .
<https://memobase.ch/record/BAZ-MEI_77466> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Record> .
<https://memobase.ch/record/BAZ-MEI_77466> <https://www.ica.org/standards/RiC/ontology#descriptiveNote> "Garderobe im Eingangsbereich mit grossem Spiegel."@de .
<https://memobase.ch/record/BAZ-MEI_77466> <https://www.ica.org/standards/RiC/ontology#hasTitle> <https://memobase.ch/record/BAZ-MEI_77466#genidd775a4d4-bc4c-49f0-8da7-1cb08ef98edb> .
<https://memobase.ch/record/BAZ-MEI_77466> <https://www.ica.org/standards/RiC/ontology#heldBy> <https://memobase.ch/institution/BAZ> .
<https://memobase.ch/record/BAZ-MEI_77466> <https://www.ica.org/standards/RiC/ontology#identifiedBy> <https://memobase.ch/record/BAZ-MEI_77466#genida0b30045-473f-4fb4-8ed0-c41543011eb1> .
<https://memobase.ch/record/BAZ-MEI_77466> <https://www.ica.org/standards/RiC/ontology#identifiedBy> <https://memobase.ch/record/BAZ-MEI_77466#genidbcefc1dd-8e99-4026-8ded-38b5f0814fbd> .
<https://memobase.ch/record/BAZ-MEI_77466> <https://www.ica.org/standards/RiC/ontology#isPartOf> <https://memobase.ch/recordSet/BAZ-B_MEI> .
<https://memobase.ch/record/BAZ-MEI_77466> <https://www.ica.org/standards/RiC/ontology#recordResourceOrInstantiationIsSourceOfCreationRelation> <https://memobase.ch/record/BAZ-MEI_77466#genid0524f985-c1d2-4a86-a239-6cdadca53632> .
<https://memobase.ch/record/BAZ-MEI_77466> <https://www.ica.org/standards/RiC/ontology#recordResourceOrInstantiationIsSourceOfCreationRelation> <https://memobase.ch/record/BAZ-MEI_77466#genide6bd0c30-d198-4437-97e6-ce278946e276> .
<https://memobase.ch/record/BAZ-MEI_77466> <https://www.ica.org/standards/RiC/ontology#regulatedBy> <https://memobase.ch/record/BAZ-MEI_77466#genid98f1658d-4934-4c6b-a991-c81e73affa5e> .
<https://memobase.ch/record/BAZ-MEI_77466> <https://www.ica.org/standards/RiC/ontology#title> "Eingangsbereich Landhaus bei Windisch, erbaut von Gisbert Meyer, Architekt"@de .
<https://memobase.ch/record/BAZ-MEI_77466> <https://www.ica.org/standards/RiC/ontology#type> "Foto" .
\ No newline at end of file
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
</Console> </Console>
</Appenders> </Appenders>
<Loggers> <Loggers>
<Root level="info"> <Root level="warn">
<AppenderRef ref="STDOUT"/> <AppenderRef ref="STDOUT"/>
</Root> </Root>
</Loggers> </Loggers>
......
app: app:
fedora: fedora:
user: admin user: fedoraAdmin
password: password password: ${FEDORA_PASSWORD:?system}
internalDomain: https://fedora.com internalDomain: http://localhost:8080/fcrepo/rest/
externalDomain: https://memobase.ch externalDomain: https://memobase.ch/
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment