Commit 3e30cb56 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Refactor transformation implementation

parent e546e960
package org.memobase package org.memobase
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.model.Institution
import org.memobase.model.Municipality
import kotlin.system.exitProcess import kotlin.system.exitProcess
object MunicipalitiesLoader { object Helpers {
private val log = LogManager.getLogger("MunicipalitiesLoader") private val log = LogManager.getLogger("Helper")
fun getMunicipalities(): Map<String, Municipality> { fun getMunicipalities(): Map<String, Municipality> {
val stream = ClassLoader.getSystemResourceAsStream("municipalities.tsv") val stream = ClassLoader.getSystemResourceAsStream("municipalities.tsv")
......
...@@ -18,43 +18,67 @@ ...@@ -18,43 +18,67 @@
package org.memobase package org.memobase
import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException import com.beust.klaxon.KlaxonException
import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.model.LanguageString
import org.memobase.model.LanguageVariants
import org.memobase.model.MergedAddress
import org.memobase.model.MergedInstitution
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
import java.io.StringReader import java.io.StringReader
class KafkaTopology(private val settings: SettingsLoader) { class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("StreamsProcessing") private val log = LogManager.getLogger("StreamsProcessing")
private val municipalities = MunicipalitiesLoader.getMunicipalities() private val municipalities = Helpers.getMunicipalities()
fun build(): Topology { fun build(): Topology {
val builder = StreamsBuilder() val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic) val stream = builder.stream<String, String>(settings.inputTopic)
stream stream
.flatMapValues { value -> parseJson(value) } .flatMapValues { value -> parseJson(value) }
.flatMapValues { value -> value.keys.map { key -> Pair(value[key] as JsonObject, key) } } .mapValues { value -> mergeTranslations(value) }
.mapValues { value -> transformJson(value.first, value.second) } .mapValues { value -> transformJson(value) }
.map { _, value -> value.write() } .map { _, value -> value.write() }
.to(settings.outputTopic) .to(settings.outputTopic)
return builder.build() return builder.build()
} }
private fun parseJson(data: String): List<JsonObject> { private fun parseJson(data: String): List<LanguageVariants> {
return try { return try {
listOf(Klaxon().parseJsonObject(StringReader(data))) val int = Klaxon().parse<LanguageVariants>(StringReader(data))
return if (int == null) emptyList()
else listOf(int)
} catch (ex: KlaxonException) { } catch (ex: KlaxonException) {
log.error("Failed to parse source: $data.") log.error("Failed to parse source: $data.")
emptyList() emptyList()
} }
} }
private fun transformJson(input: JsonObject, language: String): Transform { private fun mergeTranslations(input: LanguageVariants): MergedInstitution {
return Transform(municipalities).createInstitution(input, language) return MergedInstitution(
input.de.field_memobase_id,
LanguageString(input.de.title, input.fr.title, input.it.title),
LanguageString(input.de.field_text.value, input.fr.field_text.value, input.it.field_text.value),
input.de.field_isil,
input.de.field_email,
input.de.field_link_archive_catalog.uri,
input.de.field_website.uri,
input.de.field_address.mapIndexed { index, address ->
MergedAddress(
LanguageString(address.address_line1, input.fr.field_address[index].address_line1, input.it.field_address[index].address_line1),
LanguageString(address.address_line2, input.fr.field_address[index].address_line2, input.it.field_address[index].address_line2),
address.postal_code
)
}
)
}
private fun transformJson(input: MergedInstitution): Transform {
return Transform(municipalities).createInstitution(input)
} }
} }
package org.memobase package org.memobase
import com.beust.klaxon.JsonObject
import org.apache.jena.rdf.model.Literal import org.apache.jena.rdf.model.Literal
import org.apache.jena.rdf.model.ModelFactory import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource import org.apache.jena.rdf.model.Resource
import org.apache.jena.riot.RDFDataMgr import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat import org.apache.jena.riot.RDFFormat
import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.KeyValue
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.rdf.* import org.memobase.model.MergedAddress
import org.memobase.model.MergedInstitution
import org.memobase.model.Municipality
import org.memobase.rdf.NS
import org.memobase.rdf.RDF
import org.memobase.rdf.RICO
import org.memobase.rdf.WD
import java.io.StringWriter import java.io.StringWriter
import kotlin.system.exitProcess
class Transform(private val municipalities: Map<String, Municipality>) { class Transform(private val municipalities: Map<String, Municipality>) {
private val log = LogManager.getLogger("Transform") private val log = LogManager.getLogger("Transform")
private val model = ModelFactory.createDefaultModel() private val model = ModelFactory.createDefaultModel()
private var resource: Resource? = null
private var uri: String = "" private var uri: String = ""
fun createInstitution(source: JsonObject, language: String): Transform { fun createInstitution(input: MergedInstitution): Transform {
val id = source["field_memobase_id"].let { resource = model.createResource(NS.memint + input.id)
if (it is String) {
it resource.let { valResource ->
} else { if (valResource != null) {
log.error("No field memobase id defined.") uri = valResource.uri
exitProcess(1)
} val identifier = model.createResource()
} identifier.addProperty(RDF.type, RICO.Identifier)
val resource = model.createResource(NS.memint + id) identifier.addProperty(RICO.type, literal("main"))
uri = resource.uri identifier.addProperty(RICO.identifier, literal(input.id))
valResource.addProperty(RICO.identifiedBy, identifier)
val identifier = model.createResource()
identifier.addProperty(RDF.type, RICO.Identifier) // TODO: proper multi language integration!
identifier.addProperty(RICO.type, literal("main")) valResource.addProperty(RICO.name, langLiteral(input.name.de, "de"))
identifier.addProperty(RICO.identifier, literal(id)) valResource.addProperty(RICO.name, langLiteral(input.name.fr, "fr"))
resource.addProperty(RICO.identifiedBy, identifier) valResource.addProperty(RICO.name, langLiteral(input.name.it, "it"))
valResource.addProperty(RICO.descriptiveNote, langLiteral(input.description.de, "de"))
// TODO: proper multi language integration! valResource.addProperty(RICO.descriptiveNote, langLiteral(input.description.fr, "fr"))
resource.addProperty(RICO.name, langLiteral(source["field_name"] as String, language)) valResource.addProperty(RICO.descriptiveNote, langLiteral(input.description.it, "it"))
resource.addProperty(RICO.descriptiveNote, langLiteral(source["field_text"] as String, language))
input.addresses.forEach {
source["field_addresses"].let { fieldAddressValue -> extractAddressField(valResource, it)
try {
fieldAddressValue as List<JsonObject>
fieldAddressValue.forEach { fieldAddress ->
extractAddressField(resource, fieldAddress)
} }
} catch (ex: ClassCastException) {
log.warn("Could not cast field_addresses to JsonObject: $fieldAddressValue.") valResource.addProperty(WD.isil, literal(input.isil))
valResource.addProperty(WD.emailAddress, literal(input.contactEmail))
valResource.addProperty(WD.website, literal(input.website))
valResource.addProperty(WD.onlineArchive, literal(input.onlineCatalogueLink))
} }
} }
extractSimpleField(resource, WD.isil, source, "field_isil")
extractSimpleField(resource, WD.website, source, "field_website")
extractSimpleField(resource, WD.emailAddress, source, "field_email")
extractSimpleField(resource, WD.onlineArchive, source, "field_online_archive")
extractSimpleField(resource, SCHEMA.sameAs, source, "wikidata_id")
extractSimpleField(resource, WD.image, source, "image")
extractSimpleField(resource, WD.logo, source, "logo")
extractSimpleField(resource, WD.typeOfInstitution, source, "instance_of")
return this return this
} }
...@@ -70,29 +66,25 @@ class Transform(private val municipalities: Map<String, Municipality>) { ...@@ -70,29 +66,25 @@ class Transform(private val municipalities: Map<String, Municipality>) {
} }
} }
private fun extractSimpleField(resource: Resource, property: Property, source: JsonObject, fieldName: String) { private fun extractAddressField(resource: Resource, mergedAddress: MergedAddress) {
source[fieldName].let { val location = model.createResource()
if (it is String) {
resource.addProperty(property, literal(it)) listOf("de", "fr", "it").forEach {
val streetAddress = mergedAddress.addressLine1.get(it)
val secondAddressLine = mergedAddress.addressLine2.get(it)
val combinedStreetAddress = if (secondAddressLine.isNotEmpty()) {
streetAddress + "\n" + secondAddressLine
} else { } else {
log.warn("No value for $fieldName found in source for institution $uri.") streetAddress
} }
val streetNumber = streetAddress.substringAfterLast(" ")
val street = streetAddress.replace(streetNumber, "").trim()
location.addProperty(WD.street, langLiteral(street, it))
location.addProperty(WD.streetNumber, literal(streetNumber))
location.addProperty(WD.streetAddress, langLiteral(combinedStreetAddress, it))
} }
}
private fun extractAddressField(resource: Resource, fieldAddress: JsonObject) {
val location = model.createResource()
val streetAddress = fieldAddress["address_line1"] as String
val secondAddressLine = fieldAddress["address_line2"] as String?
val combinedStreetAddress = if (secondAddressLine != null) {
streetAddress + "\n" + secondAddressLine
} else {
streetAddress
}
val streetNumber = streetAddress.substringAfterLast(" ")
val street = streetAddress.replace(streetNumber, "").trim()
val postalCode = (fieldAddress["postal_code"] as String).trim() val postalCode = mergedAddress.postalCode.trim()
val municipality = if (municipalities.containsKey(postalCode)) { val municipality = if (municipalities.containsKey(postalCode)) {
municipalities[postalCode] municipalities[postalCode]
} else { } else {
...@@ -101,9 +93,6 @@ class Transform(private val municipalities: Map<String, Municipality>) { ...@@ -101,9 +93,6 @@ class Transform(private val municipalities: Map<String, Municipality>) {
null null
} }
location.addProperty(RDF.type, WD.location) location.addProperty(RDF.type, WD.location)
location.addProperty(WD.street, literal(street))
location.addProperty(WD.streetNumber, literal(streetNumber))
location.addProperty(WD.streetAddress, literal(combinedStreetAddress))
location.addProperty(WD.postalCode, literal(postalCode)) location.addProperty(WD.postalCode, literal(postalCode))
// does not enrich city, canton or cantons, if the postal code is not in the list. // does not enrich city, canton or cantons, if the postal code is not in the list.
if (municipality != null) { if (municipality != null) {
...@@ -116,7 +105,6 @@ class Transform(private val municipalities: Map<String, Municipality>) { ...@@ -116,7 +105,6 @@ class Transform(private val municipalities: Map<String, Municipality>) {
location.addProperty(WD.coordinates, literal(coordinate)) location.addProperty(WD.coordinates, literal(coordinate))
} }
} }
//val country = it["country_code"] as String
// country is currently hard coded to switzerland! // country is currently hard coded to switzerland!
location.addProperty(WD.country, WD.switzerland) location.addProperty(WD.country, WD.switzerland)
resource.addProperty(WD.streetAddress, location) resource.addProperty(WD.streetAddress, location)
......
package org.memobase.model
data class Address(
val address_line1: String,
val address_line2: String,
val postal_code: String
)
\ No newline at end of file
package org.memobase.model
data class Institution(
val field_memobase_id: String,
val title: String,
val field_text: RichText,
val field_isil: String,
val field_email: String,
val field_link_archive_catalog: Link,
val field_website: Link,
val field_address: List<Address>
)
\ No newline at end of file
package org.memobase.model
data class LanguageString(
val de: String,
val fr: String,
val it: String
) {
fun get(lang: String): String {
return when (lang) {
"de" -> de
"fr" -> fr
else -> it
}
}
}
\ No newline at end of file
package org.memobase.model
class LanguageVariants(
val de: Institution,
val fr: Institution,
val it: Institution
)
\ No newline at end of file
package org.memobase.model
data class Link(
val uri: String
)
\ No newline at end of file
package org.memobase.model
data class MergedAddress(
val addressLine1: LanguageString,
val addressLine2: LanguageString,
val postalCode: String
)
\ No newline at end of file
package org.memobase.model
data class MergedInstitution(
internal val id: String,
val name: LanguageString,
val description: LanguageString,
val isil: String,
val contactEmail: String,
val onlineCatalogueLink: String,
val website: String,
val addresses: List<MergedAddress>
)
\ No newline at end of file
package org.memobase package org.memobase.model
data class Municipality( data class Municipality(
val postalCodes: List<String>, val postalCodes: List<String>,
......
package org.memobase.model
data class RichText(
val value: String
)
\ No newline at end of file
...@@ -39,7 +39,7 @@ class Test { ...@@ -39,7 +39,7 @@ class Test {
@Test @Test
fun `test municipalities loader`() { fun `test municipalities loader`() {
val result = MunicipalitiesLoader.getMunicipalities() val result = Helpers.getMunicipalities()
assertThat(result) assertThat(result)
.isNotNull .isNotNull
.isNotEmpty .isNotEmpty
......
{ {
"de": { "de": {
"title": "Association Films Plans-Fixes",
"field_memobase_id": "test", "field_memobase_id": "test",
"field_isil": "12345" "field_isil": "12345",
"field_address": [
{
"langcode": "",
"country_code": "CH",
"administrative_area": null,
"locality": "Lausanne",
"dependent_locality": null,
"postal_code": "2502",
"sorting_code": null,
"address_line1": "Straßenname 12",
"address_line2": "Zusätzliche Adressinformationen",
"organization": "Association Films Plans-Fixes",
"given_name": null,
"additional_name": null,
"family_name": null
},
{
"langcode": "",
"country_code": "DE",
"administrative_area": null,
"locality": "Berlin",
"dependent_locality": null,
"postal_code": "2502",
"sorting_code": null,
"address_line1": " Ostseestraße 107",
"address_line2": "",
"organization": "",
"given_name": null,
"additional_name": null,
"family_name": null
}
],
"field_email": "info@plans-fixess.ch",
"field_link_archive_catalog": {
"uri": "http://www.plansfixes.ch/indexation/",
"title": "",
"options": [
]
},
"field_text": {
"value": "<p>Association Films Plans-Fixes</p>\r\n",
"format": "editorial",
"processed": "Association Films Plans-Fixes"
},
"field_website": {
"uri": "http://www.plansfixes.ch/",
"title": "",
"options": [
]
}
}, },
"fr": { "fr": {
"field_memobase_id": "test" "title": "Association Films Plans-Fixes",
"field_memobase_id": "test",
"field_address": [
{
"langcode": "",
"country_code": "CH",
"administrative_area": null,
"locality": "Lausanne",
"dependent_locality": null,
"postal_code": "2502",
"sorting_code": null,
"address_line1": "Straßenname 12",
"address_line2": "Zusätzliche Adressinformationen",
"organization": "Association Films Plans-Fixes",
"given_name": null,
"additional_name": null,
"family_name": null
},
{
"langcode": "",
"country_code": "DE",
"administrative_area": null,
"locality": "Berlin",
"dependent_locality": null,
"postal_code": "2502",
"sorting_code": null,
"address_line1": " Ostseestraße 107",
"address_line2": "",
"organization": "",
"given_name": null,
"additional_name": null,
"family_name": null
}
],
"field_email": "info@plans-fixess.ch",
"field_isil": "PlansFixes-All",
"field_link_archive_catalog": {
"uri": "http://www.plansfixes.ch/indexation/",
"title": "",
"options": [
]
},
"field_text": {
"value": "<p>Association Films Plans-Fixes</p>\r\n",
"format": "editorial",
"processed": "Association Films Plans-Fixes"
},
"field_website": {
"uri": "http://www.plansfixes.ch/",
"title": "",
"options": [
]
}
}, },
"it": { "it": {
"field_memobase_id": "test" "title": "Association Films Plans-Fixes",
"field_memobase_id": "test",
"field_address": [
{
"langcode": "",
"country_code": "CH",
"administrative_area": null,
"locality": "Lausanne",
"dependent_locality": null,
"postal_code": "2502",
"sorting_code": null,
"address_line1": "Straßenname 12",
"address_line2": "Zusätzliche Adressinformationen",
"organization": "Association Films Plans-Fixes",
"given_name": null,
"additional_name": null,
"family_name": null
},
{
"langcode": "",
"country_code": "DE",
"administrative_area": null,
"locality": "Berlin",
"dependent_locality": null,
"postal_code": "2502",
"sorting_code": null,
"address_line1": " Ostseestraße 107",
"address_line2": "",
"organization": "",
"given_name": null,
"additional_name": null,
"family_name": null
}
],
"field_email": "info@plans-fixess.ch",
"field_isil": "PlansFixes-All",
"field_link_archive_catalog": {
"uri": "http://www.plansfixes.ch/indexation/",
"title": "",
"options": [
]
},