Unverified Commit 5032939a authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files
parent 5dd94ddb
Pipeline #22317 passed with stages
in 5 minutes and 21 seconds
......@@ -19,8 +19,11 @@
package ch.memobase
import ch.memobase.models.{DeleteMessage, ParserIgnore, Report}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.logging.log4j.scala.Logging
import java.time.Duration
import scala.collection.JavaConverters._
import scala.util.{Failure, Success}
object App
......@@ -32,20 +35,30 @@ object App
with MsgFilter
with Logging {
lazy private val consumer = {
val consumer = new KafkaConsumer[String, String](consumerProps)
logger.debug(s"Subscribing to topic $inputTopic")
consumer.subscribe(List(inputTopic).asJava)
consumer
}
parse(args) match {
case Some((sessionId, filters, dryRun)) =>
try {
for (record <- poll) {
Report(record) match {
case Success(rep) =>
if (filters.forall(f => f(rep))) {
sendDelete(DeleteMessage(rep, sessionId), dryRun)
}
case Failure(_: ParserIgnore) =>
logger.debug("Ignoring irrelevant message")
case Failure(ex) =>
logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}")
logger.info(s"${record.value()}")
while (true) {
val records = consumer.poll(Duration.ofMillis(pollTimeout)).asScala
for (record <- records) {
Report(record) match {
case Success(rep) =>
if (filters.forall(f => f(rep))) {
sendDelete(DeleteMessage(rep, sessionId), dryRun)
}
case Failure(_: ParserIgnore) =>
logger.debug("Ignoring irrelevant message")
case Failure(ex) =>
logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}")
logger.info(s"${record.value()}")
}
}
}
} catch {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment