Commit 25232251 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Add error handling for failed processing. Currently it sends warning.

parent 51bc8753
Pipeline #21817 passed with stages
in 4 minutes and 11 seconds
...@@ -33,7 +33,7 @@ ext { ...@@ -33,7 +33,7 @@ ext {
dependencies { dependencies {
implementation 'ch.memobase:memobase-kafka-utils:0.2.3' implementation 'ch.memobase:memobase-kafka-utils:0.2.3'
implementation 'org.memobase:memobase-service-utilities:2.0.5' implementation 'org.memobase:memobase-service-utilities:2.0.5'
implementation 'ch.memobase:mapper-service-configuration:1.0.5' implementation 'ch.memobase:mapper-service-configuration:1.0.7'
// Logging Framework // Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}" implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
implementation "org.apache.logging.log4j:log4j-core:${log4jV}" implementation "org.apache.logging.log4j:log4j-core:${log4jV}"
......
...@@ -19,13 +19,13 @@ ...@@ -19,13 +19,13 @@
package org.memobase package org.memobase
import ch.memobase.builder.ResourceBuilder import ch.memobase.builder.ResourceBuilder
import ch.memobase.exceptions.InvalidMappingException
import ch.memobase.kafka.utils.ConfigJoiner import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.kafka.utils.models.ValueWithException import ch.memobase.kafka.utils.models.ValueWithException
import ch.memobase.mapping.MapperConfiguration import ch.memobase.mapping.MapperConfiguration
import ch.memobase.mapping.MappingConfigurationParser import ch.memobase.mapping.MappingConfigurationParser
import ch.memobase.exceptions.InvalidMappingException
import ch.memobase.reporting.Report import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.HeaderExtractionTransformSupplier import ch.memobase.settings.HeaderExtractionTransformSupplier
...@@ -173,7 +173,14 @@ class KafkaTopology( ...@@ -173,7 +173,14 @@ class KafkaTopology(
val noRecordId = hasRecordId[1] val noRecordId = hasRecordId[1]
noRecordId noRecordId
.mapValues { key, value -> Report(key, ReportStatus.fatal, value.errorMessage, Service.step).toJson() } .mapValues { key, value ->
Report(
key,
ReportStatus.fatal,
value.errorMessages.joinToString("\n"),
Service.step
).toJson()
}
.to(reportTopic) .to(reportTopic)
return hasRecordId[0] return hasRecordId[0]
...@@ -194,7 +201,7 @@ class KafkaTopology( ...@@ -194,7 +201,7 @@ class KafkaTopology(
Report( Report(
key, key,
ReportStatus.fatal, ReportStatus.fatal,
value.errorMessage, value.errorMessages.joinToString("\n"),
Service.step Service.step
).toJson() ).toJson()
} }
...@@ -205,13 +212,24 @@ class KafkaTopology( ...@@ -205,13 +212,24 @@ class KafkaTopology(
private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> { private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> {
val result = builder.writeRecord(RDFFormat.NTRIPLES_UTF8) val result = builder.writeRecord(RDFFormat.NTRIPLES_UTF8)
return KeyValue(
result.first, if (result.third.isEmpty()) {
Pair( return KeyValue(
result.second, result.first,
Report(result.first, ReportStatus.success, "", Service.step) Pair(
result.second,
Report(result.first, ReportStatus.success, "", Service.step)
)
) )
) } else {
return KeyValue(
result.first,
Pair(
result.second,
Report(result.first, ReportStatus.warning, result.third.joinToString("\n"), Service.step)
)
)
}
} }
private fun parseConfig(data: ByteArray): ByteArray { private fun parseConfig(data: ByteArray): ByteArray {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment