Commit df7388f6 authored by Jonas Waeber

Refactor mapper service

parent 6628a520
@@ -32,8 +32,9 @@ ext {
}
dependencies {
implementation 'org.memobase:memobase-service-utilities:0.14.2'
implementation 'ch.memobase:import-process-effects-registry_2.12:0.2.1'
implementation 'ch.memobase:memobase-kafka-utils:0.1.2'
implementation 'org.memobase:memobase-service-utilities:0.16.0'
implementation 'ch.memobase:mapper-service-configuration:0.3.3'
// Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
implementation "org.apache.logging.log4j:log4j-core:${log4jV}"
@@ -47,8 +48,6 @@ dependencies {
implementation 'org.apache.jena:apache-jena:3.14.0'
// JSON Parser
implementation 'com.beust:klaxon:5.2'
// Compression
//implementation "org.apache.commons:commons-compress:1.19"
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
implementation "org.jetbrains.kotlin:kotlin-script-runtime:1.3.71"
@@ -18,55 +18,58 @@
package org.memobase
import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.mapping.MappingConfigurationParser
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import mapping.MapperConfiguration
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.builder.ResourceBuilder
import org.memobase.helpers.ReportStatus
import org.memobase.mapping.MappingConfig
import org.memobase.settings.SettingsLoader
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
import java.io.ByteArrayInputStream
class KafkaTopology(
private val settings: SettingsLoader
) {
private val log = LogManager.getLogger("KafkaTopology")
private val config = MappingConfig(settings.appSettings.getProperty("configs"))
private val reportTopic = settings.outputTopic + "-reporting"
private val institutionId = settings.appSettings.getProperty("institutionId")
private val recordSetId = settings.appSettings.getProperty("recordSetId")
private val isPublished = settings.appSettings.getProperty("isPublished")!!.toBoolean()
private val reportTopic = settings.processReportTopic
private val klaxon = Klaxon()
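// Joins each incoming record with the mapper configuration delivered on the
// import-process-config topic; configurations are (de)serialized as JSON via Klaxon.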
private val configJoiner = ConfigJoiner<String, MapperConfiguration>(
ImportService.Mapping,
Serdes.String(),
Serdes.serdeFrom(
{ _, data -> klaxon.toJsonString(data).toByteArray() },
{ _, data -> klaxon.parse<MapperConfiguration>(ByteArrayInputStream(data)) }),
this::parseConfig
)
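// Wires the topology: join records with their mapper configuration, parse the JSON payload,
// extract header metadata, build the RDF resources and emit both records and process reports.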
fun prepare(): StreamsBuilder {
val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic)
val configStream = builder.stream<String, String>("import-process-config")
.map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }
val jsonStream = stream.flatMapValues { value -> parseJsonFile(value) }
.branch(
Predicate { _, value -> value.containsKey("ERROR") },
Predicate { _, _ -> true }
)
val stream = builder.stream<String, String>(settings.inputTopic)
jsonStream[0]
.mapValues { value -> value["ERROR"] as String }
.to(settings.outputTopic)
val joinedStream =
configJoiner.join(stream, configStream)
jsonStream[0]
.mapValues { key, value ->
Report(
key,
ReportStatus.failure,
"Failed to parse json input: $value."
).toJson()
}
.to(reportTopic)
val jsonStream = joinedStream
.flatMapValues { value -> parseJsonFile(value) }
val extractedRecordIdStream = jsonStream[1]
val extractedRecordIdStream = jsonStream
.transformValues(HeaderExtractionTransformSupplier<Pair<Map<String, Any>, MapperConfiguration>>())
.mapValues { value -> buildResources(value) }
.mapValues { value -> value.extractRecordId() }
@@ -81,33 +84,29 @@ class KafkaTopology(
.mapValues { value -> value.generateDigitalObject() }
.mapValues { value -> value.addDerivedConnection() }
val recordStream = completedMappingStream.mapValues { value -> value.writeRecord() }
val recordStream = completedMappingStream
.map { _, value -> writeRecord(value) }
objectOutput(recordStream)
return builder
}
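// Sends the serialized record to the output topic and the accompanying report to the process report topic.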
private fun objectOutput(stream: KStream<String, Pair<KeyValue<String, String>, Report>>) {
private fun objectOutput(stream: KStream<String, Pair<String, Report>>) {
stream
.map { _, value -> value.first }
.mapValues { _, value -> value.first }
.to(settings.outputTopic)
stream
.map { _, value -> KeyValue(value.first.key, value.second.toJson()) }
.mapValues { _, value -> value.second.toJson() }
.to(reportTopic)
}
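// buildResources now receives the parsed record together with its mapper configuration and
// header metadata and wraps them in a ResourceBuilder.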
private fun buildResources(value: Map<String, String>): ResourceBuilder {
private fun buildResources(value: Pair<Pair<Map<String, Any>, MapperConfiguration>, HeaderMetadata>): ResourceBuilder {
return ResourceBuilder(
config.uriField,
config.recordType,
config.recordFieldMappers,
config.physicalObjectFieldMappers,
config.digitalObjectFieldMappers,
value,
institutionId,
recordSetId,
config.sponsoredByMemoriav,
isPublished
value.first.first,
value.first.second,
value.second.recordSetId,
value.second.institutionId,
value.second.isPublished
)
}
@@ -120,11 +119,7 @@ class KafkaTopology(
val noRecordId = hasRecordId[1]
noRecordId
.mapValues { _ -> "ERROR: No record id specified!" }
.to(settings.outputTopic)
noRecordId
.mapValues { key, value -> value.failureReport(key).toJson() }
.mapValues { key, _ -> Report(key, ReportStatus.failure, "No record id found for record $key.") }
.to(reportTopic)
return hasRecordId[0]
@@ -141,38 +136,47 @@ class KafkaTopology(
val noRecordTypeValue = hasRecordTypeValue[1]
noRecordTypeValue
.mapValues { _ -> "ERROR: No record type value specified!" }
.to(settings.outputTopic)
noRecordTypeValue
.mapValues { key, value -> value.failureReport(key).toJson() }
.mapValues { key, value ->
Report(
key,
ReportStatus.failure,
"No correct record type found for record $key."
)
}
.to(reportTopic)
return hasRecordTypeValue[0]
}
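// The refactored parseJsonFile below operates on the joined (record, configuration) pair instead
// of the raw message string; empty or unparsable JSON currently yields an empty list (reporting is still a TODO).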
private fun parseJsonFile(message: String): List<Map<String, String>> {
    return if (message.contains("ERROR")) {
        listOf(mapOf(Pair("ERROR", message)))
    } else {
        try {
            Klaxon().parse<Map<String, String>>(message).let {
                return if (it != null) {
                    listOf(it)
                } else {
                    listOf(mapOf(Pair("ERROR", "ERROR: Could not parse message: $message.")))
                }
            }
        } catch (ex: KlaxonException) {
            listOf(
                mapOf(
                    Pair(
                        "ERROR",
                        "ERROR: Could not parse message: $message. Because of ${ex.localizedMessage}."
                    )
                )
            )
        }
    }
}
private fun parseJsonFile(input: JoinedValues<String, MapperConfiguration>):
    List<Pair<Map<String, String>, MapperConfiguration>> {
    return try {
        Klaxon().parse<Map<String, String>>(input.left).let {
            if (it != null) {
                listOf(Pair(it, input.right))
            } else {
                // TODO: REPORT EMPTY JSON
                emptyList()
            }
        }
    } catch (ex: KlaxonException) {
        // TODO: REPORT JSON PARSE EXCEPTIONS
        emptyList()
    }
}
private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> {
    val result = builder.writeRecord()
    return KeyValue(
        result.first,
        Pair(
            result.second,
            Report(result.first, ReportStatus.success, "Successfully mapped record with id ${result.first}.")
        )
    )
}
private fun parseConfig(data: ByteArray): MapperConfiguration {
    return MappingConfigurationParser(data).get()
}
}
\ No newline at end of file
@@ -18,15 +18,9 @@
package org.memobase
import ch.memobase.Effect
import ch.memobase.EffectsRegistry
import ch.memobase.ShutdownMessage
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import scala.Some
import scala.runtime.BoxedUnit
import kotlin.system.exitProcess
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("MapperService")
@@ -42,15 +36,9 @@ class Service(file: String = "app.yml") {
useStreamsConfig = true
)
private val appId = settings.kafkaStreamsSettings.getProperty("application.id")
val builder = KafkaTopology(settings).prepare()
private val registry = EffectsRegistry()
private val shutdownEffect = Effect("shutdown", this::exit, Some("Shutting down application"))
private val builder = KafkaTopology(settings).prepare()
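// Starts the Kafka Streams topology and logs a heartbeat every ten seconds while the stream is running.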
fun run() {
registry.register(ShutdownMessage(appId.replace("-normalization-service", ""), "normalization-service", "termination"), shutdownEffect)
registry.run(builder, "import-process-admin")
val stream = KafkaStreams(builder.build(), settings.kafkaStreamsSettings)
stream.use {
it.start()
@@ -58,10 +46,7 @@ class Service(file: String = "app.yml") {
log.info("Service is running.")
Thread.sleep(10_000L)
}
throw Exception("Stream stopped running!")
}
}
private fun exit(): BoxedUnit {
exitProcess(0)
}
}
\ No newline at end of file
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.builder
import org.apache.jena.rdf.model.Resource
import org.memobase.helpers.StringHelpers
import org.memobase.mapping.KEYS
import org.memobase.rdf.NS
import org.memobase.rdf.RICO
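// Digital instantiation of a record; its identifier combines the record set id,
// the normalized source id and a running count.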
class DigitalObject(sourceId: String, recordSetId: String, institutionId: String, count: Int) : Instantiation(institutionId) {
private val id = recordSetId + "-" + StringHelpers.normalizeId(sourceId) + "-" + count
override val resource: Resource = model.createResource(NS.mbdo + id)
init {
addRdfType(RICO.Instantiation)
resource.addProperty(RICO.type, "digitalObject")
addRicoConcept(KEYS.identifiers, "main", listOf(literal(id)))
}
}
\ No newline at end of file
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.builder
import org.apache.jena.rdf.model.Literal
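// Operations every generated resource must support; presumably invoked by the configured field mappers
// to attach literals, concepts, places, dates, relations and agents to a resource.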
interface IResource {
fun addLiteral(property: String, value: Literal)
fun addRicoConcept(rdfType: String, ricoType: String, value: List<Literal>)
fun addSkosConcept(type: String, properties: List<Pair<String, Literal>>)
fun addPlace(type: String, properties: List<Pair<String, Literal>>)
fun addDate(property: String, value: String)
fun addCreationRelation(relationType: String, relationName: List<Literal>, agentClass: String, properties: List<Pair<String, Literal>>)
fun addRule(type: String, value: List<Pair<String, List<Literal>>>)
fun addRicoCarrierType(names: List<Literal>)
fun addAgent(relation: String, agentType: String, properties: List<Pair<String, Literal>>)
fun langLiteral(text: String, language: String): Literal
fun literal(text: String): Literal
}
\ No newline at end of file
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.builder
import org.memobase.rdf.RICO
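// Common behaviour for physical and digital instantiations: link to the instantiated record
// and to derived / source instantiations.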
abstract class Instantiation(institutionId: String) : RecordResource(institutionId) {
fun addRecord(record: Record) {
resource.addProperty(RICO.instantiates, record.resource)
}
fun addDerivedInstantiation(instantiation: Instantiation) {
resource.addProperty(RICO.hasDerivedInstantiation, instantiation.resource)
}
fun addIsDerivedFromInstantiation(instantiation: Instantiation) {
resource.addProperty(RICO.isDerivedFromInstantiation, instantiation.resource)
}
}
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.builder
import org.apache.jena.rdf.model.Resource
import org.memobase.helpers.StringHelpers
import org.memobase.mapping.KEYS
import org.memobase.rdf.NS
import org.memobase.rdf.RICO
class PhysicalObject(sourceId: String, recordSetId: String, institutionId: String, count: Int) : Instantiation(institutionId) {
private val id = recordSetId + "-" + StringHelpers.normalizeId(sourceId) + "-" + count
override val resource: Resource = model.createResource(NS.mbpo + id)
init {
addRdfType(RICO.Instantiation)
resource.addProperty(RICO.type, "physicalObject")
addRicoConcept(KEYS.identifiers, "main", listOf(literal(id)))
}
}
\ No newline at end of file
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.builder
import org.apache.jena.datatypes.RDFDatatype
import org.apache.jena.rdf.model.Resource
import org.memobase.helpers.StringHelpers
import org.memobase.mapping.KEYS
import org.memobase.rdf.NS
import org.memobase.rdf.RDA
import org.memobase.rdf.RICO
import rdf.MB
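// rico:Record for a single source record: typed, linked to its record set and holding institution,
// optionally marked as sponsored by Memoriav and flagged with its publication status.
// Illustrative usage (hypothetical values):
//   Record("src-42", "<record type>", "rs-1", "inst-1", hasSponsoringAgent = true, isPublished = true)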
class Record(
sourceId: String,
type: String,
recordSetId: String,
institutionId: String,
hasSponsoringAgent: Boolean,
isPublished: Boolean
) :
RecordResource(institutionId) {
private val id = recordSetId + "-" + StringHelpers.normalizeId(sourceId)
override val resource: Resource = model.createResource(NS.mbr + id)
init {
addRdfType(RICO.Record)
resource.addProperty(RICO.type, type)
resource.addProperty(RICO.isPartOf, recordSetUri(recordSetId))
resource.addProperty(RICO.heldBy, institutionUri)
addRicoConcept(KEYS.identifiers, "main", listOf(literal(id)))
if (hasSponsoringAgent) {
resource.addProperty(RDA.hasSponsoringAgentOfResource, model.createResource(KEYS.memoriavUri))
}
resource.addProperty(MB.isPublished, model.createTypedLiteral(isPublished))
}
fun addInstantiation(instantiation: Instantiation) {
resource.addProperty(RICO.hasInstantiation, instantiation.resource)
}
private fun recordSetUri(id: String): Resource = model.createResource(uri(NS.mbrs, id))
}
\ No newline at end of file
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.builder
import org.apache.jena.rdf.model.Literal
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource
import org.apache.logging.log4j.LogManager
import org.memobase.mapping.KEYS
import org.memobase.rdf.DC
import org.memobase.rdf.EBUCORE
import org.memobase.rdf.NS
import org.memobase.rdf.RDA
import org.memobase.rdf.RDF
import org.memobase.rdf.RICO
import org.memobase.rdf.SKOS
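// Shared base for all generated resources: owns the Jena model, registers the common namespace
// prefixes and implements the IResource helpers for literals, rico and SKOS concepts, places,
// carrier types, rules and agents.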
abstract class RecordResource(institutionId: String) : IResource {
private val log = LogManager.getLogger("ResourceBuilder")
val model: Model = ModelFactory.createDefaultModel()
abstract val resource: Resource
protected val institutionUri: Resource = institutionUri(institutionId)
init {
model.setNsPrefix("rico", NS.rico)
model.setNsPrefix("rdf", NS.rdf)
model.setNsPrefix("dct", NS.dcterms)
model.setNsPrefix("ebucore", NS.ebucore)
model.setNsPrefix("skos", NS.skos)
model.setNsPrefix("rdau", NS.rdau)
}
protected fun addRdfType(type: Resource) {
resource.addProperty(RDF.type, type)
}
override fun addLiteral(property: String, value: Literal) {
resource.addLiteral(KEYS.keysToPropertyMap[property], value)
}
override fun addRicoConcept(rdfType: String, ricoType: String, value: List<Literal>) {
when (rdfType) {
KEYS.titles ->
addRicoConcept(RICO.Title, ricoType, RICO.title, value, RICO.hasTitle)
KEYS.identifiers ->
addRicoConcept(RICO.Identifier, ricoType, RICO.identifier, value, RICO.identifiedBy)
KEYS.languages ->
addRicoConcept(RICO.Language, ricoType, RICO.name, value, RICO.hasLanguage)
else -> throw Exception("Unknown rdf:type alias for rico:Concept $rdfType.")
}
}
private fun addRicoConcept(rdfType: Resource, ricoType: String, valueProperty: Property, value: List<Literal>, resourceProperty: Property) {
val blank = model.createResource()
blank.addProperty(RDF.type, rdfType)
blank.addProperty(RICO.type, ricoType)
value.forEach {
blank.addProperty(valueProperty, it)
}
resource.addProperty(resourceProperty, blank)
}
override fun addRicoCarrierType(names: List<Literal>) {
val blank = model.createResource()
blank.addProperty(RDF.type, RICO.CarrierType)
names.forEach {
blank.addProperty(RICO.name, it)
}
resource.addProperty(RICO.hasCarrierType, blank)
}
override fun addSkosConcept(type: String, properties: List<Pair<String, Literal>>) {
val blank = model.createResource()
blank.addProperty(RDF.type, SKOS.Concept)
for (property in properties) {
blank.addProperty(SKOS.get(property.first), property.second)
}
when (type) {
KEYS.genre -> resource.addProperty(EBUCORE.hasGenre, blank)
KEYS.subject -> resource.addProperty(RICO.hasSubject, blank)
}
}
override fun addPlace(type: String, properties: List<Pair<String, Literal>>) {
val blank = model.createResource()
blank.addProperty(RDF.type, RICO.Place)
for (property in properties) {