/*
 * Import Process Delete
 * Copyright (C) 2021 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
19
20
package ch.memobase.models

21
22
import org.apache.kafka.clients.consumer.ConsumerRecord

23
24
import java.text.SimpleDateFormat
import java.util.Date
25
26
27
28
import scala.util.Try

/**
 * A parsed processing report.
 *
 * The companion object's `apply(ConsumerRecord)` fills the fields as follows:
 *
 * @param msgKey        key of the Kafka record the report was read from
 * @param recordId      value of the `id` field of the JSON payload
 * @param step          value of the `step` field of the JSON payload
 * @param timestamp     parsed value of the `timestamp` field of the JSON payload
 * @param status        value of the `status` field of the JSON payload
 * @param message       value of the `message` field of the JSON payload
 * @param recordSetId   value of the `recordSetId` record header
 * @param institutionId value of the `institutionId` record header
 * @param sessionId     value of the `sessionId` record header
 */
case class Report(
    msgKey: String,
    recordId: String,
    step: String,
    timestamp: Date,
    status: String,
    message: String,
    recordSetId: String,
    institutionId: String,
    sessionId: String
)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
36
37

/** Builds [[Report]] instances from Kafka consumer records. */
object Report {
  import java.nio.charset.StandardCharsets

  // `yyyy` is the calendar year. The former `YYYY` denotes the *week year* in SimpleDateFormat;
  // parsing with it does not honour the month and day-of-month of the input as intended.
  // NOTE(review): SimpleDateFormat is not thread-safe and resolves timestamps in the JVM default
  // time zone -- confirm that `apply` is only called from one thread and that this zone is intended.
  private val dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
  private val shortDateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")

  /**
   * Parses a report message into a [[Report]].
   *
   * The record value must be a JSON object with the string fields `step`, `status`, `id`,
   * `timestamp` and `message`; the record must carry the headers `recordSetId`,
   * `institutionId` and `sessionId`.
   *
   * @param consumerRecord the Kafka record holding the JSON report and its headers
   * @return the parsed report on success. Exceptions thrown while parsing are captured by the
   *         enclosing `Try`: a `ParserException` when the JSON, a required field, the timestamp
   *         or a required header is missing or invalid; a `ParserIgnore` when the step is not
   *         `import-process-bridge` or the status is `FATAL`.
   */
  def apply(consumerRecord: ConsumerRecord[String, String]): Try[Report] = Try {
    val json = Try(ujson.read(consumerRecord.value())).getOrElse(throw new ParserException("JSON is not valid!"))

    // Reads a mandatory string field from the top-level JSON object.
    def field(name: String): String =
      Try(json.obj(name).str).getOrElse(throw new ParserException(s"No `$name` field in JSON obj"))

    val step = field("step")
    if (step != "import-process-bridge") {
      // NOTE(review): the message mentions "normalization-service" while the check is for
      // "import-process-bridge" -- left untouched, confirm which one is intended.
      throw new ParserIgnore("No normalization-service message")
    }
    val status = field("status")
    if (status == "FATAL") {
      throw new ParserIgnore("FATAL message: No record forwarded, therefore ignoring report")
    }
    val id = field("id")
    val timestampString = field("timestamp")
    // Try the pattern with milliseconds first, then fall back to second precision.
    val timestamp = Try(dateFormatter.parse(timestampString))
      .orElse(Try(shortDateFormatter.parse(timestampString)))
      .getOrElse(throw new ParserException("No valid timestamp"))
    val message = field("message")
    val headers = consumerRecord.headers()

    // Reads a mandatory header. `lastHeader` returns null for an absent header; the resulting
    // NullPointerException is caught by the Try and reported as a ParserException.
    // Decoding is pinned to UTF-8 so the result does not depend on the JVM default charset
    // (assumes producers write UTF-8 header values -- TODO confirm).
    def header(name: String): String =
      Try(new String(headers.lastHeader(name).value(), StandardCharsets.UTF_8))
        .getOrElse(throw new ParserException(s"Extraction of `$name` header field failed"))

    val recordSet = header("recordSetId")
    val institution = header("institutionId")
    val session = header("sessionId")
    Report(consumerRecord.key, id, step, timestamp, status, message, recordSet, institution, session)
  }
}