Verified Commit b429c48e authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

start dummy Kafka consumer

parent 504f92a6
...@@ -18,6 +18,63 @@ ...@@ -18,6 +18,63 @@
package org.memobase package org.memobase
object App extends scala.App { import java.time.Duration
println("Hello world!")
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.logging.log4j.scala.Logging
import org.memobase.models.KafkaMessageLoader
import org.memobase.settings.SettingsLoader
import org.memobase.utils.DBHandler
import scala.collection.JavaConverters._
object App extends scala.App with Logging {
val settings = new SettingsLoader(List(
"audioSinkDir",
"imageSinkDir",
"videoSinkDir",
"mediaserverDBHost",
"mediaserverDBPort",
"mediaserverDBUser",
"mediaserverDBPassword",
"mediaserverDBTable"
).asJava,
"app.yml",
false,
false,
true,
false)
val consumer = new KafkaConsumer[String, String](settings.getKafkaConsumerSettings)
val dbHandle = DBHandler(
settings.getAppSettings.getProperty("mediaserverDBHost"),
settings.getAppSettings.getProperty("mediaserverDBPort").asInstanceOf[Int],
settings.getAppSettings.getProperty("mediaserverDBUser"),
settings.getAppSettings.getProperty("mediaserverDBPassword"),
settings.getAppSettings.getProperty("mediaserverDBTable")
)
try {
logger.debug(s"Subscribing to topic ${settings.getInputTopic}")
consumer.subscribe(List(settings.getInputTopic).asJava)
while (true) {
val records = consumer.poll(Duration.ofMillis(100)).asScala
for (record <- records) {
val msg = KafkaMessageLoader.load(record.value())
msg.event match {
case "new" || "update" =>
case "delete" =>
}
// TODO: Implement real logic
println(record.key())
}
}
} catch {
case e: Exception => logger.error(e)
} finally {
logger.info("Shutting down application")
consumer.close()
dbHandle.close()
}
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment