Commit 4f44d963 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Update config topic setting

parent 40c8f212
Pipeline #24575 failed with stages
in 1 minute and 43 seconds
......@@ -4,7 +4,7 @@ applicationId: di-xml-data-transform-prod-app
kafkaConfigs: prod-kafka-bootstrap-servers-new
sftpConfigs: internal-sftp-config
inputTopicName: mb-di-data-transform-prod
outputTopicName: mb-di-mapper-prod
reportingTopicName: mb-di-reporting-prod
configTopicName: mb-di-config-prod
\ No newline at end of file
inputTopicName: "mb-di-data-transform-prod"
outputTopicName: "mb-di-mapper-prod"
reportingTopicName: "mb-di-reporting-prod"
configTopicName: "mb-di-config-prod"
\ No newline at end of file
......@@ -7,4 +7,5 @@ data:
APPLICATION_ID: {{ .Values.applicationId }}
TOPIC_IN: {{ .Values.inputTopicName }}
TOPIC_OUT: {{ .Values.outputTopicName }}
TOPIC_PROCESS: {{ .Values.reportingTopicName }}
\ No newline at end of file
TOPIC_PROCESS: {{ .Values.reportingTopicName }}
SERVICE_CONFIG_TOPIC: {{ .Values.configTopicName }}
\ No newline at end of file
......@@ -21,13 +21,11 @@ package org.memobase
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.errors.KafkaUtilsException
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.HeaderExtractionTransformSupplier
import ch.memobase.settings.SettingsLoader
import ch.memobase.sftp.SftpClient
import java.io.File
import net.sf.saxon.s9api.SaxonApiException
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
......@@ -36,6 +34,7 @@ import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.Service.Companion.configTopic
import org.memobase.models.Content
import org.memobase.models.Formats
import org.memobase.models.Input
......@@ -44,6 +43,7 @@ import org.memobase.utils.MissingIdentifierException
import org.memobase.utils.SerdeMessage
import org.memobase.utils.XsltException
import org.memobase.xml.XMLTransformer
import java.io.File
class KafkaTopology(private val settings: SettingsLoader) {
......@@ -61,7 +61,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
fun build(): Topology {
val builder = StreamsBuilder()
val configStream = builder.stream<String, String>("import-process-config")
val configStream = builder.stream<String, String>(settings.appSettings.getProperty(configTopic))
.map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }
......
......@@ -27,10 +27,13 @@ class Service(file: String = "app.yml") {
companion object {
const val name = "xml-data-transform"
const val configTopic = "configTopic"
}
val settings = SettingsLoader(
listOf(),
listOf(
configTopic
),
file,
useStreamsConfig = true,
readSftpSettings = true
......
......@@ -3,6 +3,8 @@ sftp:
port: ${SFTP_PORT:?system}
user: ${SFTP_USER:?system}
password: ${SFTP_PASSWORD:?system}
app:
configTopic: ${SERVICE_CONFIG_TOPIC:?system}
kafka:
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment