Unverified Commit b1980295 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

add counter

parent 1f2e8fb5
Pipeline #22355 passed with stages
in 5 minutes and 18 seconds
...@@ -36,20 +36,27 @@ object App ...@@ -36,20 +36,27 @@ object App
case Some((sessionId, filters, dryRun)) => case Some((sessionId, filters, dryRun)) =>
try { try {
val matches = Iterator.continually(poll) val matches = Iterator.continually(poll)
.takeWhile(_.nonEmpty)
.flatten .flatten
.flatMap(record => Report(record) match { .zip(Iterator.from(1))
case Success(rep) if filters.forall(f => f(rep)) => .flatMap(record => {
logger.debug("Match found") if (record._2 % 250000 == 0) {
List(DeleteMessage(rep, sessionId)) logger.info(s"${record._2} records processed so far")
case Success(_) => }
List() Report(record._1) match {
case Failure(_: ParserIgnore) => case Success(rep) if filters.forall(f => f(rep)) =>
logger.debug("Ignoring irrelevant message") logger.debug("Match found")
List() List(DeleteMessage(rep, sessionId))
case Failure(ex) => case Success(_) =>
logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}") List()
logger.info(s"${record.value()}") case Failure(_: ParserIgnore) =>
List() logger.debug("Ignoring irrelevant message")
List()
case Failure(ex) =>
logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}")
logger.info(s"${record._1.value()}")
List()
}
}).toSet }).toSet
logger.info(s"${matches.size} matches found") logger.info(s"${matches.size} matches found")
matches.foreach(msg => sendDelete(msg, dryRun)) matches.foreach(msg => sendDelete(msg, dryRun))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment