Unverified Commit a21488c2 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

log values

parent 50ba202d
Pipeline #16649 passed with stages
in 6 minutes and 43 seconds
......@@ -80,12 +80,18 @@ class KafkaTopology(
handledStream[0]
.mapValues { readOnlyKey, value ->
Report(readOnlyKey, ReportStatus.failure, value.third).toJson() }
Report(readOnlyKey, ReportStatus.failure, value.third).toJson()
}
.to(reportTopic)
val parsedStream = handledStream[1]
.mapValues { value -> Pair(value.first, value.second) }
.mapValues { readOnlyKey, value -> parse(readOnlyKey, value) }
.mapValues { readOnlyKey, value ->
log.warn("Key: {}", readOnlyKey)
log.warn("First Value: {}", value.first)
log.warn("Second Value as String: {}", String(value.second))
parse(readOnlyKey, value)
}
.branch(
Predicate { _, value -> value.third != null },
Predicate { _, _ -> true }
......@@ -126,7 +132,7 @@ class KafkaTopology(
private fun handleExceptions(value: ValueWithException<JoinedValues<String, ByteArray>>): Triple<String, ByteArray, String> {
return when {
value.hasException() -> {
Triple("", ByteArray(0), value.exception.localizedMessage)
Triple("", ByteArray(0), value.exception.localizedMessage)
}
value.hasValue() -> {
Triple(value.value.left, value.value.right, "")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment