Due to a scheduled upgrade to version 14.10, GitLab will be unavailabe on Monday 30.05., from 19:00 until 20:00.

Commit d645b8d1 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Fix no instantiation issue

parent 9f55e533
......@@ -32,7 +32,6 @@ import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.rdf.EBUCORE
import org.memobase.rdf.RDF
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
......@@ -67,59 +66,59 @@ class KafkaTopology(private val settings: SettingsLoader) {
val stream = builder.stream<String, String>(settings.inputTopic)
val instantiationBranch = stream
.mapValues { value -> createModel(value) }
.mapValues { value -> extractSubjects(value) }
.branch(
Predicate { _, value -> isDigitalObjectWithoutLocator(value.second) },
Predicate { _, _ -> true }
)
.mapValues { value -> createModel(value) }
.mapValues { value -> extractSubjects(value) }
.branch(
Predicate { _, value -> isDigitalObjectWithoutLocator(value.second) },
Predicate { _, _ -> true }
)
val updateDigitalObjects = instantiationBranch[0]
.mapValues { value -> enrichSftpLocator(value) }
.mapValues { value ->
val out = StringWriter()
value.first.write(out, "NTRIPLES")
out.toString().trim()
}
.mapValues { value -> enrichSftpLocator(value) }
.mapValues { value ->
val out = StringWriter()
value.first.write(out, "NTRIPLES")
out.toString().trim()
}
updateDigitalObjects
.to(settings.outputTopic)
.to(settings.outputTopic)
updateDigitalObjects
.mapValues { readOnlyKey, _ ->
Klaxon().toJsonString(
Report(
readOnlyKey,
ReportStatus.success,
"Added sftp locator link to digital resource."
.mapValues { readOnlyKey, _ ->
Klaxon().toJsonString(
Report(
readOnlyKey,
ReportStatus.success,
"Added sftp locator link to digital resource."
)
)
)
}
.to(reportingTopic)
}
.to(reportingTopic)
val unchangedWrittenResources = instantiationBranch[1]
.mapValues { value ->
val out = StringWriter()
value.first.write(out, "NTRIPLES")
out.toString().trim()
}
.mapValues { value ->
val out = StringWriter()
value.first.write(out, "NTRIPLES")
out.toString().trim()
}
unchangedWrittenResources
.to(settings.outputTopic)
.to(settings.outputTopic)
unchangedWrittenResources
.mapValues { key, _ -> Klaxon().toJsonString(Report(key, ReportStatus.success, "No changes made.")) }
.to(reportingTopic)
.mapValues { key, _ -> Klaxon().toJsonString(Report(key, ReportStatus.success, "No changes made.")) }
.to(reportingTopic)
return builder.build()
}
private fun extractSubjects(model: Model): Pair<Model, Resource> {
return Pair(model, model.listSubjectsWithProperty(RDF.type, RICO.Instantiation).next())
private fun extractSubjects(model: Model): Pair<Model, List<Resource>> {
return Pair(model, model.listSubjects().toList())
}
private fun isDigitalObjectWithoutLocator(res: Resource): Boolean {
return res.hasProperty(RICO.type, "digitalObject") && !res.hasProperty(EBUCORE.locator)
private fun isDigitalObjectWithoutLocator(res: List<Resource>): Boolean {
return res.any { it.hasProperty(RICO.type, "digitalObject") && !it.hasProperty(EBUCORE.locator) }
}
private fun createModel(data: String): Model {
......@@ -128,10 +127,11 @@ class KafkaTopology(private val settings: SettingsLoader) {
return model
}
private fun enrichSftpLocator(data: Pair<Model, Resource>): Pair<Model, Report> {
private fun enrichSftpLocator(data: Pair<Model, List<Resource>>): Pair<Model, Report> {
var link = ""
val digitalObject = data.second.first { it.hasProperty(RICO.type, "digitalObject") && !it.hasProperty(EBUCORE.locator) }
val addedStatements = mutableListOf<Statement>()
for (stmt in data.second.listProperties(RICO.identifiedBy)) {
for (stmt in digitalObject.listProperties(RICO.identifiedBy)) {
if (stmt != null) {
if (stmt.`object`.isResource) {
val resource = stmt.`object`.asResource()
......@@ -141,7 +141,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
if (file.contains(value.string)) {
link = "sftp:$file"
val literal = data.first.createLiteral(link)
addedStatements.add(StatementImpl(data.second, EBUCORE.locator, literal))
addedStatements.add(StatementImpl(digitalObject, EBUCORE.locator, literal))
}
}
}
......@@ -151,21 +151,21 @@ class KafkaTopology(private val settings: SettingsLoader) {
if (addedStatements.size > 0) {
data.first.add(addedStatements)
return Pair(
data.first,
Report(
data.second.uri,
ReportStatus.success,
ReportMessages.reportSuccess(data.second.uri, link)
)
data.first,
Report(
digitalObject.uri,
ReportStatus.success,
ReportMessages.reportSuccess(digitalObject.uri, link)
)
)
} else {
return Pair(
data.first,
Report(
data.second.id.labelString,
ReportStatus.failure,
ReportMessages.reportFailure(data.second.id.labelString)
)
data.first,
Report(
digitalObject.id.labelString,
ReportStatus.failure,
ReportMessages.reportFailure(digitalObject.id.labelString)
)
)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment