Commit 329971b5 authored by Jonas Waeber's avatar Jonas Waeber

Merge branch 'institution-transform'

parents 5ec21ff8 9146aa8b
Pipeline #16811 passed with stages
in 6 minutes and 48 seconds
......@@ -50,6 +50,8 @@ dependencies {
// JSON Parser
implementation 'com.beust:klaxon:5.2'
// CSV Reader
implementation("com.github.doyaaaaaken:kotlin-csv-jvm:0.7.3")
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
implementation "org.jetbrains.kotlin:kotlin-script-runtime:1.3.71"
......
......@@ -25,4 +25,11 @@ spec:
name: "{{ .Values.kafkaConfigs }}"
- configMapRef:
name: "{{ .Values.deploymentName}}-app-config"
volumeMounts:
- name: instituion-type-labels
mountPath: "/configs/institution_types/"
volumes:
- name: instituion-type-labels
configMap:
name: "{{ .Values.instutionTypeLabels }}"
restartPolicy: Always
......@@ -9,4 +9,6 @@ outputTopic: search-doc-output-documents
inputTopic: search-doc-input-documents
reportingTopic: postprocessing-reporting
instutionTypeLabels: institution-type-labels
mediaServerUrl: https://media.memobase.k8s.unibas.ch/memo/
\ No newline at end of file
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.JsonArray
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.*
import org.memobase.model.*
class InstitutionSearchDocBuilder(path: String) {
private val log = LogManager.getLogger("InstitutionSearchDocBuilder")
private val institutionTypeMapper = InstitutionTypeMapper(path)
fun transform(key: String, input: Map<String, JsonObject>): Schema {
val institution = input["institution"] ?: throw InvalidInputException("No institution entity found in message $key.")
val identifiers = Filter.entitiesByProperty(KEYS.identifiedBy, institution, input)
val locations = Filter.entitiesByProperty(KEYS.hasLocation, institution, input)
val cantons = mutableListOf<JsonObject>()
input.values.forEach {
if (it[KEYS.ricoType] == KEYS.LocationType.canton) {
cantons.add(it)
}
}
val type = institution[KEYS.wikidataInstance].let {
when(it) {
is String -> listOf(institutionTypeMapper.getValue(it))
is JsonArray<*> -> it.map { any -> institutionTypeMapper.getValue(any as String) }
else -> {
log.error("Found no institution types on institution $key")
emptyList()
}
}
}
val name = Extract.languageContainer("institution", institution[KEYS.name]).let { names ->
when {
names.isEmpty() -> {
LanguageContainer.placeholder("NoNameFound")
}
names.size == 1 -> {
names[0]
}
else -> {
names.reduce { acc, languageContainer -> acc.merge(languageContainer) }
}
}
}
val canton = cantons.map {
it[KEYS.name].let { name ->
Extract.languageContainer("canton", name).reduce { acc, languageContainer -> acc.merge(languageContainer) }
}
}.let { c ->
when {
c.isEmpty() -> {
listOf(LanguageContainer.placeholder("NoCantonNameFound"))
}
else -> c
}
}
return InstitutionSearchDoc(
institutionId = Extract.extractIdValue(identifiers, KEYS.IdentifierType.main) ?: "NoIdentifierFound",
published = institution[KEYS.isPublished].let {
when (it) {
is String -> it.toBoolean()
else -> {
log.error("Found no isPublished property on institution $key. Set to false.")
false
}
}
},
type = type,
name = name,
documentType = listOf(LanguageContainer.placeholder("PLACEHOLDER")),
keyVisualLink = "placeholderlink",
canton = canton,
numberOfRecordSets = 0,
numberOfDocuments = 0
)
}
}
......@@ -18,17 +18,20 @@
package org.memobase
import com.beust.klaxon.JsonObject
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.io.StringWriter
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.Default
import org.memobase.helpers.JSON
import org.memobase.helpers.KEYS
import org.memobase.model.Report
import org.memobase.model.SearchDoc
import org.memobase.model.Schema
import org.memobase.settings.SettingsLoader
class KafkaTopology(private val settings: SettingsLoader) {
......@@ -36,39 +39,62 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val reportTopic = settings.processReportTopic
private val searchDocTransform = SearchDocTransform(settings.appSettings.getProperty(KEYS.mediaUrlPropName))
private val searchDocTransform = SearchDocTransform(settings.appSettings.getProperty(KEYS.SettingsProps.mediaUrl))
private val institutionSearchDoc = InstitutionSearchDocBuilder(settings.appSettings.getProperty(KEYS.SettingsProps.institutionTypeLabelsPath))
fun build(): Topology {
val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic)
val transformedStream = stream
val branchedStream = stream
.flatMapValues { value -> JSON.parse(value) }
.mapValues { value -> JSON.unpack(value) }
.branch(
Predicate { _, value -> value.containsKey(JSON.record) },
Predicate { _, value -> value.containsKey(JSON.institution) },
Predicate { _, value -> value.containsKey(JSON.recordSet) },
Predicate { _, _ -> true }
)
val recordStream = branchedStream[0]
.mapValues { readOnlyKey, value ->
try {
Pair(searchDocTransform.transform(value), Report(readOnlyKey, "SUCCESS", "Transformed message into search doc."))
} catch (ex: InvalidInputException) {
Pair(Default.searchDoc, Report(readOnlyKey, "FAILURE", ex.localizedMessage))
}
}
outputStreams(recordStream)
val institutionStream = branchedStream[1]
.mapValues { readOnlyKey, value ->
try {
Pair(transformJson(value), Report(readOnlyKey, "SUCCESS", "Transformed message into search doc."))
Pair(institutionSearchDoc.transform(readOnlyKey, value), Report(readOnlyKey, "SUCCESS", "Transformed message into search doc."))
} catch (ex: InvalidInputException) {
Pair(null, Report(readOnlyKey, "FAILURE", ex.localizedMessage))
Pair(Default.institutionSearchDoc, Report(readOnlyKey, "FAILURE", ex.localizedMessage))
}
}
transformedStream
.map { _, value -> KeyValue(value.second.id, value.second.toJson()) }
outputStreams(institutionStream)
return builder.build()
}
private fun outputStreams(stream: KStream<String, Pair<Schema, Report>>) {
stream
.mapValues { value -> value.second.toJson() }
.to(reportTopic)
transformedStream
stream
.filterNot { _, value -> value.second.status == "FAILURE" }
.map { _, value -> KeyValue(value.first?.id, value.first) }
.mapValues { value -> value.first }
.mapValues { value ->
val writer = StringWriter()
ObjectMapper().registerKotlinModule().writeValue(writer, value)
writer.toString()
}
.to(settings.outputTopic)
return builder.build()
}
private fun transformJson(input: Map<String, JsonObject>): SearchDoc {
return searchDocTransform.transform(input)
}
}
......@@ -23,12 +23,13 @@ import org.apache.logging.log4j.LogManager
import org.memobase.builders.*
import org.memobase.helpers.*
import org.memobase.model.EnrichedDigitalMetadata
import org.memobase.model.Schema
import org.memobase.model.SearchDoc
import org.memobase.rdf.NS
class SearchDocTransform(private val mediaUrl: String) {
private val log = LogManager.getLogger("SearchDocTransform")
fun transform(input: Map<String, JsonObject>): SearchDoc {
fun transform(input: Map<String, JsonObject>): Schema {
val record = input["record"] ?: throw InvalidInputException("No record defined in the message.")
val digitalObject =
......@@ -116,8 +117,6 @@ class SearchDocTransform(private val mediaUrl: String) {
}
}
val recordIdentifiers = Filter.entitiesByProperty("identifiedBy", record, input)
val recordTitles = Filter.entitiesByProperty("hasTitle", record, input)
val recordLanguages = Filter.entitiesByProperty("hasLanguage", record, input)
......@@ -186,7 +185,7 @@ class SearchDocTransform(private val mediaUrl: String) {
},
sameAs = Extract.listOfStrings(record["sameAs"]),
abstract = Extract.languageContainer("abstract", record["abstract"]),
id = id,
recordId = id,
institution = Meta.extractInstitution(record),
recordSet = Meta.extractRecordSet(record),
descriptiveNote = Extract.languageContainer("descriptiveNote", record["descriptiveNote"]),
......
......@@ -21,6 +21,7 @@ package org.memobase
import kotlin.system.exitProcess
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.KEYS
import org.memobase.settings.SettingsLoader
class Service(file: String = "app.yml") {
......@@ -28,7 +29,8 @@ class Service(file: String = "app.yml") {
val settings = SettingsLoader(
listOf(
KEYS.mediaUrlPropName
KEYS.SettingsProps.institutionTypeLabelsPath,
KEYS.SettingsProps.mediaUrl
),
file,
useStreamsConfig = true
......
......@@ -20,7 +20,7 @@ package org.memobase.builders
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.KEYS
import org.memobase.helpers.KEYS
import org.memobase.helpers.Extract
import org.memobase.helpers.FacetBuildHelpers
import org.memobase.model.AgentWithRelationContainer
......
......@@ -3,7 +3,7 @@ package org.memobase.builders
import com.beust.klaxon.JsonArray
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.KEYS
import org.memobase.helpers.KEYS
import org.memobase.helpers.DateFacetBuildHelpers
import org.memobase.model.DateContainer
import org.memobase.rdf.NS
......
......@@ -19,7 +19,7 @@
package org.memobase.builders
import com.beust.klaxon.JsonObject
import org.memobase.KEYS
import org.memobase.helpers.KEYS
import org.memobase.helpers.Extract
import org.memobase.model.FacettedContainer
import org.memobase.rdf.NS
......
......@@ -20,7 +20,7 @@ package org.memobase.builders
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.KEYS
import org.memobase.helpers.KEYS
import org.memobase.helpers.FacetBuildHelpers
import org.memobase.rdf.NS
......
......@@ -20,7 +20,7 @@ package org.memobase.builders
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.KEYS
import org.memobase.helpers.KEYS
import org.memobase.helpers.FacetBuildHelpers
import org.memobase.rdf.NS
......
......@@ -19,7 +19,7 @@
package org.memobase.builders
import com.beust.klaxon.JsonObject
import org.memobase.KEYS
import org.memobase.helpers.KEYS
import org.memobase.helpers.Extract
import org.memobase.model.SuggestContainer
import org.memobase.rdf.NS
......
package org.memobase.helpers
import org.memobase.model.*
object Default {
val institutionSearchDoc = InstitutionSearchDoc(
"UnknownId",
false,
LanguageContainer.EMPTY,
listOf(LanguageContainer.EMPTY),
emptyList(),
emptyList(),
"",
0,
0
)
val searchDoc = SearchDoc(
emptyList(),
emptyList(),
emptyList(),
"",
"",
"",
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
FacettedContainer(LanguageContainer(emptyList(), emptyList(), emptyList(), emptyList()),"", emptyList()),
false,
emptyList(),
emptyList(),
"",
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
EnrichedDigitalMetadata(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
false,
SuggestContainer(emptyList(), emptyList(), emptyList(), emptyList())
)
}
\ No newline at end of file
......@@ -20,7 +20,6 @@ package org.memobase.helpers
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.KEYS
/**
* Helper functions to build hierarchical facet values for places and persons.
......
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.helpers
import org.memobase.model.LanguageContainer
class InstitutionTypeMapper(path: String) {
private val labels = LoadFile.readLabelFile(path)
fun getValue(uri: String): LanguageContainer {
val q = uri.substringAfterLast("/")
return labels[q] ?: LanguageContainer.DEFAULT
}
}
......@@ -8,20 +8,36 @@ import java.io.StringReader
object JSON {
private const val graph = "@graph"
private const val atId = "@id"
const val atType = "@type" // rdf:type
const val type = "type" // rico:type
private const val Record = "Record"
private const val RecordSet = "RecordSet"
private const val CorporateBody = "CorporateBody"
private const val memobaseInstitution = "memobaseInstitution"
const val institution = "institution"
const val record = "record"
const val recordSet = "recordSet"
private val klaxon = Klaxon()
fun parse(data: String): List<JsonObject> {
val result = Klaxon().parseJsonObject(StringReader(data))
val result = klaxon.parseJsonObject(StringReader(data))
return listOf(result)
}
fun unpack(input: JsonObject): Map<String, JsonObject> {
val graph = input["@graph"] as JsonArray<JsonObject>
val graph = input[graph] as JsonArray<JsonObject>
return graph.map {
if (it["@type"] == NS.rico + "Record") {
Pair("record", it)
if (it[atType] == NS.rico + Record) {
Pair(record, it)
} else if (it[atType] == NS.rico + RecordSet) {
Pair(record, it)
} else if (it[atType] == NS.rico + CorporateBody && it[type] == memobaseInstitution) {
Pair(institution, it)
} else {
Pair(it["@id"] as String, it)
Pair(it[atId] as String, it)
}
}.toMap()
}
......
......@@ -16,10 +16,14 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
package org.memobase.helpers
object KEYS {
const val mediaUrlPropName = "media.url"
object SettingsProps {
const val mediaUrl = "media.url"
const val institutionTypeLabelsPath = "institutionTypeLabelsPath"
}
const val entityId = "@id"
const val atType = "@type"
......@@ -34,6 +38,7 @@ object KEYS {
const val agentIsTargetOfCreationRelation = "agentIsTargetOfCreationRelation"
const val hasSubject = "hasSubject"
const val hasLocation = "hasLocation"
const val placeOfCapture = "P60556"
const val spatial = "spatial"
const val producer = "P60441"
......@@ -45,6 +50,8 @@ object KEYS {
const val contributor = "contributor"
const val creator = "creator"
const val identifiedBy = "identifiedBy"
const val Person = "Person"
const val CorporateBody = "CorporateBody"
const val Agent = "Agent"
......@@ -64,6 +71,14 @@ object KEYS {
const val Concept = "Concept"
const val wikidataInstance = "P31"
const val missingLabelDe = "FEHLENDES LABEL"
const val missingLabelFr = "L'ÉTIQUETTE MANQUANTE"
const val missingLabelIt = "GALATEO MANCANTE"
const val missingLabelEn = "MISSING LABEL"
object TitleTypes {
const val main = "main"
const val series = "series"
......@@ -75,4 +90,11 @@ object KEYS {
const val original = "original"
const val callNumber = "callNumber"
}
object LocationType {
const val canton = "canton"
const val municipality = "municipality"
const val memobaseInstitution = "memobaseInstitution"
const val memobaseProject = "memobaseProject"
}
}
package org.memobase.helpers
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import org.memobase.model.LanguageContainer
import java.io.File
object LoadFile {
private val csv = csvReader()
fun readLabelFile(path: String): Map<String, LanguageContainer> {
val labelList = csv.readAll(File(path))
val labelsMap = mutableMapOf<String, LanguageContainer>()
for (row in labelList.listIterator(1)) {
labelsMap[row[0]] = LanguageContainer(listOf(row[1]), listOf(row[2]), listOf(row[3]), emptyList())
}
return labelsMap
}
}
\ No newline at end of file
......@@ -2,7 +2,6 @@ package org.memobase.helpers
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.KEYS
import org.memobase.model.FacettedContainer
import org.memobase.model.LanguageContainer
import org.memobase.rdf.NS
......
package org.memobase.model
import com.fasterxml.jackson.annotation.JsonInclude
@JsonInclude(JsonInclude.Include.NON_EMPTY)
data class InstitutionSearchDoc(
val institutionId: String,
val published: Boolean,
// Full Text Search
val name: LanguageContainer,
// Facettes