Service.kt 2.12 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Table Data Import Service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

Jonas Waeber's avatar
Jonas Waeber committed
21
22
23
import ch.memobase.Effect
import ch.memobase.EffectsRegistry
import ch.memobase.ShutdownMessage
Jonas Waeber's avatar
Jonas Waeber committed
24
25
26
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
27
28
29
import scala.Some
import scala.runtime.BoxedUnit
import kotlin.system.exitProcess
Jonas Waeber's avatar
Jonas Waeber committed
30
31
32
33
34

class Service(file: String = "app.yml") {
    private val log = LogManager.getLogger("TableDataService")

    val settings = SettingsLoader(
35
        listOf(),
Jonas Waeber's avatar
Jonas Waeber committed
36
37
38
39
40
        file,
        useStreamsConfig = true,
        readSftpSettings = true
    )

Jonas Waeber's avatar
Jonas Waeber committed
41
42
43
44
45
    private val appId = settings.kafkaStreamsSettings.getProperty("application.id")
    val builder = KafkaTopology(settings).prepare()

    private val registry = EffectsRegistry()
    private val shutdownEffect = Effect("shutdown", this::exit, Some("Shutting down application"))
Jonas Waeber's avatar
Jonas Waeber committed
46
47

    fun run() {
Jonas Waeber's avatar
Jonas Waeber committed
48
49
50
        registry.register(ShutdownMessage(appId.replace("-normalization-service", ""), "normalization-service", "termination"), shutdownEffect)
        registry.run(builder, "import-process-admin")
        val stream = KafkaStreams(builder.build(), settings.kafkaStreamsSettings)
Jonas Waeber's avatar
Jonas Waeber committed
51
52
53
54
55
56
57
58
        stream.use {
            it.start()
            while (stream.state().isRunning) {
                log.info("Service is running.")
                Thread.sleep(10_000L)
            }
        }
    }
Jonas Waeber's avatar
Jonas Waeber committed
59
60
61
62

    private fun exit(): BoxedUnit {
        exitProcess(0)
    }
Jonas Waeber's avatar
Jonas Waeber committed
63
}