Unverified Commit 1417c7bb authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

make reporting topic prefix customisable


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent cf972340
......@@ -29,8 +29,8 @@ spec:
value: import-process-administrator
- name: APPLICATION_ID
value: import-process-administrator
- name: REPORTING_TOPIC_IN
value: import-process-reporting-*
- name: REPORTING_TOPIC_IN_PREFIX
value: import-process-reporting-
- name: COUNT_TOPIC_IN
value: import-dataset-sizes
- name: TOPIC_OUT
......
......@@ -5,6 +5,6 @@ kafka:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
application.id: ${APPLICATION_ID:?system}
topic:
in: ${REPORTING_TOPIC_IN:?system}
in: ${REPORTING_TOPIC_IN_PREFIX:?system}
out: ${TOPIC_OUT:?system}
process: ${TOPIC_PROCESS:?system}
\ No newline at end of file
......@@ -40,7 +40,7 @@ class KafkaTopology extends Logging {
val intermediaryReportingTopic = "import-process-ordered-service-reports"
val builder = new StreamsBuilder
val topicPattern = Pattern.compile(SettingsFromFile.getReportingInputTopic)
val topicPattern = Pattern.compile(s"${SettingsFromFile.getReportingInputTopicPrefix}.*")
val reportSource = builder.stream[String, String](topicPattern)
val orderedReportSource = builder.stream[String, Int](intermediaryReportingTopic)
......
......@@ -30,7 +30,7 @@ class ReportTransformer extends Transformer[String, String, KeyValue[String, Int
this.context = context
override def transform(key: String, value: String): KeyValue[String, Int] =
new KeyValue[String, Int](context.topic().replace("import-process-reporting-", ""), 1)
new KeyValue[String, Int](context.topic().replace(SettingsFromFile.getReportingInputTopicPrefix, ""), 1)
override def close(): Unit = {}
}
......@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
trait Settings {
def getKafkaStreamsSettings: Properties
def getReportingInputTopic: String
def getReportingInputTopicPrefix: String
def getCountInputTopic: String
......@@ -55,7 +55,7 @@ object SettingsFromFile extends Settings {
def getKafkaStreamsSettings: Properties = settings.getKafkaStreamsSettings
def getReportingInputTopic: String = settings.getInputTopic
def getReportingInputTopicPrefix: String = settings.getInputTopic
override def getCountInputTopic: String = settings.getAppSettings.getProperty("countTopicIn")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment