Commit 26c17016 authored by Jonas Waeber's avatar Jonas Waeber

integrate service admin

parent 7e21ebb6
Pipeline #13486 passed with stages
in 6 minutes and 4 seconds
......@@ -39,7 +39,8 @@ dependencies {
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation 'org.memobase:memobase-service-utilities:1.11.0'
implementation 'org.memobase:memobase-service-utilities:1.11.1'
implementation 'ch.memobase:import-process-effects-registry_2.12:0.2.1'
// SFTP Client
// is needed because of a bug.
implementation 'com.hierynomus:sshj:0.27.0'
......
......@@ -37,7 +37,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val xmlTransformer = XMLTransformer(settings.appSettings)
private val reportingTopic = settings.outputTopic + "-reporting"
fun build(): Topology {
fun prepare(): StreamsBuilder {
val builder = StreamsBuilder()
val errorFilter = builder
......@@ -87,7 +87,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
streamOutput(transformedValue)
return builder.build()
return builder
}
private fun streamOutput(stream: KStream<String, SAXContentHandler>) {
......
......@@ -18,9 +18,15 @@
package org.memobase
import ch.memobase.Effect
import ch.memobase.EffectsRegistry
import ch.memobase.ShutdownMessage
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import scala.Some
import scala.runtime.BoxedUnit
import kotlin.system.exitProcess
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("TableDataService")
......@@ -36,10 +42,16 @@ class Service(file: String = "app.yml") {
readSftpSettings = true
)
val topology = KafkaTopology(settings).build()
private val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
private val appId = settings.kafkaStreamsSettings.getProperty("application.id")
val builder = KafkaTopology(settings).prepare()
private val registry = EffectsRegistry()
private val shutdownEffect = Effect("shutdown", this::exit, Some("Shutting down application"))
fun run() {
registry.register(ShutdownMessage(appId.replace("-normalization-service", ""), "normalization-service", "termination"), shutdownEffect)
registry.run(builder, "import-process-admin")
val stream = KafkaStreams(builder.build(), settings.kafkaStreamsSettings)
stream.use {
it.start()
while (stream.state().isRunning) {
......@@ -48,4 +60,8 @@ class Service(file: String = "app.yml") {
}
}
}
private fun exit(): BoxedUnit {
exitProcess(0)
}
}
......@@ -57,7 +57,7 @@ class TestIntegration {
val service = Service("test${params.count}.yml")
val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings)
val testDriver = TopologyTestDriver(service.builder.build(), service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment