Commit b5ef1f0d authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Update dependency -> Improves exception handling.

Add filter for fatal messages.
parent 25232251
Pipeline #21832 passed with stages
in 4 minutes and 18 seconds
......@@ -33,7 +33,7 @@ ext {
dependencies {
implementation 'ch.memobase:memobase-kafka-utils:0.2.3'
implementation 'org.memobase:memobase-service-utilities:2.0.5'
implementation 'ch.memobase:mapper-service-configuration:1.0.7'
implementation 'ch.memobase:mapper-service-configuration:1.0.8'
// Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
implementation "org.apache.logging.log4j:log4j-core:${log4jV}"
......
......@@ -126,7 +126,9 @@ class KafkaTopology(
val recordStream = completedMappingStream
.map { _, value -> writeRecord(value) }
objectOutput(recordStream)
return builder
}
......@@ -146,6 +148,7 @@ class KafkaTopology(
private fun objectOutput(stream: KStream<String, Pair<String, Report>>) {
stream
.filter { _, value -> value.second.status != ReportStatus.fatal }
.mapValues { _, value -> value.first }
.to(settings.outputTopic)
......@@ -226,7 +229,12 @@ class KafkaTopology(
result.first,
Pair(
result.second,
Report(result.first, ReportStatus.warning, result.third.joinToString("\n"), Service.step)
Report(
result.first,
if (builder.isFatal) ReportStatus.fatal else ReportStatus.warning,
result.third.joinToString("\n"),
Service.step
)
)
)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment