Commit fd5b42c2 authored by Jonas Waeber

Normalize facet field structures.

Integrate access term labels.
Refactor translation mappers.
Refactor dependency structure to enable mocking.
Add mocked Elasticsearch wrapper.
parent b149f591
......@@ -32,6 +32,8 @@ spec:
mountPath: "/configs/institution_types/"
- name: document-type-labels
mountPath: "/configs/document_types/"
- name: access-term-labels
mountPath: "/configs/access_terms/"
volumes:
- name: instituion-type-labels
configMap:
......@@ -39,4 +41,7 @@ spec:
- name: document-type-labels
configMap:
name: "{{ .Values.documentTypeLabels }}"
- name: access-term-labels
configMap:
name: "{{ .Values.accessTermLabels }}"
restartPolicy: Always
......@@ -14,5 +14,6 @@ reportingTopic: postprocessing-reporting
instutionTypeLabels: institution-type-labels
documentTypeLabels: document-type-labels
accessTermLabels: access-term-labels
mediaServerUrl: https://media.memobase.k8s.unibas.ch/memo/
\ No newline at end of file
......@@ -17,15 +17,34 @@
*/
package org.memobase
import ch.memobase.settings.SettingsLoader
import kotlin.system.exitProcess
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.KEYS.SettingsProps
class App {
companion object {
private val log = LogManager.getLogger("App")
@JvmStatic fun main(args: Array<String>) {
/**
 * Creates the [SettingsLoader] used to configure this service.
 *
 * The loader is handed the property keys from [SettingsProps] that this application reads
 * (type/term label paths, media URL and the Elasticsearch host, port and index), the
 * configuration file name, and `useStreamsConfig = true`.
 * NOTE(review): presumably the key list names the app-specific settings the loader must find
 * in the file - confirm against the SettingsLoader documentation.
 *
 * @param file name of the configuration file passed to the loader.
 * @return the [SettingsLoader] built from the given file.
 */
fun createSettings(file: String): SettingsLoader {
    val appPropertyKeys = listOf(
        SettingsProps.institutionTypeLabelsPath,
        SettingsProps.documentTypeLabelsPath,
        SettingsProps.accessTermLabelsPath,
        SettingsProps.mediaUrl,
        SettingsProps.elasticHost,
        SettingsProps.elasticPort,
        SettingsProps.elasticIndex
    )
    return SettingsLoader(appPropertyKeys, file, useStreamsConfig = true)
}
@JvmStatic
fun main(args: Array<String>) {
try {
Service().run()
Service(createSettings("app.yml")).run()
} catch (ex: Exception) {
ex.printStackTrace()
log.error("Stopping application due to error: " + ex.message)
......
......@@ -30,7 +30,6 @@ import org.memobase.builders.PersonFacetBuilder
import org.memobase.builders.PlaceFacetBuilder
import org.memobase.builders.SuggestContainerBuilder
import org.memobase.helpers.AspectRatio
import org.memobase.helpers.DocumentTypeMapper
import org.memobase.helpers.Extract
import org.memobase.helpers.FacetBuildHelpers
import org.memobase.helpers.Filter
......@@ -38,14 +37,17 @@ import org.memobase.helpers.InstitutionAndRecordSetExtractionHelper.extractInsti
import org.memobase.helpers.InstitutionAndRecordSetExtractionHelper.extractRecordSet
import org.memobase.helpers.KEYS
import org.memobase.helpers.ReuseStatementMap
import org.memobase.helpers.TranslationMappers
import org.memobase.model.DocumentsSearchDoc
import org.memobase.model.EnrichedDigitalMetadata
import org.memobase.model.FacetContainer
import org.memobase.model.LanguageContainer
import org.memobase.model.Schema
class DocumentsSearchDocBuilder(
private val documentTypeMapper: DocumentTypeMapper,
private val mediaUrl: String) {
private val translationMappers: TranslationMappers,
private val mediaUrl: String
) {
private val log = LogManager.getLogger("SearchDocTransform")
fun transform(key: String, input: Map<String, JsonObject>): Schema {
......@@ -105,8 +107,6 @@ class DocumentsSearchDocBuilder(
val languages = EnrichedFacetContainerBuilder(emptyList(), NS.rico + KEYS.Language, KEYS.name)
val genres = EnrichedFacetContainerBuilder(genreIds, NS.skos + KEYS.Concept, KEYS.prefLabel)
val id = key
for (item in input.entries) {
for (builder: IFieldBuilder in listOf(
personFacetBuilder,
......@@ -138,7 +138,7 @@ class DocumentsSearchDocBuilder(
languages
)) {
if (builder.filter(item.value)) {
builder.append(id, item.value)
builder.append(key, item.value)
}
if (digitalIdentifierReferences.contains(item.key))
......@@ -157,8 +157,10 @@ class DocumentsSearchDocBuilder(
val physicalRules = Filter.entitiesByProperty("regulatedBy", physicalObject, input)
val physicalIdentifiers = Filter.entitiesByProperty("identifiedBy", physicalObject, input)
val accessPhysical = Extract.typedEntityByType(physicalRules, "type", "access", "name").flatMap { it.toList() }
val accessDigital = Extract.typedEntityByType(digitalRules, "type", "access", "name").flatMap { it.toList() }
val accessPhysical = Extract.typedEntityByType(physicalRules, "type", "access", "name")
.flatMap { it.toList() }.map { translationMappers.getAccessTerm(it) }
val accessDigital = Extract.typedEntityByType(digitalRules, "type", "access", "name")
.flatMap { it.toList() }.map { translationMappers.getAccessTerm(it) }
val usageDigital = Extract.typedEntityByType(digitalRules, "type", "usage", "sameAs").flatMap { it.toList() }
......@@ -196,9 +198,9 @@ class DocumentsSearchDocBuilder(
val type = record[KEYS.ricoType].let {
if (it == null) {
documentTypeMapper.getValue("Andere")
translationMappers.getDocumentType("Andere")
} else {
documentTypeMapper.getValue(it as String)
translationMappers.getDocumentType(it as String)
}
}
......@@ -210,12 +212,12 @@ class DocumentsSearchDocBuilder(
sourceID = try {
Extract.extractIdValue(recordIdentifiers, KEYS.IdentifierType.original) ?: "NoSourceIdFound"
} catch (ex: NoSuchElementException) {
log.error("No source id found for record $id")
log.error("No source id found for record $key")
"NoSourceIdFound"
},
sameAs = Extract.listOfStrings(record["sameAs"]),
abstract = Extract.languageContainer("abstract", record["abstract"]),
recordId = id,
recordId = key,
institution = extractInstitution(record),
recordSet = extractRecordSet(record),
descriptiveNote = Extract.languageContainer("descriptiveNote", record["descriptiveNote"]),
......@@ -239,8 +241,10 @@ class DocumentsSearchDocBuilder(
if (it.isEmpty())
LanguageContainer.EMPTY
else
it.reduce { acc, languageContainer -> acc.merge(languageContainer)
}},
it.reduce { acc, languageContainer ->
acc.merge(languageContainer)
}
},
personSubject = subjectPersonBuilder.build(),
personProducer = producersPersonBuilder.build(),
personPublisher = publisherPersonBuilder.build(),
......@@ -261,7 +265,7 @@ class DocumentsSearchDocBuilder(
agentCreator = creatorAgentBuilder.build(),
// DIGITAL & PHYSICAL
access = accessPhysical + accessDigital,
access = accessDigital + accessPhysical,
// DIGITAL
accessDigital = accessDigital,
......
......@@ -21,26 +21,27 @@ package org.memobase
import ch.memobase.rdf.NS
import com.beust.klaxon.JsonArray
import com.beust.klaxon.JsonObject
import java.util.Properties
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.Date
import org.memobase.helpers.ElasticSearchWrapper
import org.memobase.helpers.Extract
import org.memobase.helpers.InstitutionTypeMapper
import org.memobase.helpers.KEYS
import org.memobase.helpers.TranslationMappers
import org.memobase.model.FacetContainer
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.LanguageContainer
import org.memobase.model.Schema
class InstitutionSearchDocBuilder(private val institutionTypeMapper: InstitutionTypeMapper, appSettings: Properties) {
class InstitutionSearchDocBuilder(
private val translationMappers: TranslationMappers,
private val elasticSearchWrapper: ElasticSearchWrapper
) {
private val log = LogManager.getLogger("InstitutionSearchDocBuilder")
private val elasticSearchWrapper = ElasticSearchWrapper(appSettings)
fun transform(key: String, input: Map<String, JsonObject>): Schema {
val institution =
input["institution"] ?: throw InvalidInputException("No institution entity found in message $key.")
input["institution"] ?: throw InvalidInputException("No institution entity found in message $key.")
val identifiers = mutableListOf<JsonObject>()
val cantons = mutableListOf<FacetContainer>()
val municipalities = mutableListOf<LanguageContainer>()
......@@ -67,8 +68,8 @@ class InstitutionSearchDocBuilder(private val institutionTypeMapper: Institution
val type = institution[KEYS.wikidataInstance].let {
when (it) {
is String -> listOf(institutionTypeMapper.getValue(it))
is JsonArray<*> -> it.map { any -> institutionTypeMapper.getValue(any as String) }
is String -> listOf(translationMappers.getInstitutionType(it))
is JsonArray<*> -> it.map { any -> translationMappers.getInstitutionType(any as String) }
else -> {
log.error("Found no institution types on institution $key")
emptyList()
......@@ -82,44 +83,44 @@ class InstitutionSearchDocBuilder(private val institutionTypeMapper: Institution
val recordSetIds = Extract.listOfStrings(institution[KEYS.isHolderOf])
return InstitutionSearchDoc(
institutionId = id,
published = institution[KEYS.isPublished].let {
when (it) {
is String -> it.toBoolean()
else -> {
log.error("Found no isPublished property on institution $key. Set to false.")
false
}
institutionId = id,
published = institution[KEYS.isPublished].let {
when (it) {
is String -> it.toBoolean()
else -> {
log.error("Found no isPublished property on institution $key. Set to false.")
false
}
},
type = type,
name = name,
description = description,
documentType = listOf(FacetContainer.placeholder("PLACEHOLDER")),
keyVisualLink = institution[KEYS.wikidataImage].let { if (it != null) it as String else "NoKeyVisualLinkDefined" },
canton = cantons,
city = municipalities,
address = addresses,
postalCodes = postalCodes,
numberOfRecordSets = recordSetIds.count(),
numberOfDocuments = recordSetIds
.map { value -> elasticSearchWrapper.countNumberOfDocuments(value) }
.sum(),
teaserColor = institution[KEYS.teaserColor].let {
if (it == null) {
log.error("No teaser colour found for institution $id.")
""
} else
it as String
},
teaserColorComputed = institution[KEYS.teaserColorComputed].let {
if (it == null) {
log.error("No computed teaser colour found for institution $id.")
""
} else
it as String
},
lastUpdatedDate = Date.now
}
},
type = type,
name = name,
description = description,
documentType = listOf(FacetContainer.placeholder("PLACEHOLDER")),
keyVisualLink = institution[KEYS.wikidataImage].let { if (it != null) it as String else "NoKeyVisualLinkDefined" },
canton = cantons,
city = municipalities,
address = addresses,
postalCodes = postalCodes,
numberOfRecordSets = recordSetIds.count(),
numberOfDocuments = recordSetIds
.map { value -> elasticSearchWrapper.countNumberOfDocuments(value) }
.sum(),
teaserColor = institution[KEYS.teaserColor].let {
if (it == null) {
log.error("No teaser colour found for institution $id.")
""
} else
it as String
},
teaserColorComputed = institution[KEYS.teaserColorComputed].let {
if (it == null) {
log.error("No computed teaser colour found for institution $id.")
""
} else
it as String
},
lastUpdatedDate = Date.now
)
}
......
......@@ -29,34 +29,31 @@ import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.DocumentTypeMapper
import org.memobase.helpers.ElasticSearchWrapper
import org.memobase.helpers.InstitutionTypeMapper
import org.memobase.helpers.JSON
import org.memobase.helpers.KEYS.SettingsProps
import org.memobase.helpers.TranslationMappers
import org.memobase.model.DocumentsSearchDoc
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.RecordSetSearchDoc
import org.memobase.model.Schema
class KafkaTopology(private val settings: SettingsLoader) {
class KafkaTopology(
private val settings: SettingsLoader,
translationMappers: TranslationMappers,
elasticSearchWrapper: ElasticSearchWrapper
) {
private val log = LogManager.getLogger("SearchDocService")
private val appSettings = settings.appSettings
private val mediaUrl = appSettings.getProperty(SettingsProps.mediaUrl)
private val documentMapperPath = appSettings.getProperty(SettingsProps.documentTypeLabelsPath)
private val institutionMapperPath = appSettings.getProperty(SettingsProps.institutionTypeLabelsPath)
private val reportTopic = settings.processReportTopic
private val documentTypeMapper = DocumentTypeMapper(documentMapperPath)
private val institutionTypeMapper = InstitutionTypeMapper(institutionMapperPath)
private val documentSearchDocBuilder = DocumentsSearchDocBuilder(translationMappers, mediaUrl)
private val institutionSearchDoc = InstitutionSearchDocBuilder(translationMappers, elasticSearchWrapper)
private val documentSearchDocBuilder = DocumentsSearchDocBuilder(documentTypeMapper, mediaUrl)
private val institutionSearchDoc = InstitutionSearchDocBuilder(institutionTypeMapper, appSettings)
private val elasticSearchWrapper = ElasticSearchWrapper(settings.appSettings)
private val recordSetSearchDocBuilder =
RecordSetSearchDocBuilder(elasticSearchWrapper)
RecordSetSearchDocBuilder(elasticSearchWrapper)
private val jsonWriter = ObjectMapper().registerKotlinModule().writer()
......@@ -64,90 +61,99 @@ class KafkaTopology(private val settings: SettingsLoader) {
val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic)
val branchedStream = stream
.mapValues { value -> JSON.parse(value) }
.filter { _, value -> value.isNotEmpty() }
.mapValues { value -> JSON.unpack(value) }
.branch(
Predicate { _, value -> value.containsKey(JSON.recordTag) },
Predicate { _, value -> value.containsKey(JSON.institutionTag) },
Predicate { _, value -> value.containsKey(JSON.recordSetTag) },
Predicate { _, _ -> true }
)
.mapValues { value -> JSON.parse(value) }
.filter { _, value -> value.isNotEmpty() }
.mapValues { value -> JSON.unpack(value) }
.branch(
Predicate { _, value -> value.containsKey(JSON.recordTag) },
Predicate { _, value -> value.containsKey(JSON.institutionTag) },
Predicate { _, value -> value.containsKey(JSON.recordSetTag) },
Predicate { _, _ -> true }
)
val recordStream = branchedStream[0]
.mapValues { readOnlyKey, value ->
try {
Pair(
documentSearchDocBuilder.transform(readOnlyKey, value),
Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
)
} catch (ex: InvalidInputException) {
Pair(DocumentsSearchDoc.DEFAULT, Report(readOnlyKey, ReportStatus.warning, ex.localizedMessage, Service.name))
}
.mapValues { readOnlyKey, value ->
try {
Pair(
documentSearchDocBuilder.transform(readOnlyKey, value),
Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
)
} catch (ex: InvalidInputException) {
Pair(
DocumentsSearchDoc.DEFAULT,
Report(readOnlyKey, ReportStatus.warning, ex.localizedMessage, Service.name)
)
}
}
outputStreams(recordStream)
val institutionStream = branchedStream[1]
.mapValues { readOnlyKey, value ->
try {
Pair(
institutionSearchDoc.transform(readOnlyKey, value),
Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
.mapValues { readOnlyKey, value ->
try {
Pair(
institutionSearchDoc.transform(readOnlyKey, value),
Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
)
} catch (ex: InvalidInputException) {
Pair(
InstitutionSearchDoc.DEFAULT, Report(
readOnlyKey,
ReportStatus.warning,
ex.localizedMessage,
Service.name
)
} catch (ex: InvalidInputException) {
Pair(InstitutionSearchDoc.DEFAULT, Report(
readOnlyKey,
ReportStatus.warning,
ex.localizedMessage,
Service.name))
}
)
}
}
outputStreams(institutionStream)
val recordSetStream = branchedStream[2]
.mapValues { readOnlyKey, value ->
try {
Pair(
recordSetSearchDocBuilder.transform(readOnlyKey, value),
Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
.mapValues { readOnlyKey, value ->
try {
Pair(
recordSetSearchDocBuilder.transform(readOnlyKey, value),
Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
)
} catch (ex: InvalidInputException) {
Pair(
RecordSetSearchDoc.DEFAULT, Report(
readOnlyKey,
ReportStatus.warning,
ex.localizedMessage,
Service.name
)
} catch (ex: InvalidInputException) {
Pair(RecordSetSearchDoc.DEFAULT, Report(
readOnlyKey,
ReportStatus.warning,
ex.localizedMessage,
Service.name))
}
)
}
}
outputStreams(recordSetStream)
branchedStream[3]
.mapValues { readOnlyKey, value ->
Report(
readOnlyKey,
ReportStatus.fatal,
"No record, memobase institution or record set present in input data: $value.",
Service.name
)
}
.to(reportTopic)
.mapValues { readOnlyKey, value ->
Report(
readOnlyKey,
ReportStatus.fatal,
"No record, memobase institution or record set present in input data: $value.",
Service.name
)
}
.to(reportTopic)
return builder.build()
}
/**
 * Wires a transformed stream to its two sinks.
 *
 * Every value is a pair of (search document, report):
 * - the report of each pair is converted with `toJson()` and sent to [reportTopic];
 * - pairs whose report status is [ReportStatus.fatal] are dropped, the remaining search
 *   documents are serialized with [jsonWriter] and sent to the configured output topic.
 *
 * @param stream stream of (search document, report) pairs keyed by the message key.
 */
private fun outputStreams(stream: KStream<String, Pair<Schema, Report>>) {
stream
.mapValues { value -> value.second.toJson() }
.to(reportTopic)
.mapValues { value -> value.second.toJson() }
.to(reportTopic)
stream
.filterNot { _, value -> value.second.status == ReportStatus.fatal }
.mapValues { value -> value.first }
.mapValues { value ->
val out = StringWriter()
jsonWriter.writeValue(out, value)
out.toString()
}
.to(settings.outputTopic)
.filterNot { _, value -> value.second.status == ReportStatus.fatal }
.mapValues { value -> value.first }
.mapValues { value ->
val out = StringWriter()
jsonWriter.writeValue(out, value)
out.toString()
}
.to(settings.outputTopic)
}
}
......@@ -19,31 +19,73 @@
package org.memobase
import ch.memobase.settings.SettingsLoader
import java.net.ConnectException
import java.net.SocketTimeoutException
import kotlin.system.exitProcess
import org.apache.http.HttpHost
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.KEYS
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.indices.GetIndexRequest
import org.memobase.helpers.ElasticSearchWrapper
import org.memobase.helpers.KEYS.SettingsProps
import org.memobase.helpers.TranslationMappers
class Service(file: String = "app.yml") {
class Service(settings: SettingsLoader) {
companion object {
const val name = "search-doc-service"
}
private val log = LogManager.getLogger("SearchDocService")
val settings = SettingsLoader(
listOf(
KEYS.SettingsProps.institutionTypeLabelsPath,
KEYS.SettingsProps.documentTypeLabelsPath,
KEYS.SettingsProps.mediaUrl,
KEYS.SettingsProps.elasticHost,
KEYS.SettingsProps.elasticPort,
KEYS.SettingsProps.elasticIndex
),
file,
useStreamsConfig = true
)
val topology = KafkaTopology(settings).build()
private val appSettings = settings.appSettings
private val documentMapperPath = appSettings.getProperty(SettingsProps.documentTypeLabelsPath)
private val accessTermPath = appSettings.getProperty(SettingsProps.accessTermLabelsPath)
private val institutionMapperPath = appSettings.getProperty(SettingsProps.institutionTypeLabelsPath)
private val translationMappers = TranslationMappers(institutionMapperPath, documentMapperPath, accessTermPath)
private val host = appSettings.getProperty(SettingsProps.elasticHost)
private val port = appSettings.getProperty(SettingsProps.elasticPort).toInt()
private val documentsIndex = appSettings.getProperty(SettingsProps.elasticIndex)
private val client: RestHighLevelClient = connect()
/**
 * Creates the Elasticsearch client and verifies that the configured documents index is reachable.
 *
 * A [RestHighLevelClient] is built for [host] and [port]. The name in [documentsIndex] is then
 * looked up both as an index and as an alias; both lookups are always performed. If neither
 * exists, or if the lookup fails with an [ElasticsearchException], [SocketTimeoutException] or
 * [ConnectException], the problem is logged and the process is terminated with exit status 1.
 *
 * @return the connected client once the index or alias has been found.
 */
private fun connect(): RestHighLevelClient {
    return try {
        val restClient = RestHighLevelClient(RestClient.builder(HttpHost(host, port)))
        val foundAsIndex = restClient.indices().exists(GetIndexRequest(documentsIndex), RequestOptions.DEFAULT)
        val foundAsAlias = restClient.indices().existsAlias(GetAliasesRequest(documentsIndex), RequestOptions.DEFAULT)
        if (foundAsIndex || foundAsAlias) {
            log.info("Successfully connected to index $documentsIndex. Ready to query.")
            restClient
        } else {
            // Neither an index nor an alias with the configured name exists: nothing to query.
            log.error("Could not find the index or alias defined in the configuration: $documentsIndex.")
            exitProcess(1)
        }
    } catch (ex: ElasticsearchException) {
        log.error(ex.detailedMessage)
        exitProcess(1)
    } catch (ex: SocketTimeoutException) {
        log.error(ex.localizedMessage)
        exitProcess(1)
    } catch (ex: ConnectException) {
        log.error(ex.localizedMessage)
        exitProcess(1)
    }
}
private val elasticSearchWrapper = ElasticSearchWrapper(appSettings, client, translationMappers)