Commit 83a1d20a authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Refactor reporting step name

parent e1519059
......@@ -20,3 +20,5 @@ reportingTopic: mb-di-reporting-prod
recordsTransactionsTopic: mb-di-transactions-records-prod
recordSetsTransactionsTopic: mb-gi-transactions-record-sets-prod
institutionsTransactionsTopic: mb-gi-transactions-institutions-prod
reportingStepName: 08-import-process-bridge-records
......@@ -20,3 +20,5 @@ reportingTopic: mb-di-reporting-stage
recordsTransactionsTopic: mb-di-transactions-records-stage
recordSetsTransactionsTopic: mb-gi-transactions-record-sets-stage
institutionsTransactionsTopic: mb-gi-transactions-institutions-stage
reportingStepName: 08-import-process-bridge-records
......@@ -20,3 +20,5 @@ reportingTopic: mb-di-reporting-prod
recordsTransactionsTopic: mb-di-transactions-records-prod
recordSetsTransactionsTopic: mb-gi-transactions-record-sets-prod
institutionsTransactionsTopic: mb-gi-transactions-institutions-prod
reportingStepName: 08-import-process-bridge-records
......@@ -20,3 +20,5 @@ reportingTopic: mb-di-reporting-prod
recordsTransactionsTopic: mb-di-transactions-records-prod
recordSetsTransactionsTopic: mb-gi-transactions-record-sets-prod
institutionsTransactionsTopic: mb-gi-transactions-institutions-prod
reportingStepName: 02-import-process-bridge-groups
......@@ -21,3 +21,5 @@ reportingTopic: mb-di-reporting-stage
recordsTransactionsTopic: mb-di-transactions-records-stage
recordSetsTransactionsTopic: mb-gi-transactions-record-sets-stage
institutionsTransactionsTopic: mb-gi-transactions-institutions-stage
reportingStepName: 02-import-process-bridge-groups
......@@ -20,3 +20,5 @@ reportingTopic: mb-di-reporting-prod
recordsTransactionsTopic: mb-di-transactions-records-prod
recordSetsTransactionsTopic: mb-gi-transactions-record-sets-prod
institutionsTransactionsTopic: mb-gi-transactions-institutions-prod
reportingStepName: 02-import-process-bridge-groups
\ No newline at end of file
......@@ -12,3 +12,4 @@ data:
TOPIC_TRANSACTIONS_RECORDS: "{{ .Values.recordsTransactionsTopic }}"
TOPIC_TRANSACTIONS_RECORD_SETS: "{{ .Values.recordSetsTransactionsTopic }}"
TOPIC_TRANSACTIONS_INSTITUTIONS: "{{ .Values.institutionsTransactionsTopic }}"
REPORTING_STEP_NAME: "{{ .Values.reportingStepName }}"
......@@ -15,6 +15,8 @@ k8sLimitsMemory: placeholder
applicationId: placeholder
reportingStepName: placeholder
bootstrapServers: placeholder
outputTopic: placeholder
outputTopicPostfix: placeholder
......
......@@ -43,6 +43,7 @@ class KafkaTopology(
private val settings: SettingsLoader
) {
private val log = LogManager.getLogger("ImportProcessBridge")
private val step = settings.appSettings.getProperty(Service.reportingStepNameProp)
fun build(): Topology {
val builder = StreamsBuilder()
......@@ -73,7 +74,7 @@ class KafkaTopology(
.to(settings.appSettings.getProperty("topicTransactionsRecords"))
processOutcome[0]
.mapValues { key, value -> writeReport(key, value, Service.recordsStep) }
.mapValues { key, value -> writeReport(key, value, step) }
.to(settings.processReportTopic)
processOutcome[1]
......@@ -86,7 +87,7 @@ class KafkaTopology(
.to(settings.appSettings.getProperty("topicTransactionsRecordSets"))
processOutcome[1]
.mapValues { key, value -> writeReport(key, value, Service.groupsStep) }
.mapValues { key, value -> writeReport(key, value, step) }
.to(settings.processReportTopic)
processOutcome[2]
......@@ -99,7 +100,7 @@ class KafkaTopology(
.to(settings.appSettings.getProperty("topicTransactionsInstitutions"))
processOutcome[2]
.mapValues { key, value -> writeReport(key, value, Service.groupsStep) }
.mapValues { key, value -> writeReport(key, value, step) }
.to(settings.processReportTopic)
processOutcome[3]
......@@ -111,7 +112,7 @@ class KafkaTopology(
ProcessStatus.FATAL,
"Unable to match key type to either records, record sets or institutions."
),
Service.groupsStep
step
)
}
.to(settings.processReportTopic)
......@@ -127,7 +128,7 @@ class KafkaTopology(
ProcessStatus.FATAL,
"Unable to match key type to either records, record sets or institutions."
),
Service.recordsStep
step
)
}
.to(settings.processReportTopic)
......
......@@ -25,8 +25,7 @@ import org.apache.logging.log4j.LogManager
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("ImportProcessBridgeService")
companion object {
const val recordsStep = "import-process-bridge-records"
const val groupsStep = "import-process-bridge-groups"
const val reportingStepNameProp = "reportingStepName"
}
private val settings = SettingsLoader(
......@@ -34,7 +33,8 @@ class Service(file: String = "app.yml") {
"topicOutPostfix",
"topicTransactionsRecords",
"topicTransactionsRecordSets",
"topicTransactionsInstitutions"
"topicTransactionsInstitutions",
reportingStepNameProp
),
fileName = file,
useStreamsConfig = true
......
......@@ -3,6 +3,7 @@ app:
topicTransactionsRecords: ${TOPIC_TRANSACTIONS_RECORDS:?system}
topicTransactionsRecordSets: ${TOPIC_TRANSACTIONS_RECORD_SETS:?system}
topicTransactionsInstitutions: ${TOPIC_TRANSACTIONS_INSTITUTIONS:?system}
reportingStepName: ${REPORTING_STEP_NAME:?system}
kafka:
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment