Unverified Commit a6a2eedd authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

just a test

parent 11b25bad
Pipeline #22344 passed with stages
in 6 minutes
......@@ -24,6 +24,7 @@ import org.apache.logging.log4j.scala.Logging
import java.time.Duration
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success}
object App
......@@ -35,21 +36,20 @@ object App
with MsgFilter
with Logging {
lazy private val consumer = {
val consumer = new KafkaConsumer[String, String](consumerProps)
logger.debug(s"Subscribing to topic $inputTopic")
consumer.subscribe(List(inputTopic).asJava)
consumer
}
parse(args) match {
case Some((sessionId, filters, dryRun)) =>
try {
var continue = true
var processed = 0
var matches = 0
while (continue) {
val records = consumer.poll(Duration.ofMillis(pollTimeout)).asScala
val matches: mutable.HashSet[DeleteMessage] = mutable.HashSet()
Stream.continually(poll).takeWhile(_.nonEmpty)
.flatten
.zip(Stream.from(1))
.foreach(p => if (p._2 % 250000 == 0) {
logger.info(s"${p._2} records processed. Looking at ${p._1.key()} right now")
})
/* while (continue) {
val records = poll
if (records.isEmpty) {
continue = false
}
......@@ -58,8 +58,7 @@ object App
Report(record) match {
case Success(rep) =>
if (filters.forall(f => f(rep))) {
matches += 1
sendDelete(DeleteMessage(rep, sessionId), dryRun)
matches += DeleteMessage(rep, sessionId)
}
case Failure(_: ParserIgnore) =>
logger.debug("Ignoring irrelevant message")
......@@ -67,11 +66,14 @@ object App
logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}")
logger.info(s"${record.value()}")
}
if (processed % 100000 == 0) {
logger.info(s"$processed records processed; $matches matches")
if (processed % 250000 == 0) {
logger.info(s"$processed records processed; ${matches.size} matches")
}
}
}
for (m <- matches) {
sendDelete(m, dryRun)
} */
} catch {
case e: Exception =>
logger.error(e)
......
......@@ -44,7 +44,7 @@ trait AppSettings {
props.put("group.id", clientId)
props.put("allow.auto.create.topics", "false")
props.put("auto.offset.reset", "earliest")
props.put("enable.auto.commit", "false")
props.put("enable.auto.commit", "true")
props.put("max.poll.records", "5000")
props
}
......
......@@ -54,13 +54,13 @@ trait ArgParser {
.text("session id filter")
.optional(),
opt[Calendar]('a', "created-after")
.action((v, c) => c.copy(createdAfterFilter = v))
.action((v, c) => c.copy(createdAfterFilter = Some(v)))
.valueName("<datetime>")
.text("retains only records processed after timestamp")
.maxOccurs(1)
.optional(),
opt[Calendar]('b', "created-before")
.action((v, c) => c.copy(createdBeforeFilter = v))
.action((v, c) => c.copy(createdBeforeFilter = Some(v)))
.valueName("<datetime>")
.text("retains only records processed before timestamp")
.maxOccurs(1)
......@@ -76,7 +76,7 @@ trait ArgParser {
)
}
def parse(args: Array[String]): Option[(String, Seq[FilterFun], Boolean)] = {
def parse(args: Array[String]): Option[(String, Set[FilterFun], Boolean)] = {
OParser.parse(parser, args, Args()) match {
case Some(config) => Some(
config.sessionId,
......
......@@ -31,46 +31,87 @@ trait MsgFilter {
type FilterFun = Report => Boolean
def buildFilters(createdAfter: Calendar,
createdBefore: Calendar,
def buildFilters(createdAfter: Option[Calendar],
createdBefore: Option[Calendar],
institutions: Seq[String],
recordSets: Seq[String],
records: Seq[String],
sessions: Seq[String]): Seq[FilterFun] = {
logger.info("Build filters")
Seq(buildCreatedAfterFilter(standardiseTimestamp(createdAfter))) ++
Seq(buildCreatedBeforeFilter(standardiseTimestamp(createdBefore))) ++
institutions.map(v => buildInstitutionIdFilter(v)) ++
records.map(v => buildRecordIdFilter(v)) ++
recordSets.map(v => buildRecordSetIdFilter(v)) ++
sessions.map(v => buildSessionIdFilter(v))
sessions: Seq[String]): Set[FilterFun] = {
Set(buildCreatedAfterFilter(createdAfter)) ++
Set(buildCreatedBeforeFilter(createdBefore)) ++
Set(buildSessionIdFilters(sessions)) ++
Set(buildRecordIdFilters(records)) ++
Set(buildInstitutionIdFilters(institutions)) ++
Set(buildRecordSetIdFilters(recordSets))
}
private def standardiseTimestamp(calendar: Calendar): String =
f"${calendar.get(Calendar.YEAR)}%04d-" +
f"${calendar.get(Calendar.MONTH)}%02d-" +
f"${calendar.get(Calendar.MONTH) + 1}%02d-" +
f"${calendar.get(Calendar.DAY_OF_MONTH)}%02dT" +
f"${calendar.get(Calendar.HOUR_OF_DAY)}%02d:" +
f"${calendar.get(Calendar.MINUTE)}%02d:" +
f"${calendar.get(Calendar.SECOND)}%02d." +
f"${calendar.get(Calendar.MILLISECOND)}%03d"
private val buildSessionIdFilter: String => FilterFun =
sessionId => report => report.sessionId == sessionId
private def buildSessionIdFilters(sessionIds: Seq[String]): FilterFun = {
logger.info(if (sessionIds.isEmpty) {
"no sessionIdFilters built"
} else {
s"sessionIdFilters built for ${sessionIds.mkString(", ")}"
})
report => sessionIds.isEmpty || sessionIds.contains(report.sessionId)
}
private def buildRecordSetIdFilters(recordSetIds: Seq[String]): FilterFun = {
logger.info(if (recordSetIds.isEmpty) {
"no recordSetIdFilters built"
} else {
s"recordSetIdFilters built for ${recordSetIds.mkString(", ")}"
})
report => recordSetIds.isEmpty || recordSetIds.contains(report.recordSetId)
}
private val buildRecordSetIdFilter: String => FilterFun =
recordSetId => report => report.recordSetId == recordSetId
private def buildInstitutionIdFilters(institutionIds: Seq[String]): FilterFun = {
logger.info(if (institutionIds.isEmpty) {
"no institutionIdFilters built"
} else {
s"institutionIdFilters built for ${institutionIds.mkString(", ")}"
})
report => institutionIds.isEmpty || institutionIds.contains(report.institutionId)
}
private val buildInstitutionIdFilter: String => FilterFun =
institutionId => report => report.institutionId == institutionId
private def buildRecordIdFilters(recordIds: Seq[String]): FilterFun = {
logger.info(if (recordIds.isEmpty) {
"no recordIdFilters built"
} else {
s"recordIdFilters built for ${recordIds.mkString(", ")}"
})
report => recordIds.isEmpty || recordIds.contains(report.recordId)
}
private val buildRecordIdFilter: String => FilterFun =
recordId => report => report.recordId == recordId
private def buildCreatedAfterFilter(timestamp: Option[Calendar]): FilterFun = {
val standardisedTimestamp = timestamp.collect {
case t => standardiseTimestamp(t)
}
logger.info(if (timestamp.isEmpty) {
"no createdAfterFilter built"
} else {
s"createdAfterFilter built for $timestamp"
})
report => standardisedTimestamp.isEmpty || dateFormatter.parse(standardisedTimestamp.get).before(report.timestamp)
}
private val buildCreatedAfterFilter: String => FilterFun =
timestamp => report => dateFormatter.parse(timestamp).before(report.timestamp)
private def buildCreatedBeforeFilter(timestamp: Option[Calendar]): FilterFun = {
val standardisedTimestamp = timestamp.collect {
case t => standardiseTimestamp(t)
}
logger.info(if (timestamp.isEmpty) {
"no createdBeforeFilter built"
} else {
s"createdBeforeFilter built for $timestamp"
})
report => standardisedTimestamp.isEmpty || dateFormatter.parse(standardisedTimestamp.get).after(report.timestamp)
}
private val buildCreatedBeforeFilter: String => FilterFun =
timestamp => report => dateFormatter.parse(timestamp).after(report.timestamp)
}
......@@ -25,10 +25,6 @@ case class Args(sessionId: String = "",
recordSetFilters: Seq[String] = Seq(),
institutionFilters: Seq[String] = Seq(),
sessionFilters: Seq[String] = Seq(),
createdAfterFilter: Calendar = {
val cal = Calendar.getInstance()
cal.setTimeInMillis(0)
cal
},
createdBeforeFilter: Calendar = Calendar.getInstance(),
createdAfterFilter: Option[Calendar] = None,
createdBeforeFilter: Option[Calendar] = None,
dryRun: Boolean = false)
......@@ -28,7 +28,7 @@ class ArgParserTest extends AnyFunSuite {
test("should parse correctly") {
val argParser = new {} with MsgFilter with ArgParser with Logging {}
val parsedArgs = argParser.parse(Array("--record-set-filter", "LS-film", "asession"))
val date = new SimpleDateFormat("YYYY-MM-dd").parse("2020-01-02");
val date = new SimpleDateFormat("YYYY-MM-dd").parse("2021-01-02");
val report = Report("aKey", "recId", "fedora-ingest", date, "SUCCESS", "blabla", "LS-film", "aInstitution", "aSession")
assert(parsedArgs.get._2.forall(_ (report)))
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment