Unverified Commit 6e500c87 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

log deletes

parent 46ecf354
Pipeline #22528 passed with stages
in 5 minutes and 30 seconds
...@@ -59,7 +59,12 @@ object App ...@@ -59,7 +59,12 @@ object App
} }
}).toSet }).toSet
logger.info(s"${matches.size} matches found") logger.info(s"${matches.size} matches found")
matches.foreach(msg => sendDelete(msg, dryRun)) matches.foreach(msg => {
sendDelete(msg, dryRun)
if (!dryRun) {
sendTransaction(msg)
}
})
} catch { } catch {
case e: Exception => case e: Exception =>
logger.error(e) logger.error(e)
......
...@@ -22,23 +22,41 @@ import ch.memobase.models.DeleteMessage ...@@ -22,23 +22,41 @@ import ch.memobase.models.DeleteMessage
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.logging.log4j.scala.Logging import org.apache.logging.log4j.scala.Logging
import java.time.LocalDateTime
abstract class KProducer { abstract class KProducer {
self: AppSettings with Logging => self: AppSettings with Logging =>
private lazy val producer = new KafkaProducer[String, String](producerProps) private lazy val producer = new KafkaProducer[String, String](producerProps)
def sendDelete(id: DeleteMessage, dryRun: Boolean): Unit = { def sendDelete(msg: DeleteMessage, dryRun: Boolean): Unit = {
logger.debug(s"Sending delete command for ${id.recordId}") logger.debug(s"Sending delete command for ${msg.recordId}")
val producerRecord = new ProducerRecord[String, String](outputTopic, id.recordId, null) val producerRecord = new ProducerRecord[String, String](outputTopic, msg.recordId, null)
producerRecord.headers() producerRecord.headers()
.add("recordSetId", id.recordSetId.getBytes()) .add("recordSetId", msg.recordSetId.getBytes())
.add("institutionId", id.institutionId.getBytes()) .add("institutionId", msg.institutionId.getBytes())
.add("sessionId", id.sessionId.getBytes()) .add("sessionId", msg.sessionId.getBytes())
.add("dryRun", List((if (dryRun) 1 else 0).toByte).toArray) .add("dryRun", List((if (dryRun) 1 else 0).toByte).toArray)
producer.send(producerRecord) producer.send(producerRecord)
} }
def sendTransaction(msg: DeleteMessage): Unit = {
import ujson._
logger.debug(s"Log transaction for ${msg.recordId}")
val value = Obj(
"id" -> msg.recordId,
"action" -> "DELETE",
"timestamp" -> LocalDateTime.now().toString()
).toString
val producerRecord = new ProducerRecord[String, String](outputTopic, msg.recordId, value)
producerRecord.headers()
.add("recordSetId", msg.recordSetId.getBytes())
.add("institutionId", msg.institutionId.getBytes())
.add("sessionId", msg.sessionId.getBytes())
producer.send(producerRecord)
}
def closeProducer(): Unit = { def closeProducer(): Unit = {
logger.debug("Closing Kafka producer instance") logger.debug("Closing Kafka producer instance")
producer.close() producer.close()
......
...@@ -37,6 +37,10 @@ object Report { ...@@ -37,6 +37,10 @@ object Report {
def apply(consumerRecord: ConsumerRecord[String, String]): Try[Report] = Try { def apply(consumerRecord: ConsumerRecord[String, String]): Try[Report] = Try {
val json = Try(ujson.read(consumerRecord.value())).getOrElse(throw new ParserException("JSON is not valid!")) val json = Try(ujson.read(consumerRecord.value())).getOrElse(throw new ParserException("JSON is not valid!"))
val action = Try(json.obj("action").str).getOrElse(throw new ParserException("No `action` field in JSON obj"))
if (action == "DELETE") {
throw new ParserIgnore("`DELETE` transactions are ignored")
}
val id = Try(json.obj("id").str).getOrElse(throw new ParserException("No `id` field in JSON obj")) val id = Try(json.obj("id").str).getOrElse(throw new ParserException("No `id` field in JSON obj"))
val timestampString = Try(json.obj("timestamp").str).getOrElse(throw new ParserException("No `timestamp` field in JSON obj")) val timestampString = Try(json.obj("timestamp").str).getOrElse(throw new ParserException("No `timestamp` field in JSON obj"))
val timestamp = Try(dateFormatter.parse(timestampString)) val timestamp = Try(dateFormatter.parse(timestampString))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment