Commit 331f0613 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Refactor implementation

parent 2c9a7b09
Pipeline #10852 passed with stages
in 10 minutes and 8 seconds
...@@ -51,6 +51,8 @@ dependencies { ...@@ -51,6 +51,8 @@ dependencies {
// YAML Parser // YAML Parser
implementation 'org.snakeyaml:snakeyaml-engine:2.1' implementation 'org.snakeyaml:snakeyaml-engine:2.1'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.11.+' implementation 'com.fasterxml.jackson.core:jackson-databind:2.11.+'
implementation 'com.fasterxml.jackson.core:jackson-core:2.11.+'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.11.+'
implementation "com.fasterxml.jackson.module:jackson-module-kotlin:2.11.+" implementation "com.fasterxml.jackson.module:jackson-module-kotlin:2.11.+"
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.11.+' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.11.+'
......
...@@ -21,16 +21,20 @@ package org.memobase ...@@ -21,16 +21,20 @@ package org.memobase
import com.beust.klaxon.JsonArray import com.beust.klaxon.JsonArray
import com.beust.klaxon.JsonObject import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon import com.beust.klaxon.Klaxon
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.io.StringReader import java.io.StringReader
import java.io.StringWriter
import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.model.SearchDoc
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
class KafkaTopology(private val settings: SettingsLoader) { class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("StreamsProcessing") private val log = LogManager.getLogger("StreamsProcessing")
private val searchDocTransform = SearchDocTransform(settings.appSettings.getProperty("mapping")) private val searchDocTransform = SearchDocTransform()
fun build(): Topology { fun build(): Topology {
val builder = StreamsBuilder() val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic) val stream = builder.stream<String, String>(settings.inputTopic)
...@@ -38,7 +42,11 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -38,7 +42,11 @@ class KafkaTopology(private val settings: SettingsLoader) {
.flatMapValues { value -> parseJson(value) } .flatMapValues { value -> parseJson(value) }
.mapValues { value -> unpackJson(value) } .mapValues { value -> unpackJson(value) }
.mapValues { value -> transformJson(value) } .mapValues { value -> transformJson(value) }
.mapValues { value -> value.toJsonString() } .mapValues { value ->
val writer = StringWriter()
ObjectMapper().registerKotlinModule().writeValue(writer, value)
writer.toString()
}
.to(settings.outputTopic) .to(settings.outputTopic)
return builder.build() return builder.build()
...@@ -61,7 +69,7 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -61,7 +69,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
}.toMap() }.toMap()
} }
private fun transformJson(input: Map<String, JsonObject>): JsonObject { private fun transformJson(input: Map<String, JsonObject>): SearchDoc {
return searchDocTransform.transform(input) return searchDocTransform.transform(input)
} }
} }
...@@ -18,192 +18,127 @@ ...@@ -18,192 +18,127 @@
package org.memobase package org.memobase
import com.beust.klaxon.JsonArray
import com.beust.klaxon.JsonObject import com.beust.klaxon.JsonObject
import com.beust.klaxon.json
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.helpers.Extract
import org.memobase.model.LanguageContainer
import org.memobase.model.NameContainer
import org.memobase.model.SearchDoc
class SearchDocTransform(mapping: String) { class SearchDocTransform {
private val log = LogManager.getLogger("SearchDocTransform") private val log = LogManager.getLogger("SearchDocTransform")
fun transform(input: Map<String, JsonObject>): SearchDoc {
val record = input["record"] ?: error("No record defined in this message.")
val id = record["@id"] as String
private val transformConfig = YamlLoader(mapping).load() val identifiers = input.values.filter { item -> item["@type"] == "rico:Identifier" }
val titles = input.values.filter { item -> item["@type"] == "rico:Title" }
val languages = input.values.filter { item -> item["@type"] == "rico:Language" }
val rules = input.values.filter { item -> item["@type"] == "rico:Rule" }
fun transform(input: Map<String, JsonObject>): JsonObject { val datesCreated = Extract.getEntitiesFromIds("dct:created", record, input)
return json { val datesIssued = Extract.getEntitiesFromIds("dct:issued", record, input)
obj(input["record"].let { record -> val temporal = Extract.getEntitiesFromIds("dct:temporal", record, input)
record?.mapNotNull { entry -> val placesRelated = Extract.getEntitiesFromIds("dct:spatial", record, input)
when (val transform = transformConfig.getTransform(entry.key)) { val placeCaptured = Extract.getEntitiesFromIds("rdau:P60556", record, input)
is LiteralTransform ->
listOf(extractPair(entry.value, transform.target))
is EntityTransform ->
when (val value = entry.value) {
is String -> {
input[value]?.let { entity ->
entityTransformSelection(input, entity, transform)
}
}
is JsonObject -> {
input[value["@id"] as String]?.let { entity ->
entityTransformSelection(input, entity, transform)
}
}
is JsonArray<*> -> {
value.flatMap { children ->
children as JsonObject
input[children["@id"] as String]?.let { entity ->
entityTransformSelection(input, entity, transform)
}.orEmpty()
}
}
else -> emptyList()
}
null -> emptyList()
}
}?.flatten().orEmpty().filterNotNull().asIterable()
})
}
}
private fun entityTransformSelection( val genre = Extract.getEntitiesFromIds("ebucore:hasGenre", record, input)
input: Map<String, JsonObject>, val subjects = Extract.getEntitiesFromIds("rico:hasSubject", record, input)
entity: JsonObject,
transform: EntityTransform
): List<Pair<String, Any>> {
return when {
transform.isDate -> {
extractDateEntity(entity, transform)
}
transform.conditionalTargets.isNotEmpty() -> {
transform.conditionalTargets.map {
if (entity.containsKey(it.conditionProperty) && entity[it.conditionProperty] == it.conditionPattern) {
if (transform.source.contains('|')) {
val sourceFields = transform.source.split('|')
sourceFields.mapNotNull { sourceField ->
extractPair(entity[sourceField], it.target)
}
} else {
val subPair = extractPair(entity[transform.source], it.target)
if (subPair == null)
emptyList()
else
listOf(subPair)
}
} else {
emptyList()
}
}.flatten()
}
transform.isCreationRelation -> {
extractCreationRelationAndAgents(input, entity)
}
else -> {
val pair = extractPair(entity[transform.source], transform.target)
if (pair == null) emptyList()
else listOf(pair)
}
}
}
private fun extractPair(value: Any?, targetField: String): Pair<String, Any>? {
return when (value) {
is String ->
Pair(targetField, value)
is JsonObject ->
Pair(targetField, mapOf(Pair(value["@language"], value["@value"])))
is JsonArray<*> -> {
val results = value.mapNotNull {
when (it) {
is String -> it
is JsonObject -> Pair(it["@language"], it["@value"])
else -> null
}
}
when {
results.isEmpty() -> {
null
}
results[0] is String -> {
Pair(targetField, results)
}
else -> {
Pair(targetField, (results as List<Pair<Any, Any>>).toMap())
}
}
}
else -> {
log.error("Could not parse literal value: $value.")
null
}
}
}
private fun extractDateEntity(entity: JsonObject, transform: Transform): List<Pair<String, Any>> { val publishers = Extract.getEntitiesFromIds("rico:publishedBy", record, input)
val isNormalized = entity.containsKey("rico:normalizedDateValue") val producers = Extract.getEntitiesFromIds("rdau:P60441", record, input)
val creationRelationAgents =
Extract.getEntitiesFromIds("rico:recordResourceOrInstantiationIsSourceOfCreationRelation", record, input)
val date = if (isNormalized) { return SearchDoc(
entity["rico:normalizedDateValue"] as String title = Extract.typedEntityByType(titles, "rico:type", "main", "rico:title"),
} else { seriesTitle = Extract.typedEntityByType(titles, "rico:type", "series", "rico:title"),
entity["rico:expressedDate"] as String broadcastTitle = Extract.typedEntityByType(titles, "rico:type", "broadcast", "rico:title"),
} type = record["rico:type"] as String,
val qualifier = entity["rico:dateQualifier"] as String? sourceID = try {
val certainty = entity["rico:certainty"] as String? Extract.extractSourceId(identifiers)
val result = mutableListOf<Pair<String, Any>>( } catch (ex: NoSuchElementException) {
Pair(transform.target + ".date", date) log.error("No source id found for record $id")
"NoSourceIdFound"
},
sameAs = Extract.listOfStrings(record["schema:sameAs"]),
abstract = Extract.extractLanguageContainer("dct:abstract", record["dct:abstract"]),
id = id,
institution = listOf(
NameContainer(
LanguageContainer(listOf("Institution"), listOf("institution"), listOf("istituzione"), emptyList()),
listOf("https://memobase.ch/institution/MEMORIAV")
)
),
recordSet = NameContainer(
LanguageContainer(listOf("Bestand"), listOf("collection"), listOf("fondo"), emptyList()),
listOf("https://memobase.ch/recordSet/EXAMPLE")
),
descriptiveNote = Extract.extractLanguageContainer("rico:descriptiveNote", record["rico:descriptiveNote"]),
scopeAndContent = Extract.extractLanguageContainer("rico:scopeAndContent", record["rico:scopeAndContent"]),
relatedMaterial = Extract.extractLanguageContainer("dct:relation", record["dct:relation"]),
source = Extract.extractLanguageContainer("rico:source", record["rico:source"]),
temporal = Extract.extractDate(temporal),
dateCreated = Extract.extractDate(datesCreated),
dateIssued = Extract.extractDate(datesIssued),
placeCapture = Extract.extractPlaces(placeCaptured),
placeRelated = Extract.extractPlaces(placesRelated),
place = Extract.facetEntity(placeCaptured + placesRelated, "rico:name"),
rightsHolder = Extract.typedEntityByType(rules, "rico:type", "holder", "rico:name"),
memoriavClaim = record["rdau:P60451"] != null,
languageCaption = Extract.typedEntityByType(languages, "rico:type", "caption", "rico:name"),
languageContent = Extract.typedEntityByType(languages, "rico:type", "content", "rico:name"),
language = Extract.facetEntity(languages, "rico:name"),
genre = Extract.facetEntity(genre, "skos:prefLabel"),
keywords = Extract.facetEntity(subjects, "skos:prefLabel"),
agentSubject = Extract.typedEntityByType(subjects, "@type", "rico:Agent", "rico:name"),
personSubject = Extract.typedEntityByType(subjects, "@type", "rico:Person", "rico:name"),
corporateBodySubject = Extract.typedEntityByType(subjects, "@type", "rico:CorporateBody", "rico:name"),
agentProducer = Extract.typedEntityByType(producers, "@type", "rico:Agent", "rico:name"),
personProducer = Extract.typedEntityByType(producers, "@type", "rico:Person", "rico:name"),
corporateBodyProducer = Extract.typedEntityByType(producers, "@type", "rico:CorporateBody", "rico:name"),
agentPublisher = Extract.typedEntityByType(publishers, "@type", "rico:Agent", "rico:name"),
personPublisher = Extract.typedEntityByType(publishers, "@type", "rico:Person", "rico:name"),
corporateBodyPublisher = Extract.typedEntityByType(publishers, "@type", "rico:CorporateBody", "rico:name"),
agentContributor = Extract.extractTypedCreationRelationAgent(
creationRelationAgents,
input,
creationRelationTypeParam = "contributor",
agentTypeParam = "rico:Agent"
),
personContributor = Extract.extractTypedCreationRelationAgent(
creationRelationAgents,
input,
creationRelationTypeParam = "contributor",
agentTypeParam = "rico:Person"
),
corporateBodyContributor = Extract.extractTypedCreationRelationAgent(
creationRelationAgents,
input,
creationRelationTypeParam = "contributor",
agentTypeParam = "rico:CorporateBody"
),
agentCreator = Extract.extractTypedCreationRelationAgent(
creationRelationAgents,
input,
creationRelationTypeParam = "creator",
agentTypeParam = "rico:Agent"
),
personCreator = Extract.extractTypedCreationRelationAgent(
creationRelationAgents,
input,
creationRelationTypeParam = "creator",
agentTypeParam = "rico:Person"
),
corporateBodyCreator = Extract.extractTypedCreationRelationAgent(
creationRelationAgents,
input,
creationRelationTypeParam = "creator",
agentTypeParam = "rico:CorporateBody"
),
persons = emptyList(),
corporateBodies = emptyList(),
agents = emptyList()
) )
if (qualifier != null) {
result.add(
Pair(transform.target + ".qualifier", qualifier)
)
}
if (certainty != null) {
result.add(
Pair(transform.target + ".certainty", certainty)
)
}
val facetList = when (entity["@type"] as String) {
"rico:SingleDate" ->
if (isNormalized)
DateFacetBuilder.buildFromNormalizedSingleDate(date)
else emptyList()
"rico:DateRange" ->
if (isNormalized)
DateFacetBuilder.buildFromNormalizedDateRange(date)
else emptyList()
else -> emptyList()
}
if (facetList.isNotEmpty())
result.add(Pair(transform.target + ".facet", facetList))
return result
}
private fun extractCreationRelationAndAgents(
input: Map<String, JsonObject>,
creationRelation: JsonObject
): List<Pair<String, Any>> {
val type = creationRelation["rico:type"] as String
val name = creationRelation["rico:name"] as String
val target = (creationRelation["rico:creationRelationHasTarget"] as JsonObject)["@id"] as String
val agent = input[target]
return if (agent == null) {
emptyList()
} else {
val agentType = agent["@type"] as String
val agentName = agent["rico:name"] as Any
val pair1 =
extractPair(name, "${agentType.split(':')[1]}${type}Raw.relation")
val pair2 =
extractPair(agentName, "${agentType.split(':')[1]}${type}Raw.name")
val result = mutableListOf<Pair<String, Any>>()
if (pair1 != null) {
result.add(pair1)
}
if (pair2 != null) {
result.add(pair2)
}
result
}
} }
} }
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.memobase package org.memobase.helpers
object DateFacetBuilder { object DateFacetBuilder {
...@@ -81,7 +81,9 @@ object DateFacetBuilder { ...@@ -81,7 +81,9 @@ object DateFacetBuilder {
while (fromDecadeAsInt != untilDecadeAsInt) { while (fromDecadeAsInt != untilDecadeAsInt) {
fromDecadeAsInt += 10 fromDecadeAsInt += 10
results.add( results.add(
"$level_2$separator$fromCentury$separator${getDecade(fromDecadeAsInt)}$terminator" "$level_2$separator$fromCentury$separator${getDecade(
fromDecadeAsInt
)}$terminator"
) )
} }
results results
...@@ -95,7 +97,9 @@ object DateFacetBuilder { ...@@ -95,7 +97,9 @@ object DateFacetBuilder {
while (fromCenturyAsInt != untilCenturyAsInt) { while (fromCenturyAsInt != untilCenturyAsInt) {
results.add( results.add(
"$level_1$separator${getCentury(fromCenturyAsInt)}$separator") "$level_1$separator${getCentury(
fromCenturyAsInt
)}$separator")
fromCenturyAsInt += 1 fromCenturyAsInt += 1
} }
results.add( results.add(
...@@ -107,7 +111,11 @@ object DateFacetBuilder { ...@@ -107,7 +111,11 @@ object DateFacetBuilder {
while (fromDecadeAsInt != untilDecadeAsInt) { while (fromDecadeAsInt != untilDecadeAsInt) {
fromDecadeAsInt += 10 fromDecadeAsInt += 10
results.add( results.add(
"$level_2$separator${getCentury(fromDecadeAsInt / 100)}$separator${getDecade(fromDecadeAsInt)}$terminator" "$level_2$separator${getCentury(
fromDecadeAsInt / 100
)}$separator${getDecade(
fromDecadeAsInt
)}$terminator"
) )
} }
results results
......
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.helpers
import com.beust.klaxon.JsonArray
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.model.CreatorNameContainer
import org.memobase.model.DateContainer
import org.memobase.model.LanguageContainer
import org.memobase.model.NameContainer
object Extract {
private val log = LogManager.getLogger("ExtractionHelper")
private fun extractLanguageTag(entity: JsonObject, language: String): List<String> {
return entity["@value"].let {
if (entity["@language"] == language && it != null)
listOf(it as String)
else
emptyList()
}
}
fun extractLanguageContainer(parent: String, entity: Any?): List<LanguageContainer> {
return when (entity) {
is String -> listOf(LanguageContainer(emptyList(), emptyList(), emptyList(), listOf(entity)))
is JsonObject ->
listOf(
LanguageContainer(
de = extractLanguageTag(entity, "de"),
fr = extractLanguageTag(entity, "fr"),
it = extractLanguageTag(entity, "it"),
un = emptyList()
)
)
is List<*> ->
listOf(entity.mapNotNull { subEntity ->
when (subEntity) {
is String ->
LanguageContainer(emptyList(), emptyList(), emptyList(), listOf(subEntity))
is JsonObject ->
LanguageContainer(
de = extractLanguageTag(subEntity, "de"),
fr = extractLanguageTag(subEntity, "fr"),
it = extractLanguageTag(subEntity, "it"),
un = emptyList()
)
else -> {
log.error("Could not extract language container from subEntity $subEntity in entity $entity.")
null
}
}
}.reduce { acc, languageContainer ->
acc.merge(languageContainer)
})
else -> {
log.error("Could not extract language container from entity: $entity in parent $parent.")
emptyList()
}
}
}
fun extractDate(entities: List<JsonObject>): List<DateContainer> {
return entities.map { entity ->
val isNormalized = entity.containsKey("rico:normalizedDateValue")
val date = if (isNormalized) {
entity["rico:normalizedDateValue"] as String
} else {
entity["rico:expressedDate"] as String
}
val qualifier = entity["rico:dateQualifier"] as String?
val certainty = entity["rico:certainty"] as String?
val facetList = when (entity["@type"] as String) {
"rico:SingleDate" ->
if (isNormalized)
DateFacetBuilder.buildFromNormalizedSingleDate(date)
else emptyList()
"rico:DateRange" ->
if (isNormalized)
DateFacetBuilder.buildFromNormalizedDateRange(date)
else emptyList()
else -> emptyList()
}
DateContainer(
date = date,
qualifier = qualifier,
certainty = certainty,
facet = facetList
)
}
}
fun typedEntityByType(
entities: List<JsonObject>,
field: String,
type: String,
annotationField: String
): List<LanguageContainer> {
return entities.filter { it[field] == type }.flatMap {
extractLanguageContainer(it["@id"] as String, it[annotationField])
}
}
fun facetEntity(entities: List<JsonObject>, property: String): List<NameContainer> {
return entities.mapNotNull {
val lang = extractLanguageContainer(it["@id"] as String, it[property])
if (lang.isEmpty()) {
null
} else {
NameContainer(lang.first(), lang.first().toList())
}
}
}
fun extractPlaces(entities: List<JsonObject>): List<LanguageContainer> {
return entities.flatMap {
extractLanguageContainer(it["@id"] as String, it["rico:name"])
}
}
fun extractSourceId(entities: List<JsonObject>): String {
return entities.first {
it["rico:type"] as String == "original"
}["rico:identifier"] as String? ?: error("Identifier contains no property rico:identifier: $entities.")
}
</