Commit 52705fa5 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Add query update for record set name & institution name!

parent e73132ae
Pipeline #22592 passed with stages
in 5 minutes and 39 seconds
......@@ -4,3 +4,4 @@ inputTopic: fedora-output-json-institutions
reportingTopic: postprocessing-reporting
mediaServerUrl: https://media.memobase.k8s.unibas.ch/memo/
updateTopic: institution-updates
\ No newline at end of file
......@@ -4,3 +4,4 @@ inputTopic: fedora-output-json-record-sets
reportingTopic: postprocessing-reporting
mediaServerUrl: https://media.memobase.k8s.unibas.ch/memo/
updateTopic: record-set-updates
\ No newline at end of file
......@@ -9,6 +9,7 @@ data:
INSTITUTION_INDEX: "{{ .Values.institutionIndex }}"
RECORD_SET_INDEX: "{{ .Values.recordSetIndex }}"
MEDIA_SERVER_URL: "{{ .Values.mediaServerUrl }}"
UPDATE_TOPIC: "{{ .Values.updateTopic }}"
TOPIC_IN: "{{ .Values.inputTopic }}"
TOPIC_OUT: "{{ .Values.outputTopic }}"
TOPIC_PROCESS: "{{ .Values.reportingTopic }}"
\ No newline at end of file
......@@ -20,3 +20,4 @@ accessTermLabels: access-term-labels
reuseStatementLabels: reuse-statement-labels
mediaServerUrl: https://media.memobase.k8s.unibas.ch/memo/
updateTopic: placeholder
\ No newline at end of file
......@@ -37,7 +37,8 @@ class App {
SettingsProps.elasticPort,
SettingsProps.documentsIndex,
SettingsProps.institutionIndex,
SettingsProps.recordSetIndex
SettingsProps.recordSetIndex,
SettingsProps.updateTopic
),
file,
useStreamsConfig = true
......
......@@ -34,6 +34,7 @@ import org.memobase.helpers.ElasticSearchWrapper
import org.memobase.helpers.JSON
import org.memobase.helpers.KEYS.SettingsProps
import org.memobase.helpers.TranslationMappers
import org.memobase.helpers.UpdateQueryBuilder
import org.memobase.model.DocumentsSearchDoc
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.RecordSetSearchDoc
......@@ -48,11 +49,14 @@ class KafkaTopology(
private val appSettings = settings.appSettings
private val mediaUrl = appSettings.getProperty(SettingsProps.mediaUrl)
private val updateTopic = appSettings.getProperty(SettingsProps.updateTopic)
private val reportTopic = settings.processReportTopic
private val documentSearchDocBuilder = DocumentsSearchDocBuilder(translationMappers, mediaUrl)
private val institutionSearchDoc = InstitutionSearchDocBuilder(translationMappers, elasticSearchWrapper)
private val updateQueryBuilder = UpdateQueryBuilder()
private val recordSetSearchDocBuilder =
RecordSetSearchDocBuilder(elasticSearchWrapper)
......@@ -148,14 +152,35 @@ class KafkaTopology(
.mapValues { value -> value.second.toJson() }
.to(reportTopic)
stream
val schema = stream
.filterNot { _, value -> value.second.status == ReportStatus.fatal }
.mapValues { value -> value.first }
schema
.mapValues { value ->
val out = StringWriter()
jsonWriter.writeValue(out, value)
out.toString()
}
.to(settings.outputTopic)
// generate update messages for institution & record set names!
schema
.map { key, value ->
when (value) {
is InstitutionSearchDoc -> KeyValue(
"$key#update",
updateQueryBuilder.updateInstitutionName(value.id, value.name)
)
is RecordSetSearchDoc -> KeyValue(
"$key#update",
updateQueryBuilder.updateRecordSetName(value.id, value.name)
)
else -> KeyValue(key, null)
}
}
.filter { _, value -> value != null }
.mapValues { value -> JSON.queryToJson(value!!) }
.to(updateTopic)
}
}
......@@ -71,4 +71,7 @@ object JSON {
}
fun queryToJson(query: UpdateQuery): String {
return klaxon.toJsonString(query)
}
}
\ No newline at end of file
......@@ -24,6 +24,7 @@ object KEYS {
}
object SettingsProps {
const val updateTopic = "updateTopic"
const val accessTermLabelsPath = "accessTermLabelsPath"
const val reuseStatementLabelsPath = "reuseStatementLabelsPath"
const val documentTypeLabelsPath = "documentTypeLabelsPath"
......
package org.memobase.helpers
data class UpdateQuery(
val term: String,
val value: String,
val source: String,
val params: Map<String, String>
)
package org.memobase.helpers
import org.memobase.model.LanguageContainer
class UpdateQueryBuilder {
private val institutionFacetTerm = "institution.facet"
private val recordSetFacetTerm = "recordSet.facet"
fun updateInstitutionName(institution: String, names: LanguageContainer): UpdateQuery {
return UpdateQuery(
institutionFacetTerm,
institution,
"ctx._source['institution'][0]['name']['de'] = params.de; ctx._source['institution'][0]['name']['fr'] = params.fr; ctx._source['institution'][0]['name']['it'] = params.it",
mapOf(
Pair("de", names.de[0]),
Pair("fr", names.fr[0]),
Pair("it", names.it[0])
)
)
}
fun updateRecordSetName(recordSet: String, names: LanguageContainer): UpdateQuery {
return UpdateQuery(
recordSetFacetTerm,
recordSet,
"ctx._source['recordSet'][0]['name']['de'] = params.de; ctx._source['recordSet'][0]['name']['fr'] = params.fr; ctx._source['recordSet'][0]['name']['it'] = params.it",
mapOf(
Pair("de", names.de[0]),
Pair("fr", names.fr[0]),
Pair("it", names.it[0])
)
)
}
}
\ No newline at end of file
......@@ -11,6 +11,7 @@ app:
documentTypeLabelsPath: "/configs/document_types/labels.csv"
accessTermLabelsPath: "/configs/access_terms/labels.csv"
reuseStatementLabelsPath: "/configs/reuse_statements/labels.csv"
updateTopic: ${UPDATE_TOPIC:?system}
kafka:
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
......@@ -11,6 +11,7 @@ app:
institutionTypeLabelsPath: "src/test/resources/configs/institution-type-labels.csv"
accessTermLabelsPath: "src/test/resources/configs/access-term-labels.csv"
reuseStatementLabelsPath: "src/test/resources/configs/reuse-statement-labels.csv"
updateTopic: "documents-v18"
kafka:
streams:
bootstrap.servers: localhost:12345
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment