Unverified Commit 11b25bad authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

add count and exit

parent 5032939a
Pipeline #22321 passed with stages
in 5 minutes
...@@ -45,12 +45,20 @@ object App ...@@ -45,12 +45,20 @@ object App
parse(args) match { parse(args) match {
case Some((sessionId, filters, dryRun)) => case Some((sessionId, filters, dryRun)) =>
try { try {
while (true) { var continue = true
var processed = 0
var matches = 0
while (continue) {
val records = consumer.poll(Duration.ofMillis(pollTimeout)).asScala val records = consumer.poll(Duration.ofMillis(pollTimeout)).asScala
if (records.isEmpty) {
continue = false
}
for (record <- records) { for (record <- records) {
processed += 1
Report(record) match { Report(record) match {
case Success(rep) => case Success(rep) =>
if (filters.forall(f => f(rep))) { if (filters.forall(f => f(rep))) {
matches += 1
sendDelete(DeleteMessage(rep, sessionId), dryRun) sendDelete(DeleteMessage(rep, sessionId), dryRun)
} }
case Failure(_: ParserIgnore) => case Failure(_: ParserIgnore) =>
...@@ -59,6 +67,9 @@ object App ...@@ -59,6 +67,9 @@ object App
logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}") logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}")
logger.info(s"${record.value()}") logger.info(s"${record.value()}")
} }
if (processed % 100000 == 0) {
logger.info(s"$processed records processed; $matches matches")
}
} }
} }
} catch { } catch {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment