/*
* fedora-metadata-extractor
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import ch.memobase.rdf.FEDORA
import ch.memobase.rdf.LDP
import ch.memobase.rdf.MB
import ch.memobase.rdf.RDF
import ch.memobase.rdf.RICO
import ch.memobase.settings.SettingsLoader
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.JsonMappingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.util.StdDateFormat
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.net.URI
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.Properties
import kotlin.collections.HashMap
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.ResourceFactory
import org.apache.jena.rdf.model.impl.SelectorImpl
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClientImpl
import org.memobase.fedora.RdfResponseTypes
import org.memobase.model.EventMessage
/**
 * Consumes Fedora event messages from the configured Kafka input topic, fetches the
 * RDF of the referenced Fedora resource, enriches and filters it, and writes the
 * serialized model to an output topic chosen by the event's object type.
 *
 * NOTE(review): `enable.auto.commit` is disabled in [createConsumer] and nothing in this
 * class commits offsets. Confirm that offsets are committed elsewhere, or that re-reading
 * the input topic from `earliest` after a restart is intended.
 */
class KafkaTopology(
    private val settings: SettingsLoader
) {
    private val log = LogManager.getLogger("FedoraMetadataExtractor")
    private val objectMapper = ObjectMapper().registerKotlinModule()
    private val fedora = FedoraClientImpl.builder()
        .properties(settings.appSettings, "fedora")
        .build()

    /**
     * Creates the Kafka consumer and producer, subscribes to the input topic and then
     * polls forever, processing every received record.
     *
     * The declared return type is kept for existing callers; the polling loop never
     * terminates normally, so no [Topology] is ever returned.
     *
     * @throws Exception if the consumer or producer cannot be created or the subscription
     * fails. Previously this was only logged and the polling loop then failed on every
     * iteration because the clients were never initialised.
     */
    fun run(): Topology {
        val jsonMapper = ObjectMapper().apply {
            registerKotlinModule()
            disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            setDateFormat(StdDateFormat())
        }
        val (consumer, producer) = try {
            val newConsumer = createConsumer()
            val newProducer = createProducer()
            newConsumer.subscribe(listOf(settings.inputTopic))
            newConsumer to newProducer
        } catch (ex: Exception) {
            log.error("Exception during ceating kafka consumer/producer: $ex", ex)
            // Fail fast: without working clients the loop below cannot do anything useful.
            throw ex
        }
        while (true) {
            try {
                val records = consumer.poll(Duration.ofSeconds(1))
                for (record in records) {
                    // Each record gets its own try so that a single bad event does not
                    // silently drop the remaining records of the same poll batch.
                    try {
                        processRecord(record.key(), record.value(), jsonMapper, producer)
                    } catch (ex: Exception) {
                        log.error("Exception during processing eventMessages: $ex", ex)
                    }
                }
            } catch (ex: Exception) {
                log.error("Exception during processing eventMessages: $ex", ex)
            }
        }
    }

    /**
     * Handles a single event message: parses it, loads the primary resource from Fedora,
     * builds the output model and sends it to the matching output topic.
     *
     * If the primary resource cannot be fetched (already logged by
     * [requestPrimaryResource]) the record is skipped.
     *
     * @param key key of the consumed record, reused as key of the produced record.
     * @param eventJson JSON serialization of an [EventMessage].
     * @param jsonMapper mapper used to deserialize [eventJson].
     * @param producer producer used to send the result.
     */
    private fun processRecord(
        key: String?,
        eventJson: String,
        jsonMapper: ObjectMapper,
        producer: Producer<String, String>
    ) {
        val event = jsonMapper.readValue(eventJson, EventMessage::class.java)
        val primaryRdf = requestPrimaryResource(event) ?: return
        val rdfModel = parseModel(primaryRdf)
        val headerData = extractHeaderMetadata(event, rdfModel)

        var model = addEventType(event, rdfModel)
        // The header map never contains an "objectType" entry, so the type has to be
        // read from the event itself (as is done for the topic selection below).
        if (event.objectType == "rico:Record") {
            model = requestAdditionalRecordResources(model)
        }
        model = filterFedoraProperties(model)
        val modelString: String = Functions.writeModel(model)

        val outputTopic = settings.outputTopic + when (event.objectType) {
            "rico:Record", "rico:VersionedRecord" -> "-json-records"
            "rico:RecordSet" -> "-json-record-sets"
            "rico:CorporateBody" -> "-json-institutions"
            else -> "-XXX"
        }
        val producerRecord = ProducerRecord<String, String>(outputTopic, key, modelString)
        // Only headers that could actually be extracted are attached.
        for (headerName in listOf("institutionId", "recordSetId", "recordId", "isPublished")) {
            headerData[headerName]?.let { producerRecord.headers().add(headerName, it.toByteArray()) }
        }
        producer.send(producerRecord)
        log.debug("FedoraEvent ${event.eventId} written to topic '$outputTopic'.")
    }

    /**
     * Derives the Kafka header values for an event from its path and RDF model.
     *
     * Extraction is best effort: if a required property is missing or ambiguous the
     * error is logged and the entries collected so far are returned.
     *
     * NOTE(review): the institution branch matches "rico:Institution" while the output
     * topic selection uses "rico:CorporateBody" -- confirm which value upstream emits.
     *
     * @param eventMsg the event whose object path and type drive the extraction.
     * @param rdfModel model containing the statements of the event's resource.
     * @return header name to header value; possibly empty or partial.
     */
    private fun extractHeaderMetadata(eventMsg: EventMessage, rdfModel: Model): Map<String, String> {
        val headerData = mutableMapOf<String, String>()
        try {
            val subjectUri = eventMsg.objectPath
            val ownId = subjectUri.substringAfterLast('/')
            // TODO: use MB.isPublished as soon as http VS https is unified
            val isPublishedProp = ResourceFactory.createProperty("http://memobase.ch/internal/", "isPublished")
            when (eventMsg.objectType) {
                "rico:Record" -> {
                    headerData["recordId"] = ownId
                    headerData["recordSetId"] =
                        getPropertyAsString(subjectUri, rdfModel, RICO.isPartOf).substringAfterLast('/')
                    headerData["institutionId"] =
                        getPropertyAsString(subjectUri, rdfModel, RICO.heldBy).substringAfterLast('/')
                    headerData["isPublished"] = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
                }
                "rico:RecordSet" -> {
                    // Key spelled "recordSetId" so that it matches the header name read in processRecord.
                    headerData["recordSetId"] = ownId
                    headerData["institutionId"] =
                        getPropertyAsString(subjectUri, rdfModel, RICO.heldBy).substringAfterLast('/')
                    headerData["isPublished"] = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
                }
                "rico:Institution" -> {
                    headerData["institutionId"] = ownId
                    // The id is cut from the record set URI itself, not at an offset taken from the event path.
                    headerData["recordsetIds"] =
                        getPropertyAsString(subjectUri, rdfModel, RICO.isHolderOf).substringAfterLast('/')
                    headerData["isPublished"] = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
                }
            }
        } catch (ex: Exception) {
            log.error("Exception while extracting HeaderData for ${eventMsg.eventId}: ${ex.message}")
        }
        return headerData
    }

    /** Returns the resource with the given [uri] from [model]. Currently not called within this class. */
    private fun getResourceFromRdf(model: Model, uri: String): Resource = model.getResource(uri)

    /**
     * Reads the single value of [property] on the resource [subjectUri].
     *
     * URI resources are returned as their URI, literals as the string form of their
     * value, anything else via `toString()`.
     *
     * @throws Exception if the subject has no value or more than one value for [property].
     */
    private fun getPropertyAsString(subjectUri: String, model: Model, property: Property): String {
        val values = model.getResource(subjectUri).listProperties(property).mapWith {
            val node = it.`object`
            when {
                node.isURIResource -> node.asResource().uri
                node.isLiteral -> node.asLiteral().value.toString()
                else -> node.toString()
            }
        }.toList()
        return when {
            values.isEmpty() -> throw Exception("No property $property found.")
            values.size > 1 -> throw Exception("More than 1 property $property found")
            else -> values[0]
        }
    }

    /**
     * Parses [data] as an [EventMessage].
     *
     * @return a one-element list on success, an empty list if the JSON is invalid or
     * cannot be mapped (the problem is logged). Currently not called within this class.
     */
    private fun parseMessage(data: String): List<EventMessage> {
        return try {
            listOf(objectMapper.readValue(data, EventMessage::class.java))
        } catch (ex: JsonParseException) {
            log.error("Invalid json found: $data")
            emptyList()
        } catch (ex: JsonMappingException) {
            log.error("Could not parse the json object as an event message: $data")
            emptyList()
        }
    }

    /**
     * Fetches the resource at the event's object path from Fedora as N-Triples.
     *
     * @return the serialized RDF, or `null` if Fedora reported a failure (which is logged).
     */
    private fun requestPrimaryResource(message: EventMessage): String? {
        return try {
            fedora.fetchRdfResourceIntoString(URI.create(message.objectPath), RdfResponseTypes.NTRIPLES)
        } catch (ex: FcrepoOperationFailedException) {
            when (ex.statusCode) {
                410 -> log.error("Status: ${ex.statusCode} -> The resource at path ${message.objectPath} has been deleted.")
                404 -> log.error("Status: ${ex.statusCode} -> No resource found at path ${message.objectPath}.")
                else -> log.error("Status: ${ex.statusCode} -> Unknown Error: ${ex.localizedMessage}.")
            }
            null
        }
    }

    /** Parses N-Triples text into a new default model. */
    private fun parseModel(rdf: String): Model {
        val model = ModelFactory.createDefaultModel()
        RDFDataMgr.read(model, rdf.byteInputStream(StandardCharsets.UTF_8), Lang.NT)
        return model
    }

    /**
     * Adds the type of the event message to the core resources.
     * Core resources are rico:Record, rico:CorporateBody (for institutions), rico:RecordSet and rico:Instantiation.
     * For each of these types only the first matching subject found in the model is tagged.
     * Downstream services should remove this property before publishing the data to the outside world!
     *
     * @return the same [model] instance, modified in place.
     */
    private fun addEventType(event: EventMessage, model: Model): Model {
        // TODO: Add filter for corporate body so that only memobase institutions are flagged.
        for (coreType in listOf(RICO.Record, RICO.CorporateBody, RICO.RecordSet, RICO.Instantiation)) {
            model.listSubjectsWithProperty(RDF.type, coreType).toList()
                .firstOrNull()
                ?.addProperty(MB.eventType, event.eventType)
        }
        return model
    }

    /**
     * Loads the resources a record links to via rico:heldBy, rico:isPartOf and
     * rico:hasInstantiation and merges them into [model].
     *
     * @return the same [model] instance, modified in place.
     * @throws Exception if the model contains no subject typed as rico:Record.
     */
    private fun requestAdditionalRecordResources(model: Model): Model {
        val record = model.listSubjectsWithProperty(RDF.type, RICO.Record).toList().firstOrNull()
            // TODO: Improve exception handling
            ?: throw Exception("The requested resource is not a record...")
        for (linkProperty in listOf(RICO.heldBy, RICO.isPartOf, RICO.hasInstantiation)) {
            requestAdditionalResources(record, model, linkProperty)
        }
        return model
    }

    /**
     * Fetches every URI resource that [subject] references through [property] from
     * Fedora and reads the returned N-Triples into [model]. Blank nodes and literals
     * are ignored because they have no URI to fetch.
     */
    private fun requestAdditionalResources(subject: Resource, model: Model, property: Property) {
        // Take a snapshot first: reading the fetched triples into the model while one of
        // its statement iterators is still open would modify the model being iterated.
        val linkedUris = subject.listProperties(property).toList()
            .map { it.`object` }
            .filter { it.isURIResource }
            .map { it.asResource().uri }
        for (uri in linkedUris) {
            val data = fedora.fetchRdfResourceIntoString(URI(uri), RdfResponseTypes.NTRIPLES)
            RDFDataMgr.read(model, data.byteInputStream(StandardCharsets.UTF_8), Lang.NT)
        }
    }

    /**
     * Removes Fedora bookkeeping from [model]: the created/createdBy/lastModified/
     * lastModifiedBy statements and the rdf:type statements for the LDP and Fedora
     * container/resource classes.
     *
     * @return the same [model] instance, modified in place.
     */
    private fun filterFedoraProperties(model: Model): Model {
        // A typed null is required for the compiler to be able to pick the SelectorImpl constructor.
        val anyValue: String? = null
        val auditProperties = listOf(
            FEDORA.created,
            FEDORA.createdBy,
            FEDORA.lastModified,
            FEDORA.lastModifiedBy
        )
        for (auditProperty in auditProperties) {
            model.remove(model.listStatements(SelectorImpl(null, auditProperty, anyValue)))
        }
        val infrastructureTypes = listOf(
            LDP.Container,
            LDP.BasicContainer,
            LDP.RDFSource,
            FEDORA.Container,
            FEDORA.Resource
        )
        for (infrastructureType in infrastructureTypes) {
            model.remove(model.listStatements(SelectorImpl(null, RDF.type, infrastructureType)))
        }
        return model
    }

    /**
     * Builds the Kafka consumer from the consumer settings plus fixed values for
     * deserializers, commit behaviour, poll limits and offset reset.
     */
    private fun createConsumer(): Consumer<String, String> {
        val consumerSettings = settings.kafkaConsumerSettings
        val props = Properties()
        props["bootstrap.servers"] = consumerSettings["bootstrap.servers"].toString()
        props["group.id"] = consumerSettings["group.id"].toString()
        props["client.id"] = consumerSettings["client.id"].toString()
        props["key.deserializer"] = StringDeserializer::class.java
        props["value.deserializer"] = StringDeserializer::class.java
        props["enable.auto.commit"] = false
        props["max.poll.records"] = "10"
        props["max.poll.interval.ms"] = "3600000"
        props["auto.offset.reset"] = "earliest"
        props["allow.auto.create.topics"] = false
        // A failure here propagates to run(); repeating the identical constructor call
        // in a catch block (as was done before) cannot succeed where the first one failed.
        return KafkaConsumer<String, String>(props)
    }

    /** Builds the Kafka producer from the producer settings with String serializers for key and value. */
    private fun createProducer(): Producer<String, String> {
        val props = Properties()
        props["bootstrap.servers"] = settings.kafkaProducerSettings["bootstrap.servers"].toString()
        props["key.serializer"] = StringSerializer::class.java
        props["value.serializer"] = StringSerializer::class.java
        return KafkaProducer<String, String>(props)
    }
}