Commit 0fd31e17 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Add reporting name as configuration

parent 93242b5c
...@@ -6,6 +6,7 @@ metadata: ...@@ -6,6 +6,7 @@ metadata:
data: data:
APPLICATION_ID: {{ .Values.applicationId }} APPLICATION_ID: {{ .Values.applicationId }}
SERVICE_CONFIG_TOPIC: {{ .Values.configTopicName }} SERVICE_CONFIG_TOPIC: {{ .Values.configTopicName }}
REPORTING_STEP_NAME: {{ .Values.reportingStepName }}
TOPIC_IN: {{ .Values.inputTopicName }} TOPIC_IN: {{ .Values.inputTopicName }}
TOPIC_OUT: {{ .Values.outputTopicName }} TOPIC_OUT: {{ .Values.outputTopicName }}
TOPIC_PROCESS: {{ .Values.reportingTopicName }} TOPIC_PROCESS: {{ .Values.reportingTopicName }}
\ No newline at end of file
...@@ -8,4 +8,6 @@ applicationId: placeholder ...@@ -8,4 +8,6 @@ applicationId: placeholder
inputTopicName: placeholder inputTopicName: placeholder
outputTopicName: placeholder outputTopicName: placeholder
reportingTopicName: placeholder reportingTopicName: placeholder
configTopicName: placeholder configTopicName: placeholder
\ No newline at end of file
reportingStepName: 03-mapper-service
\ No newline at end of file
...@@ -47,6 +47,7 @@ class KafkaTopology( ...@@ -47,6 +47,7 @@ class KafkaTopology(
private val log = LogManager.getLogger("MappingProcessor") private val log = LogManager.getLogger("MappingProcessor")
private val reportTopic = settings.processReportTopic private val reportTopic = settings.processReportTopic
private val reportingStepName = settings.appSettings.getProperty(Service.reportingStepNameProp)
private val klaxon = Klaxon() private val klaxon = Klaxon()
private val configJoiner = ConfigJoiner<String, ByteArray>( private val configJoiner = ConfigJoiner<String, ByteArray>(
...@@ -83,7 +84,7 @@ class KafkaTopology( ...@@ -83,7 +84,7 @@ class KafkaTopology(
handledStream[0] handledStream[0]
.mapValues { readOnlyKey, value -> .mapValues { readOnlyKey, value ->
Report(readOnlyKey, ReportStatus.fatal, "CONFIGURATION ERROR: ${value.third}", Service.step).toJson() Report(readOnlyKey, ReportStatus.fatal, "CONFIGURATION ERROR: ${value.third}", reportingStepName).toJson()
} }
.to(reportTopic) .to(reportTopic)
...@@ -103,7 +104,7 @@ class KafkaTopology( ...@@ -103,7 +104,7 @@ class KafkaTopology(
key, key,
ReportStatus.fatal, ReportStatus.fatal,
"Caught an error, but no report was created.", "Caught an error, but no report was created.",
Service.step reportingStepName
) )
} }
.to(reportTopic) .to(reportTopic)
...@@ -181,7 +182,7 @@ class KafkaTopology( ...@@ -181,7 +182,7 @@ class KafkaTopology(
key, key,
ReportStatus.fatal, ReportStatus.fatal,
"No Record Id Found: " + value.errorMessages.joinToString("\n"), "No Record Id Found: " + value.errorMessages.joinToString("\n"),
Service.step reportingStepName
).toJson() ).toJson()
} }
.to(reportTopic) .to(reportTopic)
...@@ -205,7 +206,7 @@ class KafkaTopology( ...@@ -205,7 +206,7 @@ class KafkaTopology(
key, key,
ReportStatus.fatal, ReportStatus.fatal,
"No Record Type Value: " + value.errorMessages.joinToString("\n"), "No Record Type Value: " + value.errorMessages.joinToString("\n"),
Service.step reportingStepName
).toJson() ).toJson()
} }
.to(reportTopic) .to(reportTopic)
...@@ -221,7 +222,7 @@ class KafkaTopology( ...@@ -221,7 +222,7 @@ class KafkaTopology(
result.first, result.first,
Pair( Pair(
result.second, result.second,
Report(result.first, ReportStatus.success, "Generated RDF document.", Service.step) Report(result.first, ReportStatus.success, "Generated RDF document.", reportingStepName)
) )
) )
} else { } else {
...@@ -233,7 +234,7 @@ class KafkaTopology( ...@@ -233,7 +234,7 @@ class KafkaTopology(
result.first, result.first,
if (builder.isFatal) ReportStatus.fatal else ReportStatus.warning, if (builder.isFatal) ReportStatus.fatal else ReportStatus.warning,
"Write Record Error: " + result.third.joinToString("\n"), "Write Record Error: " + result.third.joinToString("\n"),
Service.step reportingStepName
) )
) )
) )
...@@ -256,20 +257,20 @@ class KafkaTopology( ...@@ -256,20 +257,20 @@ class KafkaTopology(
Triple(parsedSource, mapperConfiguration.get(), null) Triple(parsedSource, mapperConfiguration.get(), null)
} else { } else {
log.error("Parsed source is empty: ${value.first}.") log.error("Parsed source is empty: ${value.first}.")
Triple(null, null, Report(key, ReportStatus.fatal, "Could not parse source document.", Service.step)) Triple(null, null, Report(key, ReportStatus.fatal, "Could not parse source document.", reportingStepName))
} }
} catch (ex: InvalidMappingException) { } catch (ex: InvalidMappingException) {
log.error(ex.localizedMessage) log.error(ex.localizedMessage)
Triple(null, null, Report(key, ReportStatus.fatal, "InvalidMappingException: ${ex.localizedMessage}", Service.step)) Triple(null, null, Report(key, ReportStatus.fatal, "InvalidMappingException: ${ex.localizedMessage}", reportingStepName))
} catch (ex: KlaxonException) { } catch (ex: KlaxonException) {
log.error(ex.localizedMessage) log.error(ex.localizedMessage)
Triple(null, null, Report(key, ReportStatus.fatal, "KLAXON EXCEPTION: ${ex.localizedMessage}", Service.step)) Triple(null, null, Report(key, ReportStatus.fatal, "KLAXON EXCEPTION: ${ex.localizedMessage}", reportingStepName))
} catch (ex: NullPointerException) { } catch (ex: NullPointerException) {
log.error(ex.localizedMessage) log.error(ex.localizedMessage)
Triple( Triple(
null, null,
null, null,
Report(key, ReportStatus.fatal, "There's no data to be processed: ${ex.localizedMessage}", Service.step) Report(key, ReportStatus.fatal, "There's no data to be processed: ${ex.localizedMessage}", reportingStepName)
) )
} }
} }
......
...@@ -24,15 +24,16 @@ import ch.memobase.settings.SettingsLoader ...@@ -24,15 +24,16 @@ import ch.memobase.settings.SettingsLoader
class Service(file: String = "app.yml") { class Service(file: String = "app.yml") {
companion object { companion object {
const val step = "mapper-service"
const val configTopicNameProp = "configTopic" const val configTopicNameProp = "configTopic"
const val reportingStepNameProp = "reportingStepName"
} }
private val log = LogManager.getLogger("MapperService") private val log = LogManager.getLogger("MapperService")
val settings = SettingsLoader( val settings = SettingsLoader(
listOf( listOf(
configTopicNameProp configTopicNameProp,
reportingStepNameProp
), ),
file, file,
useStreamsConfig = true useStreamsConfig = true
......
app: app:
configTopic: ${SERVICE_CONFIG_TOPIC:?system} configTopic: ${SERVICE_CONFIG_TOPIC:?system}
reportingStepName: ${REPORTING_STEP_NAME:?system}
kafka: kafka:
streams: streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system} bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
/* /*
* Copyright (C) 2019 Memobase * Mapper Service
* Copyright (C) 2020-2021 Memobase
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by * it under the terms of the GNU Affero General Public License as published by
...@@ -163,7 +164,7 @@ class IntegrationTests { ...@@ -163,7 +164,7 @@ class IntegrationTests {
"MEI_49884", "MEI_49884",
"FATAL", "FATAL",
"No Record Id Found: Found multiple values in the field 'identifierOriginal' for identifiers: 22861, 22861, 22861.", "No Record Id Found: Found multiple values in the field 'identifierOriginal' for identifiers: 22861, 22861, 22861.",
Service.step "test"
) )
)/*, )/*,
......
app: app:
configTopic: mb-di-config-test configTopic: mb-di-config-test
reportingStepName: test
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
app: app:
configTopic: mb-di-config-test configTopic: mb-di-config-test
reportingStepName: test
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
app: app:
configTopic: mb-di-config-test configTopic: mb-di-config-test
reportingStepName: test
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
app: app:
configTopic: mb-di-config-test configTopic: mb-di-config-test
reportingStepName: test
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
app: app:
configTopic: mb-di-config-test configTopic: mb-di-config-test
reportingStepName: test
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
app: app:
configTopic: mb-di-config-test configTopic: mb-di-config-test
reportingStepName: test
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
app: app:
configTopic: mb-di-config-test configTopic: mb-di-config-test
reportingStepName: test
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
app: app:
configTopic: mb-di-config-test configTopic: mb-di-config-test
reportingStepName: test
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
app: app:
configTopic: mb-di-config-test configTopic: mb-di-config-test
reportingStepName: test
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment