Commit 917b2f28 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Implement shutdown hook.

parent 24dc9c26
Pipeline #13539 passed with stages
in 5 minutes and 28 seconds
......@@ -32,10 +32,8 @@ ext {
}
dependencies {
// https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client
//compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.1.0'
implementation 'org.memobase:memobase-service-utilities:1.11.1'
implementation 'org.memobase:memobase-service-utilities:1.12.1'
implementation 'ch.memobase:import-process-effects-registry_2.12:0.2.1'
// Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
implementation "org.apache.logging.log4j:log4j-core:${log4jV}"
......
......@@ -38,7 +38,7 @@ class KafkaTopology(
private val config = MappingConfig(settings.appSettings.getProperty("configs"))
private val reportTopic = settings.outputTopic + "-reporting"
fun build(): Topology {
fun prepare(): StreamsBuilder {
val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic)
......@@ -80,7 +80,7 @@ class KafkaTopology(
val recordStream = completedMappingStream.mapValues { value -> value.writeRecord() }
objectOutput(recordStream)
return builder.build()
return builder
}
private fun objectOutput(stream: KStream<String, Pair<KeyValue<String, String>, Report>>) {
......
......@@ -18,9 +18,15 @@
package org.memobase
import ch.memobase.Effect
import ch.memobase.EffectsRegistry
import ch.memobase.ShutdownMessage
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import scala.Some
import scala.runtime.BoxedUnit
import kotlin.system.exitProcess
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("MapperService")
......@@ -35,10 +41,16 @@ class Service(file: String = "app.yml") {
useStreamsConfig = true
)
val topology = KafkaTopology(settings).build()
private val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
private val appId = settings.kafkaStreamsSettings.getProperty("application.id")
val builder = KafkaTopology(settings).prepare()
private val registry = EffectsRegistry()
private val shutdownEffect = Effect("shutdown", this::exit, Some("Shutting down application"))
fun run() {
registry.register(ShutdownMessage(appId.replace("-normalization-service", ""), "normalization-service", "termination"), shutdownEffect)
registry.run(builder, "import-process-admin")
val stream = KafkaStreams(builder.build(), settings.kafkaStreamsSettings)
stream.use {
it.start()
while (stream.state().isRunning) {
......@@ -47,4 +59,8 @@ class Service(file: String = "app.yml") {
}
}
}
private fun exit(): BoxedUnit {
exitProcess(0)
}
}
\ No newline at end of file
......@@ -57,7 +57,7 @@ class IntegrationTests {
fileName = "kafkaTest${params.count}.yml",
useStreamsConfig = true
)
val testDriver = TopologyTestDriver(KafkaTopology(settings).build(), settings.kafkaStreamsSettings)
val testDriver = TopologyTestDriver(KafkaTopology(settings).prepare().build(), settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment