Commit 9eab31af authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Update search doc service name.

parent c4681aa5
...@@ -30,9 +30,9 @@ import org.apache.kafka.streams.Topology ...@@ -30,9 +30,9 @@ import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.helpers.Constants.SettingsProps
import org.memobase.helpers.ElasticSearchWrapper import org.memobase.helpers.ElasticSearchWrapper
import org.memobase.helpers.JsonUtility import org.memobase.helpers.JsonUtility
import org.memobase.helpers.Constants.SettingsProps
import org.memobase.helpers.TranslationMappers import org.memobase.helpers.TranslationMappers
import org.memobase.helpers.UpdateQueryBuilder import org.memobase.helpers.UpdateQueryBuilder
import org.memobase.model.DocumentsSearchDoc import org.memobase.model.DocumentsSearchDoc
...@@ -68,6 +68,7 @@ class KafkaTopology( ...@@ -68,6 +68,7 @@ class KafkaTopology(
val stream = builder.stream<String, String>(settings.inputTopic) val stream = builder.stream<String, String>(settings.inputTopic)
val branchedStream = stream val branchedStream = stream
.mapValues { value -> JsonUtility.parse(value) } .mapValues { value -> JsonUtility.parse(value) }
// TODO: Add reporting for this.
.filter { _, value -> value.isNotEmpty() } .filter { _, value -> value.isNotEmpty() }
.mapValues { value -> JsonUtility.unpack(value) } .mapValues { value -> JsonUtility.unpack(value) }
.map { key, value -> KeyValue(key.substringAfterLast("/"), value) } .map { key, value -> KeyValue(key.substringAfterLast("/"), value) }
...@@ -83,12 +84,17 @@ class KafkaTopology( ...@@ -83,12 +84,17 @@ class KafkaTopology(
try { try {
Pair( Pair(
documentSearchDocBuilder.transform(readOnlyKey, value), documentSearchDocBuilder.transform(readOnlyKey, value),
Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name) Report(
readOnlyKey,
ReportStatus.success,
"Transformed message into search doc.",
Service.nameRecords
)
) )
} catch (ex: InvalidInputException) { } catch (ex: InvalidInputException) {
Pair( Pair(
DocumentsSearchDoc.DEFAULT, DocumentsSearchDoc.DEFAULT,
Report(readOnlyKey, ReportStatus.warning, ex.localizedMessage, Service.name) Report(readOnlyKey, ReportStatus.warning, ex.localizedMessage, Service.nameRecords)
) )
} }
} }
...@@ -100,7 +106,12 @@ class KafkaTopology( ...@@ -100,7 +106,12 @@ class KafkaTopology(
try { try {
Pair( Pair(
institutionSearchDoc.transform(readOnlyKey, value), institutionSearchDoc.transform(readOnlyKey, value),
Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name) Report(
readOnlyKey,
ReportStatus.success,
"Transformed message into search doc.",
Service.nameInstitutions
)
) )
} catch (ex: InvalidInputException) { } catch (ex: InvalidInputException) {
Pair( Pair(
...@@ -120,7 +131,12 @@ class KafkaTopology( ...@@ -120,7 +131,12 @@ class KafkaTopology(
try { try {
Pair( Pair(
recordSetSearchDocBuilder.transform(readOnlyKey, value), recordSetSearchDocBuilder.transform(readOnlyKey, value),
Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name) Report(
readOnlyKey,
ReportStatus.success,
"Transformed message into search doc.",
Service.nameRecordSets
)
) )
} catch (ex: InvalidInputException) { } catch (ex: InvalidInputException) {
Pair( Pair(
...@@ -141,7 +157,7 @@ class KafkaTopology( ...@@ -141,7 +157,7 @@ class KafkaTopology(
readOnlyKey, readOnlyKey,
ReportStatus.fatal, ReportStatus.fatal,
"No record, memobase institution or record set present in input data: $value.", "No record, memobase institution or record set present in input data: $value.",
Service.name Service.nameRecords
) )
} }
.to(reportTopic) .to(reportTopic)
......
...@@ -37,7 +37,7 @@ import org.memobase.helpers.TranslationMappers ...@@ -37,7 +37,7 @@ import org.memobase.helpers.TranslationMappers
class Service(settings: SettingsLoader) { class Service(settings: SettingsLoader) {
companion object { companion object {
const val name = "di-record-es-transform" const val nameRecords = "di-record-es-transform"
const val nameRecordSets = "gi-record-set-es-transform" const val nameRecordSets = "gi-record-set-es-transform"
const val nameInstitutions = "gi-institution-es-transform" const val nameInstitutions = "gi-institution-es-transform"
} }
......
...@@ -129,7 +129,7 @@ class TestInstitutionSearchDoc { ...@@ -129,7 +129,7 @@ class TestInstitutionSearchDoc {
"completeExampleTest", "completeExampleTest",
ReportStatus.success, ReportStatus.success,
"Transformed message into search doc.", "Transformed message into search doc.",
Service.name Service.nameRecords
) )
) )
} }
......
...@@ -158,7 +158,7 @@ class TestRecordSetSearchDoc { ...@@ -158,7 +158,7 @@ class TestRecordSetSearchDoc {
"testComplete", "testComplete",
ReportStatus.success, ReportStatus.success,
"", "",
Service.name Service.nameRecords
) )
) )
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment