Commit d3778ad9 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Improve implementation

parent 3d3c7a94
Pipeline #10742 failed with stages
in 2 minutes and 18 seconds
## Search Doc Service ## Drupal Sync Service
\ No newline at end of file
Transforms JSON objects into RDF resources.
### Canton Query
https://query.wikidata.org/
```sparql
# Lists every Swiss canton with its German, French and Italian label.
SELECT ?item ?labelDE ?labelFR ?labelIT
WHERE
{
  # Restrict to items that are an instance of wd:Q23058 (the canton class used by this project).
  ?item wdt:P31 wd:Q23058 .
  ?item rdfs:label ?labelDE .
  FILTER(LANG(?labelDE) = "de")
  ?item rdfs:label ?labelFR .
  FILTER(LANG(?labelFR) = "fr")
  ?item rdfs:label ?labelIT .
  # Must filter on "it"; filtering on "fr" here returns the French label twice.
  FILTER(LANG(?labelIT) = "it")
}
```
### Municipalities Query
https://query.wikidata.org/
```sparql
# Lists municipalities (instances of wd:Q70208) located in a canton (wd:Q23058),
# with postal code, canton, German/French/Italian labels and all distinct
# coordinates concatenated into one comma-separated value.
SELECT ?item ?postalCode ?adminUnit ?labelDE ?labelFR ?labelIT (GROUP_CONCAT(DISTINCT ?coordinates; SEPARATOR=",") as ?groupedCoordinates)
WHERE
{
  ?item wdt:P31 wd:Q70208 .
  ?item wdt:P281 ?postalCode .
  ?item wdt:P625 ?coordinates .
  ?item wdt:P131 ?adminUnit .
  # Keep only administrative units that are themselves cantons.
  ?adminUnit wdt:P31 wd:Q23058 .
  ?item rdfs:label ?labelDE .
  FILTER(LANG(?labelDE) = "de")
  ?item rdfs:label ?labelFR .
  FILTER(LANG(?labelFR) = "fr")
  ?item rdfs:label ?labelIT .
  # Must filter on "it"; filtering on "fr" here returns the French label twice.
  FILTER(LANG(?labelIT) = "it")
}
# Every non-aggregated projected variable has to appear in GROUP BY.
GROUP BY ?item ?postalCode ?adminUnit ?labelDE ?labelFR ?labelIT
```
\ No newline at end of file
...@@ -18,25 +18,54 @@ ...@@ -18,25 +18,54 @@
package org.memobase package org.memobase
import com.beust.klaxon.JsonArray
import com.beust.klaxon.JsonObject import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException import com.beust.klaxon.KlaxonException
import java.io.StringReader
import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
import java.io.StringReader
import kotlin.system.exitProcess
class KafkaTopology(private val settings: SettingsLoader) { class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("StreamsProcessing") private val log = LogManager.getLogger("StreamsProcessing")
private val municipalities = getMunicipalities()
/**
 * Loads the municipality lookup table from `municipalities.tsv` on the classpath.
 *
 * Every non-blank line is split on tabs and read as:
 * - column 0: municipality id, wrapped in `<...>`
 * - column 1: quoted list of postal codes separated by `", "`
 * - column 2: canton id, wrapped in `<...>`
 * - columns 3-5: quoted labels carrying an `@de` / `@fr` / `@it` tag
 * - column 6: coordinates separated by `","`
 *
 * If the resource cannot be found, the error is logged and the JVM exits with status 1.
 *
 * @return a map from postal code to its municipality; when a postal code occurs
 * for several municipalities, the one read last wins.
 * @throws IllegalArgumentException if a non-blank line has fewer than seven columns.
 */
private fun getMunicipalities(): Map<String, Municipality> {
    val stream = ClassLoader.getSystemResourceAsStream("municipalities.tsv")
    if (stream == null) {
        log.error("Could not load municipalities.tsv from classpath!")
        exitProcess(1)
    }
    // `use` closes the reader (and the wrapped stream) once the lazy sequence has been
    // fully consumed by `toMap()`, even if parsing throws.
    return stream.bufferedReader().use { reader ->
        reader.lineSequence()
            // Tolerate empty lines instead of failing on a missing column.
            .filter { line -> line.isNotBlank() }
            .flatMap { line ->
                val columns = line.split("\t")
                require(columns.size >= 7) {
                    "Malformed line in municipalities.tsv (expected 7 columns, got ${columns.size}): $line"
                }
                val municipality = Municipality(
                    columns[1].trim('"').split(", "),
                    columns[2].trim('<', '>'),
                    columns[0].trim('<', '>'),
                    columns[3].replace("@de", "").trim('"'),
                    columns[4].replace("@fr", "").trim('"'),
                    columns[5].replace("@it", "").trim('"'),
                    columns[6].split(",")
                )
                // One entry per postal code, all pointing at the same municipality.
                municipality.postalCodes.asSequence().map { code -> Pair(code, municipality) }
            }
            .toMap()
    }
}
fun build(): Topology { fun build(): Topology {
val builder = StreamsBuilder() val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic) val stream = builder.stream<String, String>(settings.inputTopic)
stream stream
.flatMapValues { value -> parseJson(value) } .flatMapValues { value -> parseJson(value) }
.mapValues { value -> transformJson(value) } .mapValues { value -> transformJson(value) }
.map { _, value -> value.write() }
.to(settings.outputTopic) .to(settings.outputTopic)
return builder.build() return builder.build()
...@@ -52,9 +81,6 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -52,9 +81,6 @@ class KafkaTopology(private val settings: SettingsLoader) {
} }
private fun transformJson(input: JsonObject): Transform { private fun transformJson(input: JsonObject): Transform {
return Transform(municipalities).createInstitution(input)
} }
} }
package org.memobase
/**
 * A single municipality record used to enrich addresses by postal code.
 *
 * @property postalCodes all postal codes that map to this municipality.
 * @property canton identifier of the canton the municipality belongs to
 * (presumably a Wikidata entity URI - confirm against municipalities.tsv).
 * @property id identifier of the municipality itself
 * (presumably a Wikidata entity URI - confirm against municipalities.tsv).
 * @property de German label.
 * @property fr French label.
 * @property it Italian label.
 * @property coordinates coordinate values of the municipality, kept as raw strings.
 */
