Commit 73f9783c authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Add service class to run service

parent f1eb8c76
......@@ -19,34 +19,14 @@
package org.memobase
import kotlin.system.exitProcess
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
class App {
companion object {
private val log = LogManager.getLogger("TableDataTransform")
private val log = LogManager.getLogger("TableDataTransformApp")
@JvmStatic fun main(args: Array<String>) {
try {
val settings = SettingsLoader(
listOf(
"sheet",
"header.count",
"header.line",
"identifier"
),
useStreamsConfig = true,
readSftpSettings = true
)
val topology = KafkaTopology(settings).build()
val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
stream.use {
it.start()
while (stream.state().isRunning) {
Thread.sleep(10_000L)
}
}
Service().run()
} catch (ex: Exception) {
ex.printStackTrace()
log.error("Stopping application due to error: " + ex.message)
......
/*
* Table Data Import Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("TableDataService")
val settings = SettingsLoader(
listOf(
"sheet",
"header.count",
"header.line",
"identifier"
),
file,
useStreamsConfig = true,
readSftpSettings = true
)
val topology = KafkaTopology(settings).build()
private val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
fun run() {
stream.use {
it.start()
while (stream.state().isRunning) {
log.info("Service is running.")
Thread.sleep(10_000L)
}
}
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment