/*
 * Import Process Delete
 * Copyright (C) 2021 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package ch.memobase

import ch.memobase.models.{DeleteMessage, ParserIgnore, Report}
import org.apache.logging.log4j.scala.Logging

import scala.util.{Failure, Success}

/**
  * Command-line entry point.
  *
  * The program
  *  1. parses the command-line arguments into a session id, a list of report
  *     filters and a dry-run flag,
  *  2. repeatedly calls `poll` until it returns an empty batch,
  *  3. parses every polled record into a [[Report]] and keeps those accepted
  *     by ''all'' filters,
  *  4. turns the matches into a de-duplicated set of [[DeleteMessage]]s and
  *     hands each one to `sendDelete`.
  *
  * The consumer and the producer are closed on both the success and the
  * failure path. The process exits with status 1 if the arguments are invalid
  * or an exception is raised while processing.
  */
object App
    extends KProducer
    with scala.App
    with ArgParser
    with AppSettings
    with KConsumer
    with MsgFilter
    with Logging {

  parse(args) match {
    case Some((sessionId, filters, dryRun)) =>
      // A progress line is logged every `progressInterval` records.
      val progressInterval = 250000

      // The try block is evaluated as an expression yielding the exit code.
      // Calling sys.exit from inside `catch` would halt the JVM before the
      // `finally` block runs, leaving the consumer and producer unclosed.
      val exitCode =
        try {
          val matches = Iterator
            .continually(poll)
            // NOTE(review): the first empty batch ends the run; presumably
            // this means the end of the topic has been reached - confirm
            // against the implementation of `poll`.
            .takeWhile(_.nonEmpty)
            .flatten
            .zip(Iterator.from(1))
            .flatMap { case (record, count) =>
              if (count % progressInterval == 0) {
                logger.info(s"$count records processed so far")
              }
              Report(record) match {
                case Success(rep) if filters.forall(f => f(rep)) =>
                  logger.info("Match found")
                  List(DeleteMessage(rep, sessionId))
                case Success(_) =>
                  // Parsed fine, but rejected by at least one filter.
                  List()
                case Failure(_: ParserIgnore) =>
                  logger.debug("Ignoring irrelevant message")
                  List()
                case Failure(ex) =>
                  logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}")
                  logger.info(s"${record.value()}")
                  List()
              }
            }
            // Materialise as a set so that equal delete messages are sent once.
            .toSet
          logger.info(s"${matches.size} matches found")
          matches.foreach(msg => sendDelete(msg, dryRun))
          0
        } catch {
          case e: Exception =>
            logger.error(e)
            1
        } finally {
          logger.info("Shutting down application")
          // Close the producer even if closing the consumer throws.
          try closeConsumer()
          finally closeProducer()
        }

      // Exit only after the resources above have been released.
      if (exitCode != 0) {
        sys.exit(exitCode)
      }

    case None =>
      logger.error("Invalid command-line arguments!")
      sys.exit(1)
  }
}