/* * Import Process Delete * Copyright (C) 2021 Memoriav * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package ch.memobase import ch.memobase.models.{DeleteMessage, ParserIgnore, Report} import org.apache.logging.log4j.scala.Logging import scala.collection.immutable.HashSet import scala.util.{Failure, Success} object App extends KProducer with scala.App with ArgParser with AppSettings with KConsumer with MsgFilter with Logging { parse(args) match { case Some((sessionId, filters, dryRun)) => var counter = 0 var matchCounter = 0 try { Stream .continually(poll) .takeWhile(records => records.nonEmpty) .flatMap(records => { logger.info(s"Processing records batch with size ${records.size}; ${counter} processed so far") counter += records.size records .flatMap { record => Report(record) match { case Success(rep) => Some(rep) case Failure(_: ParserIgnore) => logger.debug("Ignoring irrelevant message") None case Failure(ex) => logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}") logger.info(s"${record.value()}") None } } .filter(record => filters.forall(f => f(record))) .map(record => { matchCounter += 1 logger.info(s"${matchCounter} matches so far") record }) .foldLeft(HashSet[DeleteMessage]())((agg, record) => { logger.info(s"Size of delete messages: ${agg.size}") agg + DeleteMessage(record, sessionId) } ) } ) .foreach(id => sendDelete(id, dryRun)) } catch { case e: Exception => logger.error(e) sys.exit(1) } finally { logger.info("Shutting down application") closeConsumer() closeProducer() } case None => logger.error("Invalid command-line arguments!") sys.exit(1) } }