Commit 8fd39835 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Update sync service

parent 24a7583f
...@@ -35,7 +35,7 @@ dependencies { ...@@ -35,7 +35,7 @@ dependencies {
// https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client // https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client
//compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.1.0' //compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.1.0'
implementation 'org.memobase:memobase-service-utilities:1.4.0' implementation 'org.memobase:memobase-service-utilities:0.14.0'
// Logging Framework // Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}" implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
......
/* /*
* search-doc-service * drupal-sync-service
* Copyright (C) 2020 Memoriav * Copyright (C) 2020 Memoriav
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
...@@ -22,7 +22,7 @@ import org.apache.logging.log4j.LogManager ...@@ -22,7 +22,7 @@ import org.apache.logging.log4j.LogManager
class App { class App {
companion object { companion object {
private val log = LogManager.getLogger("App") private val log = LogManager.getLogger("DrupalSyncApp")
@JvmStatic fun main(args: Array<String>) { @JvmStatic fun main(args: Array<String>) {
try { try {
Service().run() Service().run()
......
/*
* drupal-sync-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase package org.memobase
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
...@@ -7,7 +24,7 @@ import kotlin.system.exitProcess ...@@ -7,7 +24,7 @@ import kotlin.system.exitProcess
object Helpers { object Helpers {
private val log = LogManager.getLogger("Helper") private val log = LogManager.getLogger("DrupalSyncHelpers")
fun getMunicipalities(): Map<String, Municipality> { fun getMunicipalities(): Map<String, Municipality> {
val stream = ClassLoader.getSystemResourceAsStream("municipalities.tsv") val stream = ClassLoader.getSystemResourceAsStream("municipalities.tsv")
......
/* /*
* Table Data Import Service * Drupal Sync Service
* Copyright (C) 2020 Memoriav * Copyright (C) 2020 Memoriav
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
...@@ -20,13 +20,12 @@ package org.memobase ...@@ -20,13 +20,12 @@ package org.memobase
import com.beust.klaxon.Klaxon import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException import com.beust.klaxon.KlaxonException
import org.apache.jena.rdf.model.Model
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.model.LanguageString import org.memobase.model.*
import org.memobase.model.LanguageVariants
import org.memobase.model.MergedAddress
import org.memobase.model.MergedInstitution
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
import java.io.StringReader import java.io.StringReader
...@@ -34,23 +33,23 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -34,23 +33,23 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("StreamsProcessing") private val log = LogManager.getLogger("StreamsProcessing")
private val municipalities = Helpers.getMunicipalities() private val municipalities = Helpers.getMunicipalities()
private val transformer = Transform(municipalities)
fun build(): Topology { fun build(): Topology {
val builder = StreamsBuilder() val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic) val stream = builder.stream<String, String>(settings.inputTopic)
stream stream
.flatMapValues { value -> parseJson(value) } .flatMapValues { value -> parseJson(value) }
.mapValues { value -> mergeTranslations(value) }
.mapValues { value -> transformJson(value) } .mapValues { value -> transformJson(value) }
.map { _, value -> value.write() } .map { _, value -> transformer.write(value.first, value.second) }
.to(settings.outputTopic) .to(settings.outputTopic)
return builder.build() return builder.build()
} }
private fun parseJson(data: String): List<LanguageVariants> { private fun parseJson(data: String): List<Input> {
return try { return try {
val int = Klaxon().parse<LanguageVariants>(StringReader(data)) val int = Klaxon().parse<Input>(StringReader(data))
return if (int == null) emptyList() return if (int == null) emptyList()
else listOf(int) else listOf(int)
} catch (ex: KlaxonException) { } catch (ex: KlaxonException) {
...@@ -59,26 +58,13 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -59,26 +58,13 @@ class KafkaTopology(private val settings: SettingsLoader) {
} }
} }
private fun mergeTranslations(input: LanguageVariants): MergedInstitution { private fun transformJson(input: Input): Pair<String, Model> {
return MergedInstitution( return when (input) {
input.de.field_memobase_id, is Institution -> transformer.createInstitution(input)
LanguageString(input.de.title, input.fr.title, input.it.title), is RecordSet -> transformer.createRecordSet(input)
LanguageString(input.de.field_text.value, input.fr.field_text.value, input.it.field_text.value), else -> throw Exception("Unknown Type Input.")
input.de.field_isil, }
input.de.field_email,
input.de.field_link_archive_catalog.uri,
input.de.field_website.uri,
input.de.field_address.mapIndexed { index, address ->
MergedAddress(
LanguageString(address.address_line1, input.fr.field_address[index].address_line1, input.it.field_address[index].address_line1),
LanguageString(address.address_line2, input.fr.field_address[index].address_line2, input.it.field_address[index].address_line2),
address.postal_code
)
}
)
}
private fun transformJson(input: MergedInstitution): Transform {
return Transform(municipalities).createInstitution(input)
} }
} }
/* /*
* mapper-service * Drupal Sync Service
* Copyright (C) 2020 Memoriav * Copyright (C) 2020 Memoriav
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
......
/* /*
* mapper-service * Drupal Sync Service
* Copyright (C) 2020 Memoriav * Copyright (C) 2020 Memoriav
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
......
/* /*
* Table Data Import Service * Drupal Sync Service
* Copyright (C) 2020 Memoriav * Copyright (C) 2020 Memoriav
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
...@@ -23,7 +23,7 @@ import org.apache.logging.log4j.LogManager ...@@ -23,7 +23,7 @@ import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
class Service(file: String = "app.yml") { class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("SearchDocService") private val log = LogManager.getLogger("DrupalSyncService")
val settings = SettingsLoader( val settings = SettingsLoader(
listOf( listOf(
......
/*
* Drupal Sync Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase package org.memobase
import org.apache.jena.rdf.model.Literal import org.apache.jena.rdf.model.*
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
import org.apache.jena.riot.RDFDataMgr import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat import org.apache.jena.riot.RDFFormat
import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.KeyValue
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.model.MergedAddress import org.memobase.model.*
import org.memobase.model.MergedInstitution
import org.memobase.model.Municipality
import org.memobase.rdf.NS import org.memobase.rdf.NS
import org.memobase.rdf.RDF import org.memobase.rdf.RDF
import org.memobase.rdf.RICO import org.memobase.rdf.RICO
...@@ -19,72 +33,74 @@ import java.io.StringWriter ...@@ -19,72 +33,74 @@ import java.io.StringWriter
class Transform(private val municipalities: Map<String, Municipality>) { class Transform(private val municipalities: Map<String, Municipality>) {
private val log = LogManager.getLogger("Transform") private val log = LogManager.getLogger("Transform")
private val model = ModelFactory.createDefaultModel()
private var resource: Resource? = null fun createInstitution(input: Institution): Pair<String, Model> {
private var uri: String = "" val model = ModelFactory.createDefaultModel()
fun createInstitution(input: MergedInstitution): Transform { val resource = model.createResource(NS.mbcb + input.field_memobase_id)
resource = model.createResource(NS.memint + input.id)
val identifier = model.createResource()
resource.let { valResource -> identifier.addProperty(RDF.type, RICO.Identifier)
if (valResource != null) { identifier.addProperty(RICO.type, literal("main"))
uri = valResource.uri identifier.addProperty(RICO.identifier, literal(input.field_memobase_id))
resource.addProperty(RICO.identifiedBy, identifier)
val identifier = model.createResource()
identifier.addProperty(RDF.type, RICO.Identifier) // TODO: proper multi language integration!
identifier.addProperty(RICO.type, literal("main")) resource.addProperty(RICO.name, langLiteral(input.title, "de"))
identifier.addProperty(RICO.identifier, literal(input.id)) resource.addProperty(RICO.name, langLiteral(input.title_fr, "fr"))
valResource.addProperty(RICO.identifiedBy, identifier) resource.addProperty(RICO.name, langLiteral(input.title_it, "it"))
resource.addProperty(RICO.descriptiveNote, langLiteral(input.field_text.value, "de"))
// TODO: proper multi language integration! resource.addProperty(RICO.descriptiveNote, langLiteral(input.field_text_fr.value, "fr"))
valResource.addProperty(RICO.name, langLiteral(input.name.de, "de")) resource.addProperty(RICO.descriptiveNote, langLiteral(input.field_text_it.value, "it"))
valResource.addProperty(RICO.name, langLiteral(input.name.fr, "fr"))
valResource.addProperty(RICO.name, langLiteral(input.name.it, "it")) input.field_address.forEach {address ->
valResource.addProperty(RICO.descriptiveNote, langLiteral(input.description.de, "de")) val location = generateLocationResource(address)
valResource.addProperty(RICO.descriptiveNote, langLiteral(input.description.fr, "fr")) resource.addProperty(RICO.hasLocation, location)
valResource.addProperty(RICO.descriptiveNote, langLiteral(input.description.it, "it"))
input.addresses.forEach {
extractAddressField(valResource, it)
}
valResource.addProperty(WD.isil, literal(input.isil))
valResource.addProperty(WD.emailAddress, literal(input.contactEmail))
valResource.addProperty(WD.website, literal(input.website))
valResource.addProperty(WD.onlineArchive, literal(input.onlineCatalogueLink))
}
} }
input.field_isil.let {
if (it != null)
resource.addProperty(WD.isil, literal(it))
}
input.field_email.let {
if (it != null) {
resource.addProperty(WD.emailAddress, literal(it))
}
}
return this input.field_website.let {
if (it != null)
resource.addProperty(WD.website, literal(it.uri))
}
input.field_link_archive_catalog.let {
if (it != null)
resource.addProperty(WD.onlineArchive, literal(it.uri))
}
return Pair(resource.uri, model)
} }
fun write(): KeyValue<String, String> { fun createRecordSet(input: RecordSet): Pair<String, Model> {
return StringWriter().use { writer -> return Pair("", ModelFactory.createDefaultModel())
RDFDataMgr.write(writer, model, RDFFormat.NTRIPLES_UTF8)
return@use KeyValue(uri, writer.toString().trim())
}
} }
private fun extractAddressField(resource: Resource, mergedAddress: MergedAddress) { private fun generateLocationResource(address: Address): Resource {
val location = model.createResource() val location = ResourceFactory.createResource()
listOf("de", "fr", "it").forEach { val streetAddress = address.address_line1
val streetAddress = mergedAddress.addressLine1.get(it) val secondAddressLine = address.address_line2
val secondAddressLine = mergedAddress.addressLine2.get(it) val combinedStreetAddress = if (secondAddressLine.isNullOrEmpty()) {
val combinedStreetAddress = if (secondAddressLine.isNotEmpty()) { streetAddress
streetAddress + "\n" + secondAddressLine } else {
} else { streetAddress + "\n" + secondAddressLine
streetAddress
}
val streetNumber = streetAddress.substringAfterLast(" ")
val street = streetAddress.replace(streetNumber, "").trim()
location.addProperty(WD.street, langLiteral(street, it))
location.addProperty(WD.streetNumber, literal(streetNumber))
location.addProperty(WD.streetAddress, langLiteral(combinedStreetAddress, it))
} }
val streetNumber = streetAddress.substringAfterLast(" ")
val street = streetAddress.replace(streetNumber, "").trim()
location.addProperty(WD.street, literal(street))
location.addProperty(WD.streetNumber, literal(streetNumber))
location.addProperty(WD.streetAddress, literal(combinedStreetAddress))
val postalCode = mergedAddress.postalCode.trim() val postalCode = address.postal_code.trim()
val municipality = if (municipalities.containsKey(postalCode)) { val municipality = if (municipalities.containsKey(postalCode)) {
municipalities[postalCode] municipalities[postalCode]
} else { } else {
...@@ -97,19 +113,26 @@ class Transform(private val municipalities: Map<String, Municipality>) { ...@@ -97,19 +113,26 @@ class Transform(private val municipalities: Map<String, Municipality>) {
// does not enrich city, canton or cantons, if the postal code is not in the list. // does not enrich city, canton or cantons, if the postal code is not in the list.
if (municipality != null) { if (municipality != null) {
// canton // canton
location.addProperty(WD.adminUnit, model.createResource(municipality.canton)) location.addProperty(WD.adminUnit, ResourceFactory.createResource(municipality.canton))
// city // city
location.addProperty(WD.adminUnit, model.createResource(municipality.id)) location.addProperty(WD.adminUnit, ResourceFactory.createResource(municipality.id))
// coordinates // TODO: Replace this with the coordinates from source!
municipality.coordinates.forEach { coordinate -> municipality.coordinates.forEach { coordinate ->
location.addProperty(WD.coordinates, literal(coordinate)) location.addProperty(WD.coordinates, literal(coordinate))
} }
} }
// country is currently hard coded to switzerland! // country is currently hard coded to switzerland!
location.addProperty(WD.country, WD.switzerland) location.addProperty(WD.country, WD.switzerland)
resource.addProperty(RICO.hasLocation, location) return location
}
fun write(uri: String, model: Model): KeyValue<String, String> {
return StringWriter().use { writer ->
RDFDataMgr.write(writer, model, RDFFormat.NTRIPLES_UTF8)
return@use KeyValue(uri, writer.toString().trim())
}
} }
private fun langLiteral(text: String, language: String): Literal = model.createLiteral(text.trim(), language) private fun langLiteral(text: String, language: String): Literal = ResourceFactory.createLangLiteral(text.trim(), language)
private fun literal(text: String): Literal = model.createLiteral(text.trim()) private fun literal(text: String): Literal = ResourceFactory.createPlainLiteral(text.trim())
} }
\ No newline at end of file
package org.memobase.model package org.memobase.model
data class Address( data class Address(
val country_code: String,
val address_line1: String, val address_line1: String,
val address_line2: String, val address_line2: String?,
val postal_code: String val locality: String,
val postal_code: String,
val administrative_area: String?,
val dependent_locality: String?,
val langcode: String?,
val sorting_code: String?,
val organization: String?,
val given_name: String?,
val additional_name: String?,
val family_name: String?
) )
\ No newline at end of file
/* /*
* mapper-service * Drupal Sync Service
* Copyright (C) 2020 Memoriav * Copyright (C) 2020 Memoriav
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
...@@ -15,22 +15,9 @@ ...@@ -15,22 +15,9 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.memobase.model
package org.memobase.rdf import com.beust.klaxon.TypeFor
import org.apache.jena.rdf.model.Property @TypeFor(field = "type", adapter = InputTypeAdapter::class)
import org.apache.jena.rdf.model.Resource open class Input(val type: String)
import org.apache.jena.rdf.model.ResourceFactory \ No newline at end of file
object SCHEMA {
val sameAs: Property = prop("sameAs")
private fun prop(name: String): Property {
return ResourceFactory.createProperty(NS.schema, name)
}
private fun res(name: String): Resource {
return ResourceFactory.createResource(NS.schema + name)
}
}
\ No newline at end of file
/* /*
* mapper-service * Drupal Sync Service
* Copyright (C) 2020 Memoriav * Copyright (C) 2020 Memoriav
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
...@@ -15,22 +15,15 @@ ...@@ -15,22 +15,15 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.memobase.model
package org.memobase.rdf import com.beust.klaxon.TypeAdapter
import kotlin.reflect.KClass
import org.apache.jena.rdf.model.Property class InputTypeAdapter : TypeAdapter<Input> {
import org.apache.jena.rdf.model.Resource override fun classFor(type: Any): KClass<out Input> = when(type as String) {
import org.apache.jena.rdf.model.ResourceFactory "node--institution" -> Institution::class
"node--recordset" -> RecordSet::class
object RDF { else -> throw IllegalArgumentException("Unknown type: $type")
val type: Property = prop("type")
private fun prop(name: String): Property {
return ResourceFactory.createProperty(NS.rdf, name)
}
private fun res(name: String): Resource {
return ResourceFactory.createResource(NS.rdf + name)
} }
} }
\ No newline at end of file
/*
* Drupal Sync Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.model package org.memobase.model
data class Institution( data class Institution(
val field_memobase_id: String, val status: Boolean,
val title: String, val title: String,
val title_fr: String,
val title_it: String,
val field_address: List<Address>,