/* * Import Process Delete * Copyright (C) 2020 Memoriav * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package ch.memobase import java.text.SimpleDateFormat import java.util.Calendar import ch.memobase.models.Report import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.logging.log4j.scala.Logging import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} trait MsgFilter { self: Logging => private val dateFormatter = new SimpleDateFormat("YYYY-MM-dd'T'HH:mm:ss.SSS") type FilterFun = ConsumerRecord[String, String] => Boolean def buildFilters(createdAfter: Calendar, createdBefore: Calendar, institutions: Seq[String], recordSets: Seq[String], records: Seq[String], sessions: Seq[String]): Seq[FilterFun] = Seq(buildStepFilter("fedora-ingest")) ++ Seq(buildStatusFilter("SUCCESS")) ++ Seq(buildCreatedAfterFilter(standardiseTimestamp(createdAfter))) ++ Seq(buildCreatedBeforeFilter(standardiseTimestamp(createdBefore))) ++ institutions.map(v => buildInstitutionIdFilter(v)) ++ records.map(v => buildRecordIdFilter(v)) ++ recordSets.map(v => buildRecordSetIdFilter(v)) ++ sessions.map(v => buildSessionIdFilter(v)) private def standardiseTimestamp(calendar: Calendar): String = f"${calendar.get(Calendar.YEAR)}%04d-" + f"${calendar.get(Calendar.MONTH)}%02d-" + f"${calendar.get(Calendar.DAY_OF_MONTH)}%02dT" + f"${calendar.get(Calendar.HOUR_OF_DAY)}%02d:" + f"${calendar.get(Calendar.MINUTE)}%02d:" + f"${calendar.get(Calendar.SECOND)}%02d." + f"${calendar.get(Calendar.MILLISECOND)}%03d" private val buildSessionIdFilter: String => FilterFun = sessionId => rec => rec.headers() .headers("sessionId") .asScala .map(header => new String(header.value())) .exists(v => v == sessionId) private val buildRecordSetIdFilter: String => FilterFun = recordSetId => rec => rec.headers() .headers("recordSetId") .asScala .map(header => new String(header.value())) .exists(v => v == recordSetId) private val buildInstitutionIdFilter: String => FilterFun = institutionId => rec => rec.headers() .headers("institutionId") .asScala .map(header => new String(header.value())) .exists(v => v == institutionId) private val buildRecordIdFilter: String => FilterFun = recordId => rec => Report(rec.value()).id == recordId private val buildCreatedAfterFilter: String => FilterFun = timestamp => rec => Try { dateFormatter.parse(timestamp).after(dateFormatter.parse(Report(rec.value()).timestamp)) } match { case Success(res) => res case Failure(_) => logger.warn(s"Parsing of timestamp ${Report(rec.value()).timestamp} failed! Ignoring record") false } private val buildCreatedBeforeFilter: String => FilterFun = timestamp => rec => Try { dateFormatter.parse(timestamp).before(dateFormatter.parse(Report(rec.value()).timestamp)) } match { case Success(res) => res case Failure(_) => logger.warn(s"Parsing of timestamp ${Report(rec.value()).timestamp} failed! Ignoring record") false } private val buildStatusFilter: String => FilterFun = status => rec => Report(rec.value()).status == status private val buildStepFilter: String => FilterFun = step => rec => Report(rec.value()).step == step }