data class Municipality(
val postalCodes: List<String>,
val canton: String,
val id: String,
val de: String,
val fr: String,
val it: String,
val coordinates: List<String>
)
\ No newline at end of file
...@@ -27,7 +27,6 @@ class Service(file: String = "app.yml") { ...@@ -27,7 +27,6 @@ class Service(file: String = "app.yml") {
val settings = SettingsLoader( val settings = SettingsLoader(
listOf( listOf(
"mapping"
), ),
file, file,
useStreamsConfig = true useStreamsConfig = true
......
...@@ -6,10 +6,12 @@ import org.apache.jena.rdf.model.ModelFactory ...@@ -6,10 +6,12 @@ import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.RDFDataMgr import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat import org.apache.jena.riot.RDFFormat
import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.KeyValue
import org.apache.logging.log4j.LogManager
import org.memobase.rdf.* import org.memobase.rdf.*
import java.io.StringWriter import java.io.StringWriter
class Transform { class Transform(private val municipalities: Map<String, Municipality>) {
private val log = LogManager.getLogger("Transform")
private val model = ModelFactory.createDefaultModel() private val model = ModelFactory.createDefaultModel()
private var uri: String = "" private var uri: String = ""
...@@ -28,10 +30,10 @@ class Transform { ...@@ -28,10 +30,10 @@ class Transform {
resource.addProperty(RICO.name, langLiteral(source["field_name"] as String, "de")) resource.addProperty(RICO.name, langLiteral(source["field_name"] as String, "de"))
resource.addProperty(RICO.descriptiveNote, langLiteral(source["field_text"] as String, "de")) resource.addProperty(RICO.descriptiveNote, langLiteral(source["field_text"] as String, "de"))
(source["field_addresses"] as List<JsonObject>).forEach { (source["field_addresses"] as List<JsonObject>).forEach { fieldAddress ->
val postalAddress = model.createResource() val location = model.createResource()
val streetAddress = it["address_line1"] as String val streetAddress = fieldAddress["address_line1"] as String
val secondAddressLine = it["address_line2"] as String? val secondAddressLine = fieldAddress["address_line2"] as String?
val combinedStreetAddress = if (secondAddressLine != null) { val combinedStreetAddress = if (secondAddressLine != null) {
streetAddress + "\n" + secondAddressLine streetAddress + "\n" + secondAddressLine
} else { } else {
...@@ -39,14 +41,36 @@ class Transform { ...@@ -39,14 +41,36 @@ class Transform {
} }
val streetNumber = streetAddress.substringAfterLast(" ") val streetNumber = streetAddress.substringAfterLast(" ")
val street = streetAddress.replace(streetNumber, "").trim() val street = streetAddress.replace(streetNumber, "").trim()
postalAddress.addProperty(RDF.type, WD.postalAddress)
postalAddress.addProperty(WD.street, literal(street)) val postalCode = (fieldAddress["postal_code"] as String).trim()
postalAddress.addProperty(WD.streetNumber, literal(streetNumber)) val municipality = if (municipalities.containsKey(postalCode)) {
postalAddress.addProperty(WD.streetAddress, literal(combinedStreetAddress)) municipalities[postalCode]
postalAddress.addProperty(WD.postalCode, literal(it["postal_code"] as String)) } else {
postalAddress.addProperty(WD.adminUnit, literal(it["locality"] as String)) // TODO: This information needs to reach the user!
postalAddress.addProperty(WD.adminUnit, literal(it["country"] as String)) log.error("Invalid postal code: $postalCode")
resource.addProperty(WD.streetAddress, postalAddress) null
}
location.addProperty(RDF.type, WD.location)
location.addProperty(WD.street, literal(street))
location.addProperty(WD.streetNumber, literal(streetNumber))
location.addProperty(WD.streetAddress, literal(combinedStreetAddress))
location.addProperty(WD.postalCode, literal(postalCode))
// does not enrich city, canton or cantons, if the postal code is not in the list.
if (municipality != null) {
// canton
location.addProperty(WD.adminUnit, model.createResource(municipality.canton))
// city
location.addProperty(WD.adminUnit, model.createResource(municipality.id))
// coordinates
municipality.coordinates.forEach { coordinate ->
location.addProperty(WD.coordinates, literal(coordinate))
}
}
//val country = it["country_code"] as String
// country is currently hard coded to switzerland!
location.addProperty(WD.country, WD.switzerland)
resource.addProperty(WD.streetAddress, location)
} }
resource.addProperty(WD.website, literal(source["field_website"] as String)) resource.addProperty(WD.website, literal(source["field_website"] as String))
...@@ -65,8 +89,6 @@ class Transform { ...@@ -65,8 +89,6 @@ class Transform {
RDFDataMgr.write(writer, model, RDFFormat.NTRIPLES_UTF8) RDFDataMgr.write(writer, model, RDFFormat.NTRIPLES_UTF8)
return@use KeyValue(uri, writer.toString().trim()) return@use KeyValue(uri, writer.toString().trim())
} }
} }
......
...@@ -6,7 +6,8 @@ import org.apache.jena.rdf.model.ResourceFactory ...@@ -6,7 +6,8 @@ import org.apache.jena.rdf.model.ResourceFactory
object WD { object WD {
val postalAddress = res("Q319608") val location = res("Q17334923")
val switzerland = res("Q39")
val isil = prop("P791") val isil = prop("P791")
...@@ -28,7 +29,7 @@ object WD { ...@@ -28,7 +29,7 @@ object WD {
return ResourceFactory.createProperty(NS.wdt, name) return ResourceFactory.createProperty(NS.wdt, name)
} }
private fun res(name: String): Resource { fun res(name: String): Resource {
return ResourceFactory.createResource(NS.wd + name) return ResourceFactory.createResource(NS.wd + name)
} }
......
app:
mapping: /configs/mappings/mapping.yml
kafka: kafka:
streams: streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system} bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
item,labelDE,labelFR,labelIT
http://www.wikidata.org/entity/Q834,Kanton Wallis,canton du Valais,canton du Valais
http://www.wikidata.org/entity/Q11922,Kanton Glarus,canton de Glaris,canton de Glaris
http://www.wikidata.org/entity/Q11911,Kanton Bern,canton de Berne,canton de Berne
http://www.wikidata.org/entity/Q11929,Kanton Solothurn,canton de Soleure,canton de Soleure
http://www.wikidata.org/entity/Q11917,Kanton Genf,canton de Genève,canton de Genève
http://www.wikidata.org/entity/Q11925,Kanton Graubünden,canton des Grisons,canton des Grisons
http://www.wikidata.org/entity/Q11933,Kanton Zug,canton de Zoug,canton de Zoug
http://www.wikidata.org/entity/Q11943,Kanton Zürich,canton de Zurich,canton de Zurich
http://www.wikidata.org/entity/Q11972,Kanton Aargau,canton d'Argovie,canton d'Argovie
http://www.wikidata.org/entity/Q12079,Kanton Appenzell Ausserrhoden,Appenzell Rhodes-Extérieures,Appenzell Rhodes-Extérieures
http://www.wikidata.org/entity/Q12121,Kanton Luzern,canton de Lucerne,canton de Lucerne
http://www.wikidata.org/entity/Q12146,Kanton Basel-Landschaft,canton de Bâle-Campagne,canton de Bâle-Campagne
http://www.wikidata.org/entity/Q12094,Kanton Appenzell Innerrhoden,canton de Appenzell Rhodes-Intérieures,canton de Appenzell Rhodes-Intérieures
http://www.wikidata.org/entity/Q12172,Kanton Basel-Stadt,canton de Bâle-Ville,canton de Bâle-Ville
http://www.wikidata.org/entity/Q12404,Kanton Uri,canton d'Uri,canton d'Uri
http://www.wikidata.org/entity/Q12592,Kanton Nidwalden,canton de Nidwald,canton de Nidwald
http://www.wikidata.org/entity/Q12713,Kanton Thurgau,canton de Thurgovie,canton de Thurgovie
http://www.wikidata.org/entity/Q12724,Kanton Tessin,canton du Tessin,canton du Tessin
http://www.wikidata.org/entity/Q12640,Kanton Freiburg,canton de Fribourg,canton de Fribourg
http://www.wikidata.org/entity/Q12573,Kanton Obwalden,canton d'Obwald,canton d'Obwald
http://www.wikidata.org/entity/Q12433,Kanton Schwyz,canton de Schwytz,canton de Schwytz
http://www.wikidata.org/entity/Q12746,Kanton St. Gallen,canton de Saint-Gall,canton de Saint-Gall
http://www.wikidata.org/entity/Q12697,Kanton Schaffhausen,canton de Schaffhouse,canton de Schaffhouse
http://www.wikidata.org/entity/Q12738,Kanton Neuenburg,canton de Neuchâtel,canton de Neuchâtel
http://www.wikidata.org/entity/Q12755,Kanton Jura,canton du Jura,canton du Jura
http://www.wikidata.org/entity/Q12771,Kanton Waadt,canton de Vaud,canton de Vaud
literals:
- source: "dct:abstract"
target: "abstract"
hasLanguageTags: true
- source: "dct:relation"
target: "relation"
hasLanguageTags: true
- source: "rico:descriptiveNote"
target: "descriptiveNote"
hasLanguageTags: true
- source: "rico:scopeAndContent"
target: "scopeAndContent"
hasLanguageTags: true
- source: "rico:source"
target: "source"
hasLanguageTags: true
- source: "rico:title"
target: "title"
hasLanguageTags: true
- source: "schema:sameAs"
target: "sameAs"
hasLanguageTags: false
- source: "rico:type"
target: "type.keyword"
hasLanguageTags: false
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -42,142 +42,15 @@ class Test { ...@@ -42,142 +42,15 @@ class Test {
} }
@Test @Test
fun `test mapping config`() { fun `test institution transform`() {
val mapping = YamlLoader("src/test/resources/testMapping.yml").load() val service = Service("test1.yml")
assertThat(Klaxon().toJsonString(mapping))
.isEqualTo(
"{\"entities\" : [{\"conditionalTargets\" : [], \"isCreationRelation\" : false, \"isDate\" : true, \"property\" : \"dct:temporal\", \"source\" : \"\", \"target\" : \"temporal\"}, {\"conditionalTargets\" : [], \"isCreationRelation\" : true, \"isDate\" : false, \"property\" : \"rico:recordResourceOrInstantiationIsSourceOfCreationRelation\", \"source\" : \"rico:name\", \"target\" : \"\"}, {\"conditionalTargets\" : [], \"isCreationRelation\" : false, \"isDate\" : false, \"property\" : \"rdau:P60556\", \"source\" : \"rico:name\", \"target\" : \"placeCaptureRaw\"}, {\"conditionalTargets\" : [{\"conditionPattern\" : \"holder\", \"conditionProperty\" : \"rico:type\", \"target\" : \"rightsHolder\"}], \"isCreationRelation\" : false, \"isDate\" : false, \"property\" : \"rico:regulatedBy\", \"source\" : \"rico:name\", \"target\" : \"\"}], \"literals\" : [{\"hasLanguageTags\" : true, \"source\" : \"dct:abstract\", \"target\" : \"abstract\"}, {\"hasLanguageTags\" : false, \"source\" : \"@id\", \"target\" : \"id\"}, {\"hasLanguageTags\" : false, \"source\" : \"schema:sameAs\", \"target\" : \"sameAs\"}]}"
)
}
@ParameterizedTest
@MethodSource("testDates")
fun `test date facet`(date: TestDate) {
val result = when (date.type) {
"single" ->
DateFacetBuilder.buildFromNormalizedSingleDate(date.date)
"range" ->
DateFacetBuilder.buildFromNormalizedDateRange(date.date)
else ->
emptyList()
}
assertThat(result)
.isEqualTo(date.result)
}
private fun testDates() = Stream.of(
TestDate(
"1921-09-14",
"single",
listOf("0~20.Jahrhundert~", "1~20.Jahrhundert~1921-1930#")
),
TestDate(
"1921",
"range",
listOf("0~20.Jahrhundert~", "1~20.Jahrhundert~1921-1930#")
),
TestDate(
"1921/1922",
"range",
listOf("0~20.Jahrhundert~", "1~20.Jahrhundert~1921-1930#")
),
TestDate(
"1921-05-01/02",
"range",
listOf("0~20.Jahrhundert~", "1~20.Jahrhundert~1921-1930#")
),
TestDate(
"1921-05-01/06-02",
"range",
listOf("0~20.Jahrhundert~", "1~20.Jahrhundert~1921-1930#")
),
TestDate(
"1921/1931",
"range",
listOf("0~20.Jahrhundert~", "1~20.Jahrhundert~1921-1930#", "1~20.Jahrhundert~1931-1940#")
),
TestDate(
"1921-04-01/1931-05-02",
"range",
listOf("0~20.Jahrhundert~", "1~20.Jahrhundert~1921-1930#", "1~20.Jahrhundert~1931-1940#")
),
TestDate(
"1921/1951",
"range",
listOf(
"0~20.Jahrhundert~",
"1~20.Jahrhundert~1921-1930#",
"1~20.Jahrhundert~1931-1940#",
"1~20.Jahrhundert~1941-1950#",
"1~20.Jahrhundert~1951-1960#"
)
),
TestDate(
"1721/1951",
"range",
listOf(
"0~18.Jahrhundert~",
"0~19.Jahrhundert~",
"0~20.Jahrhundert~",
"1~18.Jahrhundert~1721-1730#",
"1~18.Jahrhundert~1731-1740#",
"1~18.Jahrhundert~1741-1750#",
"1~18.Jahrhundert~1751-1760#",
"1~18.Jahrhundert~1761-1770#",
"1~18.Jahrhundert~1771-1780#",
"1~18.Jahrhundert~1781-1790#",
"1~18.Jahrhundert~1791-1800#",
"1~19.Jahrhundert~1801-1810#",
"1~19.Jahrhundert~1811-1820#",
"1~19.Jahrhundert~1821-1830#",
"1~19.Jahrhundert~1831-1840#",
"1~19.Jahrhundert~1841-1850#",
"1~19.Jahrhundert~1851-1860#",
"1~19.Jahrhundert~1861-1870#",
"1~19.Jahrhundert~1871-1880#",
"1~19.Jahrhundert~1881-1890#",
"1~19.Jahrhundert~1891-1900#",
"1~20.Jahrhundert~1901-1910#",
"1~20.Jahrhundert~1911-1920#",
"1~20.Jahrhundert~1921-1930#",
"1~20.Jahrhundert~1931-1940#",
"1~20.Jahrhundert~1941-1950#",
"1~20.Jahrhundert~1951-1960#"
)
),
TestDate(
"1721/1822",
"range",
listOf(
"0~18.Jahrhundert~",
"0~19.Jahrhundert~",
"1~18.Jahrhundert~1721-1730#",
"1~18.Jahrhundert~1731-1740#",
"1~18.Jahrhundert~1741-1750#",
"1~18.Jahrhundert~1751-1760#",
"1~18.Jahrhundert~1761-1770#",
"1~18.Jahrhundert~1771-1780#",
"1~18.Jahrhundert~1781-1790#",
"1~18.Jahrhundert~1791-1800#",
"1~19.Jahrhundert~1801-1810#",
"1~19.Jahrhundert~1811-1820#",
"1~19.Jahrhundert~1821-1830#"
)
)
)
@ParameterizedTest
@MethodSource("testParams")
fun `integration tests`(params: TestParam) {
val service = Service(params.settingsFileName)
val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings) val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory( val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer() StringSerializer(), StringSerializer()
) )
testDriver.pipeInput( testDriver.pipeInput(
factory.create( factory.create(
service.settings.inputTopic, params.inputKey, readFile("${params.count}/input.json") service.settings.inputTopic, "", readFile("1/input.json")
) )
) )
...@@ -188,21 +61,9 @@ class Test { ...@@ -188,21 +61,9 @@ class Test {
) )
assertThat(record.value()) assertThat(record.value())
.isEqualTo(readFile("${params.count}/output.json")) .isEqualTo(readFile("1/output.nt"))
}
private fun testParams() = Stream.of( assertThat(record.key())
TestParam( .isEqualTo("")
"test-1", }
1,
"kafkaTest1.yml",
"key"
),
TestParam(
"test-2",
2,
"kafkaTest1.yml",
"key"
)
)
} }
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
/**
 * One parameterized test case for the date facet tests.
 *
 * @property date the normalized date (or date range) string handed to the facet builder.
 * @property type selects the builder entry point; the date facet test handles
 * "single" and "range" and treats anything else as an empty result.
 * @property result the facet values the builder is expected to return.
 */
data class TestDate(
val date: String,
val type: String,
val result: List<String>
)
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
/**
 * One parameterized case for the integration tests.
 *
 * @property name label of the test case (presumably only for readability - not read by the visible test code).
 * @property count number of the resource folder holding the `input.json` / `output.json` fixtures.
 * @property settingsFileName settings file passed to the service under test.
 * @property inputKey key of the record piped into the input topic.
 */
data class TestParam(
val name: String,
val count: Int,
val settingsFileName: String,
val inputKey: String
)
app:
mapping: src/test/resources/mapping.yml
kafka:
streams: