KProducer.scala 2.26 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
2
/*
 * Import Process Delete
 * Copyright (C) 2021 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
19
20
21
22
23
24
package ch.memobase

import ch.memobase.models.DeleteMessage
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.logging.log4j.scala.Logging

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
25
26
import java.time.LocalDateTime

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
27
28
29
30
31
32

/** Publishes delete-related messages to Kafka.
  *
  * Must be mixed with [[AppSettings]] (which supplies `producerProps` and
  * `outputTopic`) and `Logging` (which supplies `logger`).
  */
abstract class KProducer {
  self: AppSettings with Logging =>

  import java.nio.charset.StandardCharsets
  import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

  // Created on first use; note that `closeProducer()` also counts as a use.
  private lazy val producer = new KafkaProducer[String, String](producerProps)

  /** Encodes a header value with a fixed charset so the bytes on the wire do
    * not depend on the JVM's platform default encoding.
    */
  private def utf8(value: String): Array[Byte] =
    value.getBytes(StandardCharsets.UTF_8)

  /** Builds a record on `outputTopic` keyed by the message's record id and
    * carrying the `recordSetId`, `institutionId` and `sessionId` headers.
    */
  private def buildRecord(msg: DeleteMessage, value: String): ProducerRecord[String, String] = {
    val record = new ProducerRecord[String, String](outputTopic, msg.recordId, value)
    record.headers()
      .add("recordSetId", utf8(msg.recordSetId))
      .add("institutionId", utf8(msg.institutionId))
      .add("sessionId", utf8(msg.sessionId))
    record
  }

  /** Hands the record to the producer and logs a failure reported by the
    * asynchronous send instead of silently dropping it.
    */
  private def dispatch(record: ProducerRecord[String, String]): Unit = {
    producer.send(
      record,
      new Callback {
        // `exception` is null when the send succeeded.
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
          Option(exception).foreach { e =>
            logger.error(s"Sending record with key ${record.key()} to topic ${record.topic()} failed", e)
          }
      }
    )
  }

  /** Sends a delete command for the record referenced by `msg`.
    *
    * The record has a `null` value; in addition to the common headers it
    * carries a one-byte `dryRun` header (`1` if `dryRun` is set, `0` otherwise).
    *
    * @param msg    identifies the record, record set, institution and session
    * @param dryRun value of the `dryRun` header flag
    */
  def sendDelete(msg: DeleteMessage, dryRun: Boolean): Unit = {
    logger.debug(s"Sending delete command for ${msg.recordId}")
    val record = buildRecord(msg, null)
    val dryRunFlag: Byte = if (dryRun) 1 else 0
    record.headers().add("dryRun", Array(dryRunFlag))
    dispatch(record)
  }

  /** Sends a JSON transaction log entry (`id`, `action`, `timestamp`) for the
    * record referenced by `msg`.
    *
    * NOTE(review): this publishes to the same `outputTopic` as `sendDelete` —
    * confirm that this is intended.
    *
    * @param msg identifies the record, record set, institution and session
    */
  def sendTransaction(msg: DeleteMessage): Unit = {
    import ujson._
    logger.debug(s"Log transaction for ${msg.recordId}")
    // The timestamp is local time without zone information (LocalDateTime).
    val payload = Obj(
      "id" -> msg.recordId,
      "action" -> "DELETE",
      "timestamp" -> LocalDateTime.now().toString()
    ).toString
    dispatch(buildRecord(msg, payload))
  }

  /** Closes the Kafka producer.
    *
    * Because `producer` is a lazy val, calling this before any message was
    * sent instantiates the producer first and then closes it.
    */
  def closeProducer(): Unit = {
    logger.debug("Closing Kafka producer instance")
    producer.close()
  }
}