Unverified Commit 5a32dbd0 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

don't build report object of irrelevant messages


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent 62957f51
Pipeline #17880 passed with stages
in 5 minutes and 26 seconds
......@@ -18,7 +18,7 @@
package ch.memobase
import ch.memobase.models.{DeleteMessage, Report}
import ch.memobase.models.{DeleteMessage, ParserIgnore, Report}
import org.apache.logging.log4j.scala.Logging
import scala.collection.immutable.HashSet
......@@ -44,6 +44,9 @@ object App
.flatMap { record =>
Report(record) match {
case Success(rep) => Some(rep)
case Failure(_: ParserIgnore) =>
logger.debug("Ignoring irrelevant message")
None
case Failure(ex) =>
logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}")
logger.info(s"${record.value()}")
......
......@@ -37,9 +37,7 @@ trait MsgFilter {
recordSets: Seq[String],
records: Seq[String],
sessions: Seq[String]): Seq[FilterFun] =
Seq(buildStepFilter("fedora-ingest")) ++
Seq(buildStatusFilter("SUCCESS")) ++
Seq(buildCreatedAfterFilter(standardiseTimestamp(createdAfter))) ++
Seq(buildCreatedAfterFilter(standardiseTimestamp(createdAfter))) ++
Seq(buildCreatedBeforeFilter(standardiseTimestamp(createdBefore))) ++
institutions.map(v => buildInstitutionIdFilter(v)) ++
records.map(v => buildRecordIdFilter(v)) ++
......@@ -73,11 +71,4 @@ trait MsgFilter {
private val buildCreatedBeforeFilter: String => FilterFun =
timestamp => report => dateFormatter.parse(timestamp).before(report.timestamp)
private val buildStatusFilter: String => FilterFun =
status => report => report.status == status
private val buildStepFilter: String => FilterFun =
step => report => report.step == step
}
......@@ -21,3 +21,5 @@ package ch.memobase.models
import java.io.IOException
class ParserException(message: String) extends IOException(message)
class ParserIgnore(message: String) extends Exception(message)
\ No newline at end of file
......@@ -41,13 +41,19 @@ object Report {
def apply(consumerRecord: ConsumerRecord[String, String]): Try[Report] = Try {
val json = Try(ujson.read(consumerRecord.value())).getOrElse(throw new ParserException("JSON is not valid!"))
val id = Try(json.obj("id").str).getOrElse(throw new ParserException("No `id` field in JSON obj"))
val step = Try(json.obj("step").str).getOrElse(throw new ParserException("No `step` field in JSON obj"))
if (step != "fedora-ingest") {
throw new ParserIgnore("No fedora-ingest message")
}
val status = Try(json.obj("status").str).getOrElse(throw new ParserException("No `status` field in JSON obj"))
if (status != "SUCCESS") {
throw new ParserIgnore("No SUCCESS message")
}
val id = Try(json.obj("id").str).getOrElse(throw new ParserException("No `id` field in JSON obj"))
val timestampString = Try(json.obj("timestamp").str).getOrElse(throw new ParserException("No `timestamp` field in JSON obj"))
val timestamp = Try(dateFormatter.parse(timestampString))
.orElse(Try(shortDateFormatter.parse(timestampString)))
.getOrElse(throw new ParserException("No valid timestamp"))
val status = Try(json.obj("status").str).getOrElse(throw new ParserException("No `status` field in JSON obj"))
val message = Try(json.obj("message").str).getOrElse(throw new ParserException("No `message` field in JSON obj"))
val headers = consumerRecord.headers()
val recordSet = Try(new String(headers.lastHeader("recordSetId").value()))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment