Unverified Commit 98e136b9 authored by Sebastian Schüpbach

check report validity beforehand

parent 8f98a84e
Pipeline #17860 passed with stages in 4 minutes and 47 seconds
@@ -18,10 +18,11 @@
package ch.memobase
import ch.memobase.models.DeleteMessage
import ch.memobase.models.{DeleteMessage, Report}
import org.apache.logging.log4j.scala.Logging
import scala.collection.immutable.HashSet
import scala.util.{Failure, Success}
object App
extends KProducer
@@ -40,6 +41,15 @@ object App
.takeWhile(records => records.nonEmpty)
.flatMap(records =>
records
.flatMap { record =>
Report(record) match {
case Success(rep) => Some(rep)
case Failure(ex) =>
logger.warn(s"Ignoring message because parsing failed: ${ex.getMessage}")
logger.info(s"$record")
None
}
}
.filter(record => filters.forall(f => f(record)))
.foldLeft(HashSet[DeleteMessage]())((agg, record) => agg + DeleteMessage(record, sessionId))
)
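The hunk above drops unparsable messages before any of the content filters run. A minimal, self-contained sketch of that parse-before-filter pattern, with a hypothetical parseReport standing in for Report.apply:

import scala.util.{Failure, Success, Try}

object ParseThenFilterSketch extends App {
  // Hypothetical stand-in for Report.apply: fails when the payload is not an integer.
  def parseReport(raw: String): Try[Int] = Try(raw.trim.toInt)

  val records = List("1", "2", "oops", "4")

  // Unparsable records are reported (here: printed) and dropped; only valid ones reach the filters.
  val valid = records.flatMap { record =>
    parseReport(record) match {
      case Success(rep) => Some(rep)
      case Failure(ex) =>
        println(s"Ignoring message because parsing failed: ${ex.getMessage}")
        None
    }
  }

  println(valid.filter(_ % 2 == 0)) // List(2, 4)
}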
@@ -22,18 +22,14 @@ import java.text.SimpleDateFormat
import java.util.Calendar
import ch.memobase.models.Report
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.logging.log4j.scala.Logging
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
trait MsgFilter {
self: Logging =>
private val dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
type FilterFun = ConsumerRecord[String, String] => Boolean
type FilterFun = Report => Boolean
def buildFilters(createdAfter: Calendar,
createdBefore: Calendar,
@@ -61,60 +57,27 @@ trait MsgFilter {
f"${calendar.get(Calendar.MILLISECOND)}%03d"
private val buildSessionIdFilter: String => FilterFun =
sessionId => rec =>
rec.headers()
.headers("sessionId")
.asScala
.map(header => new String(header.value()))
.exists(v => v == sessionId)
sessionId => report => report.sessionId == sessionId
private val buildRecordSetIdFilter: String => FilterFun =
recordSetId => rec =>
rec.headers()
.headers("recordSetId")
.asScala
.map(header => new String(header.value()))
.exists(v => v == recordSetId)
recordSetId => report => report.recordSetId == recordSetId
private val buildInstitutionIdFilter: String => FilterFun =
institutionId => rec =>
rec.headers()
.headers("institutionId")
.asScala
.map(header => new String(header.value()))
.exists(v => v == institutionId)
institutionId => report => report.institutionId == institutionId
private val buildRecordIdFilter: String => FilterFun =
recordId => rec =>
Report(rec.value()).id == recordId
recordId => report => report.recordId == recordId
private val buildCreatedAfterFilter: String => FilterFun =
timestamp => rec =>
Try {
dateFormatter.parse(timestamp).after(dateFormatter.parse(Report(rec.value()).timestamp))
} match {
case Success(res) => res
case Failure(_) =>
logger.warn(s"Parsing of timestamp ${Report(rec.value()).timestamp} failed! Ignoring record")
false
}
timestamp => report => dateFormatter.parse(timestamp).after(report.timestamp)
private val buildCreatedBeforeFilter: String => FilterFun =
timestamp => rec =>
Try {
dateFormatter.parse(timestamp).before(dateFormatter.parse(Report(rec.value()).timestamp))
} match {
case Success(res) => res
case Failure(_) =>
logger.warn(s"Parsing of timestamp ${Report(rec.value()).timestamp} failed! Ignoring record")
false
}
timestamp => report => dateFormatter.parse(timestamp).before(report.timestamp)
private val buildStatusFilter: String => FilterFun =
status => rec => Report(rec.value()).status == status
status => report => report.status == status
private val buildStepFilter: String => FilterFun =
step => rec => Report(rec.value()).step == step
step => report => report.step == step
}
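With FilterFun now defined as Report => Boolean, the filters operate on already-parsed reports instead of raw Kafka records. A hedged sketch of how such filters combine, using a trimmed-down stand-in for the Report case class added further down in this commit:

import java.util.Date

// Trimmed-down stand-in for the Report case class introduced later in this commit.
case class MiniReport(recordSetId: String, institutionId: String, status: String, timestamp: Date)

object FilterSketch extends App {
  type FilterFun = MiniReport => Boolean

  val byRecordSet: FilterFun = report => report.recordSetId == "rs-1"
  val byStatus: FilterFun = report => report.status == "FATAL"

  val filters: Seq[FilterFun] = Seq(byRecordSet, byStatus)

  val report = MiniReport("rs-1", "inst-9", "FATAL", new Date())

  // Mirrors App's filters.forall(f => f(record)): a report passes only if every filter accepts it.
  println(filters.forall(f => f(report))) // true
}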
@@ -18,16 +18,9 @@
package ch.memobase.models
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.collection.JavaConverters._
case class DeleteMessage(recordId: String, recordSetId: String, institutionId: String, sessionId: String)
object DeleteMessage {
def apply(consumerRecord: ConsumerRecord[String, String], sessionId: String): DeleteMessage = {
val recordSetId = new String(consumerRecord.headers().headers("recordSetId").asScala.head.value())
val institutionId = new String(consumerRecord.headers().headers("institutionId").asScala.head.value())
DeleteMessage(consumerRecord.key(), recordSetId, institutionId, sessionId)
}
def apply(report: Report, sessionId: String): DeleteMessage =
DeleteMessage(report.msgKey, report.recordSetId, report.institutionId, report.sessionId)
}
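For illustration only, a usage sketch of the new Report-based apply; the classes are the ones from this commit and all field values below are invented:

import java.util.Date
import ch.memobase.models.{DeleteMessage, Report}

object DeleteMessageSketch extends App {
  // All values are invented for illustration.
  val report = Report(
    msgKey = "rec-1",
    recordId = "rec-1",
    step = "normalization",
    timestamp = new Date(),
    status = "FATAL",
    message = "record could not be processed",
    recordSetId = "rs-1",
    institutionId = "inst-9",
    sessionId = "session-42"
  )

  // The delete message is now assembled from the parsed report's fields.
  println(DeleteMessage(report, "session-42"))
}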
@@ -18,18 +18,38 @@
package ch.memobase.models
import upickle.default.{Reader, _}
import java.text.SimpleDateFormat
import java.util.Date
case class Report(id: String,
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.util.Try
case class Report(msgKey: String,
recordId: String,
step: String,
timestamp: String,
timestamp: Date,
status: String,
message: String)
message: String,
recordSetId: String,
institutionId: String,
sessionId: String)
object Report {
def apply(json: String): Report = {
read[Report](json)
private val dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
def apply(consumerRecord: ConsumerRecord[String, String]): Try[Report] = Try {
val json = ujson.read(consumerRecord.value())
val id = json.obj("id").str
val step = json.obj("step").str
val timestamp = dateFormatter.parse(json.obj("timestamp").str)
val status = json.obj("status").str
val message = json.obj("message").str
val headers = consumerRecord.headers()
val recordSet = new String(headers.lastHeader("recordSetId").value())
val institution = new String(headers.lastHeader("institutionId").value())
val session = new String(headers.lastHeader("sessionId").value())
Report(consumerRecord.key, id, step, timestamp, status, message, recordSet, institution, session)
}
implicit val reader: Reader[Report] = macroR
}
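A hedged sketch of exercising the new Try-based Report.apply against a hand-built consumer record; the topic name, key, and header values are made up:

import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.util.{Failure, Success}
import ch.memobase.models.Report

object ReportParseSketch extends App {
  val value =
    """{"id":"rec-1","step":"normalization","timestamp":"2021-03-01T10:15:30.000",
      |"status":"FATAL","message":"record could not be processed"}""".stripMargin

  val record = new ConsumerRecord[String, String]("reporting-topic", 0, 0L, "rec-1", value)
  record.headers().add("recordSetId", "rs-1".getBytes)
  record.headers().add("institutionId", "inst-9".getBytes)
  record.headers().add("sessionId", "session-42".getBytes)

  // Bad JSON or missing headers surface as a Failure instead of blowing up the stream.
  Report(record) match {
    case Success(report) => println(report)
    case Failure(ex)     => println(s"Parsing failed: ${ex.getMessage}")
  }
}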