Commit da293c9a authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Merge branch 'master' of...

Merge branch 'master' of gitlab.switch.ch:memoriav/memobase-2020/services/import-process/mapper-service
parents 402c9094 1955f83d
Pipeline #17394 failed with stages
in 3 minutes and 45 seconds
...@@ -32,8 +32,8 @@ ext { ...@@ -32,8 +32,8 @@ ext {
} }
dependencies { dependencies {
implementation 'ch.memobase:memobase-kafka-utils:0.2.2' implementation 'ch.memobase:memobase-kafka-utils:0.2.3'
implementation 'org.memobase:memobase-service-utilities:0.16.0' implementation 'org.memobase:memobase-service-utilities:1.12.2'
implementation 'ch.memobase:mapper-service-configuration:0.3.4' implementation 'ch.memobase:mapper-service-configuration:0.3.4'
// Logging Framework // Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}" implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
......
...@@ -80,12 +80,15 @@ class KafkaTopology( ...@@ -80,12 +80,15 @@ class KafkaTopology(
handledStream[0] handledStream[0]
.mapValues { readOnlyKey, value -> .mapValues { readOnlyKey, value ->
Report(readOnlyKey, ReportStatus.failure, value.third).toJson() } Report(readOnlyKey, ReportStatus.failure, value.third).toJson()
}
.to(reportTopic) .to(reportTopic)
val parsedStream = handledStream[1] val parsedStream = handledStream[1]
.mapValues { value -> Pair(value.first, value.second) } .mapValues { value -> Pair(value.first, value.second) }
.mapValues { readOnlyKey, value -> parse(readOnlyKey, value) } .mapValues { readOnlyKey, value ->
parse(readOnlyKey, value)
}
.branch( .branch(
Predicate { _, value -> value.third != null }, Predicate { _, value -> value.third != null },
Predicate { _, _ -> true } Predicate { _, _ -> true }
...@@ -126,7 +129,7 @@ class KafkaTopology( ...@@ -126,7 +129,7 @@ class KafkaTopology(
private fun handleExceptions(value: ValueWithException<JoinedValues<String, ByteArray>>): Triple<String, ByteArray, String> { private fun handleExceptions(value: ValueWithException<JoinedValues<String, ByteArray>>): Triple<String, ByteArray, String> {
return when { return when {
value.hasException() -> { value.hasException() -> {
Triple("", ByteArray(0), value.exception.localizedMessage) Triple("", ByteArray(0), value.exception.localizedMessage)
} }
value.hasValue() -> { value.hasValue() -> {
Triple(value.value.left, value.value.right, "") Triple(value.value.left, value.value.right, "")
...@@ -230,6 +233,13 @@ class KafkaTopology( ...@@ -230,6 +233,13 @@ class KafkaTopology(
} catch (ex: KlaxonException) { } catch (ex: KlaxonException) {
log.error(ex.localizedMessage) log.error(ex.localizedMessage)
Triple(null, null, Report(key, ReportStatus.failure, ex.localizedMessage)) Triple(null, null, Report(key, ReportStatus.failure, ex.localizedMessage))
} catch (ex: NullPointerException) {
log.error(ex.localizedMessage)
Triple(
null,
null,
Report(key, ReportStatus.failure, "There's no data to be processed: ${ex.localizedMessage}")
)
} }
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment