Commit 0b3b5ae6 authored by Jonas Waeber

Add reporting for invalid local transforms.

parent aa7c87d8
Pipeline #16107 passed with stages
in 5 minutes and 2 seconds
......@@ -37,7 +37,7 @@ dependencies {
implementation 'ch.memobase:memobase-kafka-utils:0.1.2'
implementation 'org.memobase:memobase-service-utilities:0.16.0'
implementation 'ch.memobase:normalization-service-configuration:0.1.0'
implementation 'ch.memobase:normalization-service-configuration:0.1.3'
// Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
......
......@@ -32,6 +32,7 @@ import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
......@@ -41,6 +42,7 @@ import java.io.StringWriter
class KafkaTopology(
private val settings: SettingsLoader
) {
private val log = LogManager.getLogger("NormalizationService")
private val reportTopic = settings.processReportTopic
private val globalTransforms = GlobalTransformsLoader(settings.appSettings.getProperty("transformMapping")).get()
......@@ -49,7 +51,8 @@ class KafkaTopology(
Serdes.String(),
Serdes.serdeFrom(
{ _, data -> data.getByteStream() },
{ _, data -> LocalTransformsLoader(data) }),
{ _, data -> LocalTransformsLoader(data) }
),
this::parseConfig
)
......@@ -65,8 +68,18 @@ class KafkaTopology(
configJoiner.join(stream, configStream)
val parsedModelStream = joinedStream
val filterInvalidLocalTransforms = joinedStream
.transformValues(HeaderExtractionTransformSupplier<JoinedValues<String, LocalTransformsLoader>>())
.branch(
Predicate { _, value -> value.first.right.errorMessage != "" },
Predicate { _, _ -> true }
)
filterInvalidLocalTransforms[0]
.mapValues { readOnlyKey, value -> Report(readOnlyKey, ReportStatus.failure, value.first.right.errorMessage).toJson() }
.to(reportTopic)
val parsedModelStream = filterInvalidLocalTransforms[1]
.mapValues { value -> createModel(value) }
.branch(
Predicate { _, value -> value != null },
......@@ -74,10 +87,16 @@ class KafkaTopology(
)
parsedModelStream[1]
.mapValues { key, _ -> Report(key, ReportStatus.failure, "Could not parse message. Found invalid input data (RiotException).").toJson() }
.mapValues { key, _ ->
Report(
key,
ReportStatus.failure,
"Could not parse message. Found invalid input data (RiotException)."
).toJson()
}
.to(reportTopic)
val finishedStream = parsedModelStream[0]
val finishedStream = parsedModelStream[0]
.mapValues { key, value -> transformations(key, value!!) }
.mapValues { value -> Pair(value.first, writeModel(value.second)) }
......@@ -114,7 +133,8 @@ class KafkaTopology(
Report(
key,
if (result.isEmpty()) ReportStatus.success else ReportStatus.failure,
if (result.isEmpty()) "Normalization was successful." else result.reduce { acc, s -> acc + "\n" + s }.trim()
if (result.isEmpty()) "Normalization was successful." else result.reduce { acc, s -> acc + "\n" + s }
.trim()
),
input.model
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment