Commit 7e51aabe authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Update record set transformation.

parent db9ff62f
Pipeline #17745 passed with stages
in 4 minutes and 23 seconds
......@@ -5,6 +5,7 @@ metadata:
namespace: memobase
data:
APPLICATION_ID: "{{ .Values.deploymentName }}-app"
PATH_LANGUAGE_SOURCE: "{{ .Values.pathLanguageSource }}"
TOPIC_IN: "{{ .Values.inputTopic }}"
TOPIC_OUT: "{{ .Values.outputTopic }}"
TOPIC_PROCESS: "{{ .Values.deploymentName }}-reporting"
\ No newline at end of file
......@@ -25,4 +25,11 @@ spec:
name: "{{ .Values.kafkaConfigs }}"
- configMapRef:
name: "{{ .Values.deploymentName}}-app-config"
volumeMounts:
- name: metadata-language-labels
mountPath: "/configs/languages/"
volumes:
- name: metadata-language-labels
configMap:
name: "{{ .Values.metadataLanguageLabels }}"
restartPolicy: Always
......@@ -6,4 +6,7 @@ tag: "latest"
deploymentName: drupal-sync
inputTopic: drupal-json-api-input
outputTopic: drupal-sync-output
kafkaConfigs: prod-kafka-bootstrap-servers
\ No newline at end of file
kafkaConfigs: prod-kafka-bootstrap-servers
metadataLanguageLabels: metadata-language-labels-csv
pathLanguageSource: /configs/languages/metadata_language_labels.csv
\ No newline at end of file
package org.memobase
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import java.io.StringReader
import org.apache.logging.log4j.LogManager
import org.memobase.model.Input
object JSON {
private val log = LogManager.getLogger("JsonParser")
private val klaxon = Klaxon()
fun parseJson(data: String): List<Input> {
return try {
val int = klaxon.parse<Input>(StringReader(data))
return if (int == null) emptyList()
else listOf(int)
} catch (ex: KlaxonException) {
log.error(ex.localizedMessage)
emptyList()
} catch (ex: ClassCastException) {
log.error(ex.localizedMessage)
emptyList()
}
}
}
\ No newline at end of file
......@@ -18,9 +18,6 @@
package org.memobase
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import java.io.StringReader
import org.apache.jena.rdf.model.Model
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
......@@ -32,35 +29,20 @@ import org.memobase.settings.SettingsLoader
class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("DrupalSyncProcessor")
private val transformer = Transform()
private val transformer = RdfTransformer(settings.appSettings)
fun build(): Topology {
val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic)
stream
.flatMapValues { value -> parseJson(value) }
.flatMapValues { value -> transformJson(value) }
.map { _, value -> transformer.write(value.first, value.second) }
.flatMapValues { value -> JSON.parseJson(value) }
.flatMapValues { value -> mapJson(value) }
.map { _, value -> Util.writeModel(value.first, value.second) }
.to(settings.outputTopic)
return builder.build()
}
private fun parseJson(data: String): List<Input> {
return try {
val int = Klaxon().parse<Input>(StringReader(data))
return if (int == null) emptyList()
else listOf(int)
} catch (ex: KlaxonException) {
log.error(ex.localizedMessage)
emptyList()
} catch (ex: ClassCastException) {
log.error(ex.localizedMessage)
emptyList()
}
}
private fun transformJson(input: Input): List<Pair<String, Model>> {
private fun mapJson(input: Input): List<Pair<String, Model>> {
return when (input) {
is Institution -> listOf(transformer.createInstitution(input))
is RecordSet -> listOf(transformer.createRecordSet(input))
......
......@@ -18,31 +18,30 @@
package org.memobase
import java.io.StringWriter
import java.util.Properties
import org.apache.jena.rdf.model.Literal
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.ResourceFactory
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.streams.KeyValue
import org.apache.logging.log4j.LogManager
import org.memobase.model.Address
import org.memobase.model.Institution
import org.memobase.model.RecordSet
import org.memobase.rdf.NS
import org.memobase.rdf.RDA
import org.memobase.rdf.RDF
import org.memobase.rdf.RICO
import org.memobase.rdf.SCHEMA
import org.memobase.rdf.WD
import rdf.MB
class Transform {
private val log = LogManager.getLogger("InstitutionTransform")
private val cantons = Helpers.getCantons()
private val municipalities = Helpers.getMunicipalities()
class RdfTransformer(properties: Properties) {
private val log = LogManager.getLogger("RdfTransformer")
private val cantons = Util.getCantons()
private val municipalities = Util.getMunicipalities()
private val languages = Util.loadLanguages(properties.getProperty(Util.languageSourceFilePathPropertyName))
fun createInstitution(input: Institution): Pair<String, Model> {
val model = ModelFactory.createDefaultModel()
......@@ -52,17 +51,9 @@ class Transform {
resource.addProperty(RICO.type, "memobaseInstitution")
resource.addLiteral(MB.isPublished, input.status)
val identifier = model.createResource()
identifier.addProperty(RDF.type, RICO.Identifier)
identifier.addProperty(RICO.type, literal("main"))
identifier.addProperty(RICO.identifier, literal(input.field_memobase_id))
resource.addProperty(RICO.identifiedBy, identifier)
val oldIdentifier = model.createResource()
oldIdentifier.addProperty(RDF.type, RICO.Identifier)
oldIdentifier.addProperty(RICO.type, literal("oldMemobase"))
oldIdentifier.addProperty(RICO.identifier, literal(input.field_old_memobase_id))
resource.addProperty(RICO.identifiedBy, oldIdentifier)
resource.addProperty(RICO.identifiedBy, addIdentifier(model, "main", input.field_memobase_id))
if (input.field_old_memobase_id != null)
resource.addProperty(RICO.identifiedBy, addIdentifier(model, "oldMemobase", input.field_old_memobase_id))
input.recordset_ids.forEach {
resource.addProperty(RICO.isHolderOf, NS.mbrs + it)
......@@ -105,7 +96,106 @@ class Transform {
}
fun createRecordSet(input: RecordSet): Pair<String, Model> {
return Pair("", ModelFactory.createDefaultModel())
val model = ModelFactory.createDefaultModel()
val resource = model.createResource(NS.mbrs + input.field_memobase_id)
resource.addProperty(RDF.type, RICO.RecordSet)
resource.addLiteral(MB.isPublished, input.status)
resource.addProperty(RICO.identifiedBy, addIdentifier(model, "main", input.field_memobase_id))
if (input.field_old_memobase_id != null)
resource.addProperty(RICO.identifiedBy, addIdentifier(model, "oldMemobase", input.field_old_memobase_id))
input.field_institution.forEach {
resource.addProperty(RICO.heldBy, NS.mbcb + it)
}
resource.addProperty(RICO.title, langLiteral(input.title, "de"))
resource.addProperty(RICO.title, langLiteral(input.title_fr, "fr"))
resource.addProperty(RICO.title, langLiteral(input.title_it, "it"))
resource.addProperty(RICO.descriptiveNote, langLiteral(input.field_text.value, "de"))
resource.addProperty(RICO.descriptiveNote, langLiteral(input.field_text_fr.value, "fr"))
resource.addProperty(RICO.descriptiveNote, langLiteral(input.field_text_it.value, "it"))
input.field_metadata_language_codes.forEach {
// rico:hasLanguage metadata
resource.addProperty(RICO.hasLanguage, addLanguage(model, it))
}
// rico:hasTitle main
resource.addProperty(RICO.hasTitle, addTitle(model, input.title, input.title_fr, input.title_it))
resource.addProperty(RDA.hasSponsoringAgentOfResource, model.createResource(Util.memoriavUri))
if (input.computed_teaser_image_url != null)
resource.addProperty(WD.image, literal(input.computed_teaser_image_url))
// rico:scopeAndContent -> Inhalt
// rico:history -> Kontext
// rico:integrity -> Auswahl / Vollständigkeit
// dct:conformsTo -> Informationen zur Erschliessung
// rico:Identifier callNumber -> Original-Signatur des Bestands
// dct:created
// rico:type -> Dokumenttyp
// rico:descriptiveNote -> Datenübernahme
// rico:Title original
// rico:isRecordResourceAssociatedWithRecordResource -> Dokumente rico:Record ( sameAs + title) | Verwandte Bestände rico:RecordSet
// Originale Bestandesbeschreibung -> rico:RecordSet -> rico:hasSource
// rico:recordResourceOrInstantiationIsTargetOfRecordResourceHoldingRelation
// rico:recordResourceExtent -> Umfang
// rico:isSubjectOf -> publications rico:Record (sameAs + title)
// rico:conditionsOfUse -> Rechte
// rico:conditionsOfAccess -> Zugang
// rico:publicationDate -> Datum der Übernahme in Memobase
// rico:modificationDate (is enriched when indexed, is that enough? Should I do this here?).
// hasTitle original-> Originaltitel des Bestands
// schema:sameAs -> Projektname und Beschreibung | Publikationen
// rdau:P60099 has language of resource -> What is this supposed to do?
// rdau:P60470 has note on resource -> Hinweise
// rdau:P60848 has referential resource relationship with
return Pair(resource.uri, model)
}
private fun addIdentifier(model: Model, type: String, value: String): Resource {
val identifier = model.createResource()
identifier.addProperty(RDF.type, RICO.Identifier)
identifier.addProperty(RICO.type, literal(type))
identifier.addProperty(RICO.identifier, literal(value))
return identifier
}
// TODO: Add language translations.
private fun addLanguage(model: Model, value: String): Resource {
val language = model.createResource()
language.addProperty(RDF.type, RICO.Language)
language.addProperty(RICO.type, literal("metadata"))
languages[value].let {
if (it == null) {
language.addProperty(RICO.name, literal(value))
} else {
language.addProperty(RICO.name, langLiteral(it.de, "de"))
language.addProperty(RICO.name, langLiteral(it.fr, "fr"))
language.addProperty(RICO.name, langLiteral(it.it, "it"))
language.addProperty(SCHEMA.sameAs, literal(it.id))
}
}
return language
}
private fun addTitle(model: Model, de: String, fr: String, it: String): Resource {
val language = model.createResource()
language.addProperty(RDF.type, RICO.Title)
language.addProperty(RICO.type, literal("main"))
language.addProperty(RICO.title, langLiteral(de, "de"))
language.addProperty(RICO.title, langLiteral(fr, "fr"))
language.addProperty(RICO.title, langLiteral(it, "it"))
return language
}
private fun generateLocationResource(model: Model, address: Address): Resource {
......@@ -168,14 +258,6 @@ class Transform {
return location
}
fun write(uri: String, model: Model): KeyValue<String, String> {
return StringWriter().use { writer ->
RDFDataMgr.write(writer, model, RDFFormat.NTRIPLES_UTF8)
return@use KeyValue(uri, writer.toString().trim())
}
}
private fun langLiteral(text: String, language: String): Literal =
ResourceFactory.createLangLiteral(text.trim(), language)
......
/*
* Drupal Sync Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.Klaxon
data class Report(
val id: String,
val status: String,
val message: String
) {
fun toJson(): String {
return Klaxon().toJsonString(this)
}
}
/*
* Drupal Sync Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
object ReportStatus {
const val success = "SUCCESS"
const val failure = "FAILURE"
}
......@@ -27,6 +27,7 @@ class Service(file: String = "app.yml") {
val settings = SettingsLoader(
listOf(
Util.languageSourceFilePathPropertyName
),
file,
useStreamsConfig = true
......
......@@ -17,16 +17,29 @@
*/
package org.memobase
import java.io.File
import java.io.FileInputStream
import java.io.FileNotFoundException
import java.io.StringWriter
import kotlin.system.exitProcess
import org.apache.jena.rdf.model.Model
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.streams.KeyValue
import org.apache.logging.log4j.LogManager
import org.memobase.model.Canton
import org.memobase.model.Municipality
import org.memobase.model.IdLabels
import org.memobase.rdf.NS
object Helpers {
object Util {
const val languageSourceFilePathPropertyName = "path.languages"
const val memoriavUri = NS.mbcb + "mrv"
private val wikidataNamespace = "http://www.wikidata.org/entity/"
private val log = LogManager.getLogger("DrupalSyncHelpers")
fun getMunicipalities(): Map<String, Municipality> {
fun getMunicipalities(): Map<String, IdLabels> {
val stream = ClassLoader.getSystemResourceAsStream("municipalities.tsv")
if (stream != null) {
return stream.bufferedReader().lineSequence().filterNot {
......@@ -37,12 +50,14 @@ object Helpers {
codes.flatMap { code ->
code.split("-").map { c -> c.trim() }
}.map { code ->
Pair(code, Municipality(
values[1].trim(),
values[2].trim(),
values[3].trim(),
values[4].trim()
))
Pair(
code, IdLabels(
values[1].trim(),
values[2].trim(),
values[3].trim(),
values[4].trim()
)
)
}
}.flatten().toMap()
} else {
......@@ -51,19 +66,42 @@ object Helpers {
}
}
fun getCantons(): Map<String, Canton> {
fun loadLanguages(path: String): Map<String, IdLabels> {
return try {
val stream = FileInputStream(File(path))
stream.bufferedReader().lineSequence().filterNot { it.startsWith("code") }
.map {
val values = it.split(",")
Pair(
values[0].trim(),
IdLabels(NS.wd + values[1].trim(), values[2].trim(), values[3].trim(), values[4].trim())
)
}.toMap()
} catch (ex: FileNotFoundException) {
log.error("Could not find language labels in path $path.")
exitProcess(1)
}
}
fun getCantons(): Map<String, IdLabels> {
val stream = ClassLoader.getSystemResourceAsStream("cantons.csv")
if (stream != null) {
return stream.bufferedReader().lineSequence().filterNot {
it.startsWith("code")
}.map {
val values = it.split(",")
Pair(values[0].trim(), Canton(values[1].trim(), values[2].trim(), values[3].trim(), values[4].trim()))
Pair(values[0].trim(), IdLabels(values[1].trim(), values[2].trim(), values[3].trim(), values[4].trim()))
}.toMap()
} else {
log.error("Could not load cantons.csv from classpath!")
exitProcess(1)
}
}
fun writeModel(uri: String, model: Model): KeyValue<String, String> {
return StringWriter().use { writer ->
RDFDataMgr.write(writer, model, RDFFormat.NTRIPLES_UTF8)
return@use KeyValue(uri, writer.toString().trim())
}
}
}
\ No newline at end of file
/*
* Drupal Sync Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.model
data class Canton(
val id: String,
val de: String,
val fr: String,
val it: String
)
package org.memobase.model
data class IdLabels(
val id: String,
val de: String,
val fr: String,
val it: String
)
\ No newline at end of file
......@@ -30,7 +30,7 @@ data class Institution(
val field_isil: String?,
val field_memobase_id: String,
val field_old_memobase_id: String,
val field_old_memobase_id: String?,
val field_email: String?,
val field_website: Link?,
......
/*
* Drupal Sync Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.model
data class Municipality(
val id: String,
val de: String,
val fr: String,
val it: String
)
\ No newline at end of file
......@@ -22,12 +22,18 @@ data class RecordSet(
val title_fr: String,
val title_it: String,
val status: Boolean,
val field_text: RichText,
val field_text_fr: RichText,
val field_text_it: RichText,
val field_memobase_id: String,
val field_old_memobase_id: String? = null,
val field_supported_by_memoriav: Boolean,
val field_metadata_language_codes: List<String>,
val institutions_ids: List<String>,
val computed_teaser_image_url: String
val field_institution: List<String>,
val computed_teaser_image_url: String? = null
) : Input("RecordSet")
\ No newline at end of file
app:
path:
languages: ${PATH_LANGUAGE_SOURCE:?system}
kafka:
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
......@@ -62,14 +62,14 @@ class Test {
@Test
fun `test municipalities loader`() {
val result = Helpers.getMunicipalities()
val result = Util.getMunicipalities()
assertThat(result)
.isNotEmpty
}
@Test
fun `test cantons loader`() {
val result = Helpers.getCantons()
val result = Util.getCantons()
assertThat(result)
.isNotEmpty
}
......
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the