Unverified Commit 062eb0d6 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

refactor to functional approach


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent a6a2eedd
Pipeline #22349 passed with stages
in 5 minutes and 31 seconds
......@@ -19,12 +19,8 @@
package ch.memobase
import ch.memobase.models.{DeleteMessage, ParserIgnore, Report}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.logging.log4j.scala.Logging
import java.time.Duration
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success}
object App
......@@ -39,41 +35,25 @@ object App
parse(args) match {
case Some((sessionId, filters, dryRun)) =>
try {
var continue = true
var processed = 0
val matches: mutable.HashSet[DeleteMessage] = mutable.HashSet()
Stream.continually(poll).takeWhile(_.nonEmpty)
val matches = Stream.continually(poll)
.takeWhile(_.nonEmpty)
.flatten
.zip(Stream.from(1))
.foreach(p => if (p._2 % 250000 == 0) {
logger.info(s"${p._2} records processed. Looking at ${p._1.key()} right now")
})
/* while (continue) {
val records = poll
if (records.isEmpty) {
continue = false
}
for (record <- records) {
processed += 1
Report(record) match {
case Success(rep) =>
if (filters.forall(f => f(rep))) {
matches += DeleteMessage(rep, sessionId)
}
case Failure(_: ParserIgnore) =>
logger.debug("Ignoring irrelevant message")
case Failure(ex) =>
logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}")
logger.info(s"${record.value()}")
}
if (processed % 250000 == 0) {
logger.info(s"$processed records processed; ${matches.size} matches")
}
}
}
for (m <- matches) {
sendDelete(m, dryRun)
} */
.flatMap(record => Report(record) match {
case Success(rep) if filters.forall(f => f(rep)) =>
logger.debug("Match found")
List(DeleteMessage(rep, sessionId))
case Success(_) =>
List()
case Failure(_: ParserIgnore) =>
logger.debug("Ignoring irrelevant message")
List()
case Failure(ex) =>
logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}")
logger.info(s"${record.value()}")
List()
}).toSet
logger.info(s"${matches.size} matches found")
matches.foreach(msg => sendDelete(msg, dryRun))
} catch {
case e: Exception =>
logger.error(e)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment