/*
 * fedora-metadata-extractor
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
package org.memobase

import ch.memobase.rdf.FEDORA
import ch.memobase.rdf.LDP
import ch.memobase.rdf.MB
import ch.memobase.rdf.RDF
import ch.memobase.rdf.RICO
import ch.memobase.settings.SettingsLoader
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.JsonMappingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.util.StdDateFormat
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.net.URI
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.Properties
import kotlin.collections.HashMap
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.ResourceFactory
import org.apache.jena.rdf.model.impl.SelectorImpl
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClientImpl
import org.memobase.fedora.RdfResponseTypes
import org.memobase.model.EventMessage

/**
 * Consumes Fedora event messages from `settings.inputTopic`, fetches the referenced resource from
 * Fedora as N-Triples, enriches and filters the resulting RDF model, and publishes it (serialized via
 * `Functions.writeModel`) to `settings.outputTopic` plus a suffix chosen by the event's object type.
 */
class KafkaTopology(
    private val settings: SettingsLoader
) {
    private val log = LogManager.getLogger("FedoraMetadataExtractor")

    // Single mapper for event messages; dates are handled as formatted strings, not timestamps.
    private val objectMapper = ObjectMapper().apply {
        registerKotlinModule()
        disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
        setDateFormat(StdDateFormat())
    }

    private val fedora = FedoraClientImpl.builder()
        .properties(settings.appSettings, "fedora")
        .build()

    /**
     * Runs the extraction loop: polls the input topic forever and processes every record.
     *
     * This call never returns normally. A failure while processing one record is logged and only
     * that record is skipped; the remaining records of the batch are still processed.
     *
     * @throws Exception if the Kafka consumer or producer cannot be created or subscribed.
     */
    fun run(): Topology {
        val (consumer, producer) = try {
            val kafkaConsumer = createConsumer()
            val kafkaProducer = createProducer()
            kafkaConsumer.subscribe(listOf(settings.inputTopic))
            kafkaConsumer to kafkaProducer
        } catch (ex: Exception) {
            // Without a consumer/producer the loop below could only fail forever; fail fast instead.
            log.error("Exception during creating kafka consumer/producer: $ex")
            throw ex
        }

        while (true) {
            try {
                val records = consumer.poll(Duration.ofSeconds(1))
                for (record in records) {
                    try {
                        processRecord(producer, record.key(), record.value())
                    } catch (ex: Exception) {
                        log.error("Exception during processing eventMessages: $ex")
                    }
                }
                // enable.auto.commit is false, so offsets must be committed explicitly; otherwise the
                // topic is re-read from the earliest offset after every restart. Records that failed
                // above have already been logged and are not retried.
                if (records.count() > 0) {
                    consumer.commitSync()
                }
            } catch (ex: Exception) {
                log.error("Exception during polling eventMessages: $ex")
            }
        }
    }

    /**
     * Handles a single event message end to end and sends the result to the output topic.
     * Returns early (after the helpers have logged the reason) when the message cannot be parsed or
     * the Fedora resource cannot be fetched.
     */
    private fun processRecord(producer: Producer<String, String>, key: String?, eventJson: String) {
        val event = parseMessage(eventJson).firstOrNull() ?: return
        val primary = requestPrimaryResource(event).firstOrNull() ?: return
        val eventModel = parseModel(primary)
        val headerData = extractHeaderMetadata(eventModel).third

        var model = addEventType(eventModel)
        if (event.objectType == "rico:Record") {
            model = requestAdditionalRecordResources(model)
        }
        model = filterFedoraProperties(model)
        val modelString: String = Functions.writeModel(model)

        val outputTopic = settings.outputTopic + when (event.objectType) {
            "rico:Record" -> "-json-records"
            "rico:VersionedRecord" -> "-json-records"
            "rico:RecordSet" -> "-json-record-sets"
            "rico:CorporateBody" -> "-json-institutions"
            else -> "-XXX"
        }

        val producerRecord = ProducerRecord<String, String>(outputTopic, key, modelString)
        FORWARDED_HEADER_KEYS.forEach { name ->
            headerData[name]?.let { value -> producerRecord.headers().add(name, value.toByteArray()) }
        }
        producer.send(producerRecord)
        log.debug("FedoraEvent ${event.eventId} written to topic '$outputTopic'.")
    }

    /**
     * Collects identifiers and the publication flag for the event's subject into a header map.
     *
     * Keys written depend on `objectType`: `recordId`, `recordSetId`, `recordSetIds`, `institutionId`
     * and `isPublished`. Any exception is logged and the entries gathered so far are returned.
     *
     * @return the event, its model and the (possibly partial) header map.
     */
    private fun extractHeaderMetadata(
        value: Pair<EventMessage, Model>
    ): Triple<EventMessage, Model, HashMap<String, String>> {
        val (eventMsg, rdfModel) = value
        val headerData = HashMap<String, String>()
        try {
            val subjectUri = eventMsg.objectPath
            // TODO: use MB.isPublished as soon as http VS https is unified
            val isPublishedProp = ResourceFactory.createProperty("http://memobase.ch/internal/", "isPublished")
            when (eventMsg.objectType) {
                "rico:Record" -> {
                    headerData["recordId"] = lastPathSegment(subjectUri)
                    headerData["recordSetId"] =
                        lastPathSegment(getPropertyAsString(subjectUri, rdfModel, RICO.isPartOf))
                    headerData["institutionId"] =
                        lastPathSegment(getPropertyAsString(subjectUri, rdfModel, RICO.heldBy))
                    headerData["isPublished"] = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
                }
                "rico:RecordSet" -> {
                    // Key spelled like the forwarded header so the id actually reaches consumers.
                    headerData["recordSetId"] = lastPathSegment(subjectUri)
                    headerData["institutionId"] =
                        lastPathSegment(getPropertyAsString(subjectUri, rdfModel, RICO.heldBy))
                    headerData["isPublished"] = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
                }
                // NOTE(review): topic routing uses "rico:CorporateBody" for institutions while this
                // branch originally only matched "rico:Institution"; both are accepted until confirmed.
                "rico:Institution", "rico:CorporateBody" -> {
                    headerData["institutionId"] = lastPathSegment(subjectUri)
                    // An institution may hold several record sets, so collect all of them.
                    headerData["recordSetIds"] = getPropertiesAsStrings(subjectUri, rdfModel, RICO.isHolderOf)
                        .joinToString(",") { lastPathSegment(it) }
                    headerData["isPublished"] = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
                }
            }
        } catch (ex: Exception) {
            log.error("Exception while extracting HeaderData for " + eventMsg.eventId + ": " + ex.message)
        }
        return Triple(eventMsg, rdfModel, headerData)
    }

    /** Returns the part of [uri] after its last '/', or the whole string if it contains none. */
    private fun lastPathSegment(uri: String): String = uri.substringAfterLast('/')

    /**
     * Returns every value of [property] on [subjectUri] as a string: the URI for URI resources, the
     * literal's value for literals, and `toString()` otherwise.
     */
    private fun getPropertiesAsStrings(subjectUri: String, model: Model, property: Property): List<String> =
        model.getResource(subjectUri).listProperties(property).mapWith { statement ->
            val o = statement.`object`
            when {
                o.isURIResource -> o.asResource().uri
                o.isLiteral -> o.asLiteral().value.toString()
                else -> o.toString()
            }
        }.toList()

    /**
     * Returns the single value of [property] on [subjectUri] as a string.
     *
     * @throws Exception if the property is missing or occurs more than once.
     */
    private fun getPropertyAsString(subjectUri: String, model: Model, property: Property): String {
        val values = getPropertiesAsStrings(subjectUri, model, property)
        return when {
            values.isEmpty() -> throw Exception("No property $property found.")
            values.size > 1 -> throw Exception("More than 1 property $property found")
            else -> values[0]
        }
    }

    /**
     * Parses [data] as an [EventMessage]; returns an empty list (after logging) for invalid JSON or
     * JSON that does not map onto an event message.
     */
    private fun parseMessage(data: String): List<EventMessage> {
        return try {
            listOf(objectMapper.readValue(data, EventMessage::class.java))
        } catch (ex: JsonParseException) {
            log.error("Invalid json found: $data")
            emptyList()
        } catch (ex: JsonMappingException) {
            log.error("Could not parse the json object as an event message: $data")
            emptyList()
        }
    }

    /**
     * Fetches the event's resource from Fedora as N-Triples. Returns an empty list (after logging the
     * status code) when Fedora reports a failure.
     */
    private fun requestPrimaryResource(message: EventMessage): List<Pair<EventMessage, String>> {
        return try {
            listOf(
                Pair(
                    message,
                    fedora.fetchRdfResourceIntoString(URI.create(message.objectPath), RdfResponseTypes.NTRIPLES)
                )
            )
        } catch (ex: FcrepoOperationFailedException) {
            when (ex.statusCode) {
                410 -> log.error("Status: ${ex.statusCode} -> The resource at path ${message.objectPath} has been deleted.")
                404 -> log.error("Status: ${ex.statusCode} -> No resource found at path ${message.objectPath}.")
                else -> log.error("Status: ${ex.statusCode} -> Unknown Error: ${ex.localizedMessage}.")
            }
            emptyList()
        }
    }

    /** Parses the N-Triples string of [input] into a new in-memory model. */
    private fun parseModel(input: Pair<EventMessage, String>): Pair<EventMessage, Model> {
        val model = ModelFactory.createDefaultModel()
        RDFDataMgr.read(model, input.second.byteInputStream(StandardCharsets.UTF_8), Lang.NT)
        return Pair(input.first, model)
    }

    /**
     * Adds the type of the event message to the core resources.
     * Core resources are rico:Record, rico:CorporateBody (for institutions), rico:RecordSet and rico:Instantiations.
     * Only the first subject of each type is flagged.
     * Downstream services should remove this property before publishing the data to the outside world!
     */
    private fun addEventType(input: Pair<EventMessage, Model>): Model {
        // TODO: Add filter for corporate body so that only memobase institutions are flagged.
        listOf(RICO.Record, RICO.CorporateBody, RICO.RecordSet, RICO.Instantiation).forEach { type ->
            // toList() closes the iterator before the model is modified.
            input.second.listSubjectsWithProperty(RDF.type, type).toList().firstOrNull()
                ?.addProperty(MB.eventType, input.first.eventType)
        }
        return input.second
    }

    /**
     * Fetches the holder, parent record set and instantiations of the record in [model] from Fedora
     * and merges them into [model].
     *
     * @throws Exception if [model] contains no rico:Record subject.
     */
    private fun requestAdditionalRecordResources(model: Model): Model {
        val subject = model.listSubjectsWithProperty(RDF.type, RICO.Record).toList().firstOrNull()
            // TODO: Improve exception handling
            ?: throw Exception("The requested resource is not a record...")
        listOf(RICO.heldBy, RICO.isPartOf, RICO.hasInstantiation).forEach { property ->
            requestAdditionalResources(subject, model, property)
        }
        return model
    }

    /** Fetches every URI object of [property] on [subject] from Fedora and reads it into [model]. */
    private fun requestAdditionalResources(subject: Resource, model: Model, property: Property) {
        // Materialize the targets first: reading into `model` while an iterator over its statements
        // is still open would modify the graph during iteration. Blank nodes have no URI to fetch.
        val targets = subject.listProperties(property).toList()
            .map { it.`object` }
            .filter { it.isURIResource }
            .map { it.asResource().uri }
        targets.forEach { uri ->
            val data = fedora.fetchRdfResourceIntoString(URI(uri), RdfResponseTypes.NTRIPLES)
            RDFDataMgr.read(model, data.byteInputStream(StandardCharsets.UTF_8), Lang.NT)
        }
    }

    /** Removes Fedora bookkeeping properties and LDP/Fedora container types from [model]. */
    private fun filterFedoraProperties(model: Model): Model {
        listOf(
            FEDORA.created,
            FEDORA.createdBy,
            FEDORA.lastModified,
            FEDORA.lastModifiedBy
        ).forEach { property ->
            model.removeAll(null, property, null)
        }
        listOf(
            LDP.Container,
            LDP.BasicContainer,
            LDP.RDFSource,
            FEDORA.Container,
            FEDORA.Resource
        ).forEach { type ->
            model.removeAll(null, RDF.type, type)
        }
        return model
    }

    /** Creates the string/string consumer; offsets are committed manually by [run]. */
    private fun createConsumer(): Consumer<String, String> {
        val consumerSettings = settings.kafkaConsumerSettings
        val props = Properties()
        props["bootstrap.servers"] = consumerSettings["bootstrap.servers"].toString()
        props["group.id"] = consumerSettings["group.id"].toString()
        props["client.id"] = consumerSettings["client.id"].toString()
        props["key.deserializer"] = StringDeserializer::class.java
        props["value.deserializer"] = StringDeserializer::class.java
        props["enable.auto.commit"] = false
        props["max.poll.records"] = "10"
        props["max.poll.interval.ms"] = "3600000"
        props["auto.offset.reset"] = "earliest"
        props["allow.auto.create.topics"] = false
        return KafkaConsumer<String, String>(props)
    }

    /** Creates the string/string producer used for the output topics. */
    private fun createProducer(): Producer<String, String> {
        val props = Properties()
        props["bootstrap.servers"] = settings.kafkaProducerSettings["bootstrap.servers"].toString()
        props["key.serializer"] = StringSerializer::class.java
        props["value.serializer"] = StringSerializer::class.java
        return KafkaProducer<String, String>(props)
    }

    private companion object {
        /** Header metadata keys forwarded as Kafka record headers when present, in this order. */
        val FORWARDED_HEADER_KEYS = listOf("institutionId", "recordSetId", "recordId", "isPublished")
    }
}