Commit 76845c21 authored by Jonas Waeber
Browse files

Combine update requests into one request for the entire document.

parent 8bbb8927
Pipeline #26631 passed with stages
in 4 minutes and 7 seconds
......@@ -18,6 +18,16 @@
package ch.memobase
import ch.memobase.helpers.Constants.SettingsProps
import ch.memobase.helpers.ElasticSearchWrapper
import ch.memobase.helpers.JsonUtility
import ch.memobase.helpers.TranslationMappers
import ch.memobase.helpers.UpdateQueryBuilder
import ch.memobase.model.DocumentsSearchDoc
import ch.memobase.model.FacetContainer
import ch.memobase.model.InstitutionSearchDoc
import ch.memobase.model.RecordSetSearchDoc
import ch.memobase.model.Schema
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
......@@ -30,17 +40,6 @@ import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import ch.memobase.helpers.Constants.SettingsProps
import ch.memobase.helpers.ElasticSearchWrapper
import ch.memobase.helpers.JsonUtility
import ch.memobase.helpers.TranslationMappers
import ch.memobase.helpers.UpdateQueryBuilder
import ch.memobase.model.DocumentsSearchDoc
import ch.memobase.model.FacetContainer
import ch.memobase.model.InstitutionSearchDoc
import ch.memobase.model.RecordSetSearchDoc
import ch.memobase.model.Schema
import ch.memobase.model.UpdateQuery
class KafkaTopology(
private val settings: SettingsLoader,
......@@ -56,9 +55,6 @@ class KafkaTopology(
private val documentSearchDocBuilder = DocumentsSearchDocBuilder(translationMappers, elasticSearchWrapper, mediaUrl)
private val institutionSearchDoc = InstitutionSearchDocBuilder(translationMappers, elasticSearchWrapper)
private val updateQueryBuilder = UpdateQueryBuilder()
private val recordSetSearchDocBuilder =
RecordSetSearchDocBuilder(elasticSearchWrapper)
......@@ -182,91 +178,57 @@ class KafkaTopology(
}
.to(settings.outputTopic)
// generate update messages for institution & record set names!
// generate update messages for institution & record set names! Only need to check this for record sets.
schema
.map { key, value ->
when (value) {
is RecordSetSearchDoc -> {
// Do not update if the name is already the same.
val currentName = elasticSearchWrapper.getRecordSetName(value.id)
if (value.name == currentName) {
log.info("No update for record set name for ${value.name} as they are already current.")
KeyValue(key, null)
} else {
log.info("Replace $currentName with ${value.name} for record set ${value.id}.")
KeyValue(
"$key#update",
updateQueryBuilder.updateRecordSetName(value.id, value.name)
)
}
}
else -> KeyValue(key, null)
.filter { _, value -> value is RecordSetSearchDoc }
.mapValues { value -> value as RecordSetSearchDoc }
.mapValues { value ->
val updateQueryBuilder = UpdateQueryBuilder(value.id)
val currentName = elasticSearchWrapper.getRecordSetName(value.id)
if (value.name != currentName) {
log.info("Replace $currentName with ${value.name} for record set ${value.id}.")
updateQueryBuilder.updateRecordSetName(value.name)
} else {
log.info("No update for record set name for ${value.name} as they are already current.")
}
Pair(updateQueryBuilder, value)
}
.mapValues { value ->
updateInstitutionContainer(value.second, value.first)
value
}
.filter { _, value -> value != null }
.mapValues { value -> JsonUtility.queryToJson(value!!) }
.map { key, value -> KeyValue("$key#update", value) }
.mapValues { value -> JsonUtility.queryToJson(value.first.build()) }
.to(updateTopic)
recordSetUpdate(schema, "masterInstitution")
recordSetUpdate(schema, "originalInstitution")
recordSetUpdate(schema, "accessInstitution")
recordSetUpdate(schema, "institution")
}
/**
 * Publishes institution updates for [targetField] to the update topic.
 *
 * Each record of [stream] that is a [RecordSetSearchDoc] is handed to
 * `updateInstitutionContainer`; a non-null result is re-keyed as `<key>#update`.
 * All other records map to a null value and are filtered out before the
 * remaining queries are serialized with `JsonUtility.queryToJson` and written
 * to `updateTopic`.
 */
private fun recordSetUpdate(stream: KStream<String, Schema>, targetField: String) {
    stream
        .map { key, value ->
            // Only record sets can yield an update; every other schema type maps to null.
            val update = if (value is RecordSetSearchDoc) updateInstitutionContainer(value, targetField) else null
            if (update != null) {
                KeyValue("$key#update", update)
            } else {
                KeyValue(key, null)
            }
        }
        .filter { _, value -> value != null }
        .mapValues { value -> JsonUtility.queryToJson(value!!) }
        .to(updateTopic)
}
private fun updateInstitutionContainer(recordSet: RecordSetSearchDoc, targetField: String): UpdateQuery? {
return when (targetField) {
"institution" ->
buildUpdateQuery(recordSet.id, recordSet.institution, targetField)
"masterInstitution" ->
buildUpdateQuery(recordSet.id, recordSet.masterInstitution, targetField)
"originalInstitution" ->
buildUpdateQuery(recordSet.id, recordSet.originalInstitution, targetField)
"accessInstitution" ->
buildUpdateQuery(recordSet.id, recordSet.accessInstitution, targetField)
else -> throw Exception("Unknown institution type. Set the wrong constant somewhere...")
}
/**
 * Runs `buildUpdateQuery` once for each institution field of [recordSet]
 * (`institution`, `masterInstitution`, `originalInstitution`, `accessInstitution`),
 * passing the same [updateQueryBuilder] to every call.
 */
private fun updateInstitutionContainer(
    recordSet: RecordSetSearchDoc,
    updateQueryBuilder: UpdateQueryBuilder
) {
    // Field name paired with the record set's value for that field, in processing order.
    val institutionsByField = listOf(
        "institution" to recordSet.institution,
        "masterInstitution" to recordSet.masterInstitution,
        "originalInstitution" to recordSet.originalInstitution,
        "accessInstitution" to recordSet.accessInstitution
    )
    institutionsByField.forEach { (field, institutions) ->
        buildUpdateQuery(recordSet.id, institutions, field, updateQueryBuilder)
    }
}
private fun buildUpdateQuery(
recordSetId: String,
newInstitutions: List<FacetContainer>,
targetField: String
): UpdateQuery? {
targetField: String,
updateQueryBuilder: UpdateQueryBuilder
) {
val currentInstitutions =
elasticSearchWrapper.getExtraInstitutionsFromRecordSet(recordSetId, targetField)
return if (newInstitutions.containsAll(currentInstitutions) &&
currentInstitutions.containsAll(newInstitutions)
) {
log.info("No update for $targetField for $recordSetId as they are already current.")
null
} else {
log.info("Updating field $targetField for $recordSetId with $newInstitutions.")
updateQueryBuilder.updateInstitutionContainers(
recordSetId,
targetField,
newInstitutions
)
......
......@@ -21,30 +21,31 @@ import ch.memobase.model.FacetContainer
import ch.memobase.model.LanguageContainer
import ch.memobase.model.UpdateQuery
class UpdateQueryBuilder {
class UpdateQueryBuilder(private val value: String) {
private val recordSetFacetTerm = "recordSet.facet"
private val sourceLines = mutableListOf<String>()
private val params = mutableMapOf<String, Any>()
fun updateRecordSetName(recordSet: String, names: LanguageContainer): UpdateQuery {
return UpdateQuery(
recordSetFacetTerm,
recordSet,
"ctx._source['recordSet']['name']['de'] = params.de; ctx._source['recordSet']['name']['fr'] = params.fr; ctx._source['recordSet']['name']['it'] = params.it",
mapOf(
Pair("de", names.de[0]),
Pair("fr", names.fr[0]),
Pair("it", names.it[0])
)
)
/**
 * Queues the statements that overwrite the German, French and Italian record set
 * name and registers the matching parameters.
 *
 * Only the first entry of each language list in [names] is used.
 * NOTE(review): indexing with `[0]` throws if a list is empty — confirm that
 * callers always supply at least one name per language.
 */
fun updateRecordSetName(names: LanguageContainer) {
    // All three statements are queued before any parameter is read from `names`.
    for (language in listOf("de", "fr", "it")) {
        sourceLines.add("ctx._source['recordSet']['name']['$language'] = params.$language")
    }
    params.put("de", names.de[0])
    params.put("fr", names.fr[0])
    params.put("it", names.it[0])
}
/**
 * Queues a statement that replaces the document field [targetField] with
 * [institutions], and stores the list as a parameter named after the field.
 */
fun updateInstitutionContainers(targetField: String, institutions: List<FacetContainer>) {
    val assignment = "ctx._source['$targetField'] = params.$targetField"
    sourceLines += assignment
    params[targetField] = institutions
}
fun updateInstitutionContainers(recordSet: String, targetField: String, institutions: List<FacetContainer>): UpdateQuery {
fun build(): UpdateQuery {
return UpdateQuery(
recordSetFacetTerm,
recordSet,
"ctx._source['${targetField}'] = params.containers",
mapOf(
Pair("containers", institutions)
)
value,
sourceLines.joinToString("; "),
params
)
}
}
\ No newline at end of file
......@@ -211,17 +211,18 @@ class TestRecordSetSearchDoc {
val value = keyValue.value.replace(TestUtilities.dateRegex, "2020")
val resultValue = readFile("testAddingInstitutionUpdateOutput.json").replace(TestUtilities.dateRegex, "2020")
val updates = updateTopicOutput.readKeyValuesToList()
val updates = updateTopicOutput.readKeyValuesToList()[0]
assertAll("",
{
assertThat(value)
.isEqualTo(resultValue)
},
{ assertThat(updates).hasSize(5) },
{
assertThat(updates.value).isEqualTo(readFile("updateOutput.json"))
},
{
assertThat(updates.key).isEqualTo("testComplete#update")
},
{ assertThat(key).isEqualTo("testComplete") },
{ assertThat(reportKey).isEqualTo("testComplete") },
{
......
{"params" : {"de": "New Name (DE)", "fr": "New Name (FR)", "it": "New Name (IT)", "institution": [{"facet" : [], "filter" : "holdInstitution1", "name" : {"de" : ["Hold Institution 1"], "fr" : ["Hold Institution 1"], "it" : ["Hold Institution 1"], "un" : ["Hold Institution 1"]}}, {"facet" : [], "filter" : "holdInstitution2", "name" : {"de" : ["Hold Institution 2"], "fr" : ["Hold Institution 2"], "it" : ["Hold Institution 2"], "un" : ["Hold Institution 2"]}}], "masterInstitution": [{"facet" : [], "filter" : "completeInstitution", "name" : {"de" : ["Complete Institution"], "fr" : ["Complete Institution"], "it" : ["Complete Institution"], "un" : ["Complete Institution"]}}], "originalInstitution": [{"facet" : [], "filter" : "completeInstitution", "name" : {"de" : ["Complete Institution"], "fr" : ["Complete Institution"], "it" : ["Complete Institution"], "un" : ["Complete Institution"]}}], "accessInstitution": [{"facet" : [], "filter" : "completeInstitution", "name" : {"de" : ["Complete Institution"], "fr" : ["Complete Institution"], "it" : ["Complete Institution"], "un" : ["Complete Institution"]}}]}, "source" : "ctx._source['recordSet']['name']['de'] = params.de; ctx._source['recordSet']['name']['fr'] = params.fr; ctx._source['recordSet']['name']['it'] = params.it; ctx._source['institution'] = params.institution; ctx._source['masterInstitution'] = params.masterInstitution; ctx._source['originalInstitution'] = params.originalInstitution; ctx._source['accessInstitution'] = params.accessInstitution", "term" : "recordSet.facet", "value" : "testComplete"}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment