Commit 4df0d3c1 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Implement error handling for kafka utils.

parent 6f2ce3b9
......@@ -32,7 +32,7 @@ ext {
}
dependencies {
implementation 'ch.memobase:memobase-kafka-utils:0.1.2'
implementation 'ch.memobase:memobase-kafka-utils:0.2.0'
implementation 'org.memobase:memobase-service-utilities:0.16.0'
implementation 'ch.memobase:mapper-service-configuration:0.3.4'
// Logging Framework
......
......@@ -22,6 +22,7 @@ import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.kafka.utils.models.ValueWithException
import ch.memobase.mapping.MappingConfigurationParser
import ch.memobase.mapping.exceptions.InvalidMappingException
import com.beust.klaxon.Klaxon
......@@ -70,7 +71,20 @@ class KafkaTopology(
val joinedStream =
configJoiner.join(stream, configStream)
val parsedStream = joinedStream
val handledStream = joinedStream
.mapValues { readOnlyKey, value -> handleExceptions(value) }
.branch(
Predicate { _, value -> value.third != "" },
Predicate { _, _ -> true }
)
handledStream[0]
.mapValues { readOnlyKey, value ->
Report(readOnlyKey, ReportStatus.failure, value.third).toJson() }
.to(reportTopic)
val parsedStream = handledStream[1]
.mapValues { value -> Pair(value.first, value.second) }
.mapValues { readOnlyKey, value -> parse(readOnlyKey, value) }
.branch(
Predicate { _, value -> value.third != null },
......@@ -110,6 +124,20 @@ class KafkaTopology(
return builder
}
private fun handleExceptions(value: ValueWithException<JoinedValues<String, ByteArray>>): Triple<String, ByteArray, String> {
return when {
value.hasException() -> {
Triple("", ByteArray(0), value.exception.localizedMessage)
}
value.hasValue() -> {
Triple(value.value.left, value.value.right, "")
}
else -> {
Triple("", ByteArray(0), "Could not handle error in kafka utils library.")
}
}
}
private fun objectOutput(stream: KStream<String, Pair<String, Report>>) {
stream
.mapValues { _, value -> value.first }
......@@ -185,16 +213,16 @@ class KafkaTopology(
private fun parse(
key: String,
value: JoinedValues<String, ByteArray>
value: Pair<String, ByteArray>
): Triple<Map<String, Any>?, MapperConfiguration?, Report?> {
return try {
val mapperConfiguration = MappingConfigurationParser(value.right)
val parsedSource = klaxon.parse<Map<String, Any>>(value.left)
val mapperConfiguration = MappingConfigurationParser(value.second)
val parsedSource = klaxon.parse<Map<String, Any>>(value.first)
if (parsedSource != null) {
log.info("Successfully parsed source & mapping configuration.")
Triple(parsedSource, mapperConfiguration.get(), null)
} else {
log.error("Parsed source is empty: ${value.left}.")
log.error("Parsed source is empty: ${value.first}.")
Triple(null, null, Report(key, ReportStatus.failure, "Found empty source document."))
}
} catch (ex: InvalidMappingException) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment