/*
 * Import Process Delete
 * Copyright (C) 2021 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package ch.memobase.models

import org.apache.kafka.clients.consumer.ConsumerRecord

import java.text.SimpleDateFormat
import java.util.Date
import scala.util.Try

/** A report message read from a Kafka record.
  *
  * @param msgKey        key of the Kafka record the report was read from
  * @param recordId      value of the `id` field of the JSON payload
  * @param step          value of the `step` field of the JSON payload
  * @param timestamp     parsed value of the `timestamp` field of the JSON payload
  * @param status        value of the `status` field of the JSON payload
  * @param message       value of the `message` field of the JSON payload
  * @param recordSetId   value of the `recordSetId` record header
  * @param institutionId value of the `institutionId` record header
  * @param sessionId     value of the `sessionId` record header
  */
case class Report(
    msgKey: String,
    recordId: String,
    step: String,
    timestamp: Date,
    status: String,
    message: String,
    recordSetId: String,
    institutionId: String,
    sessionId: String
)

/** Factory that builds a [[Report]] from a raw Kafka record. */
object Report {

  // `yyyy` is the calendar year. The previous `YYYY` is the *week year*; when
  // parsing, it makes the JDK resolve the date from the week year alone and
  // discard the parsed month and day, yielding a wrong timestamp.
  private val LongDatePattern = "yyyy-MM-dd'T'HH:mm:ss.SSS"
  private val ShortDatePattern = "yyyy-MM-dd'T'HH:mm:ss"

  /** Parses `raw` with the millisecond pattern first, then the second-precision pattern.
    *
    * A fresh `SimpleDateFormat` is created for every attempt because the class
    * is not synchronized and must not be shared through this singleton object.
    * Throws a `ParserException` when neither pattern matches.
    */
  private def parseTimestamp(raw: String): Date =
    Try(new SimpleDateFormat(LongDatePattern).parse(raw))
      .orElse(Try(new SimpleDateFormat(ShortDatePattern).parse(raw)))
      .getOrElse(throw new ParserException("No valid timestamp"))

  /** Builds a [[Report]] from a Kafka record.
    *
    * The record value must be a JSON object with the string fields `step`,
    * `status`, `id`, `timestamp` and `message`; the record must carry the
    * headers `recordSetId`, `institutionId` and `sessionId`.
    *
    * @param consumerRecord the Kafka record to convert
    * @return a `Success` holding the report, or a `Failure` holding
    *         - a `ParserIgnore` when `step` is not `normalization-service` or
    *           `status` is not `SUCCESS`;
    *         - a `ParserException` when the value is not valid JSON, a required
    *           field or header cannot be read, or the timestamp matches neither
    *           supported pattern.
    */
  def apply(consumerRecord: ConsumerRecord[String, String]): Try[Report] = Try {
    val json = Try(ujson.read(consumerRecord.value()))
      .getOrElse(throw new ParserException("JSON is not valid!"))

    // Reads a mandatory string field from the JSON payload.
    def field(name: String): String =
      Try(json.obj(name).str)
        .getOrElse(throw new ParserException(s"No `${name}` field in JSON obj"))

    val step = field("step")
    if (step != "normalization-service") {
      throw new ParserIgnore("No normalization-service message")
    }

    val status = field("status")
    if (status != "SUCCESS") {
      throw new ParserIgnore("No SUCCESS message")
    }

    val id = field("id")
    val timestamp = parseTimestamp(field("timestamp"))
    val message = field("message")

    val headers = consumerRecord.headers()

    // Reads a mandatory header. A missing header (`lastHeader` returns null) or
    // a null value surfaces as a NullPointerException, which the Try turns
    // into a ParserException.
    // Decoded explicitly as UTF-8 so the result does not depend on the JVM's
    // default charset. NOTE(review): assumes producers write UTF-8 header
    // values - confirm.
    def header(name: String): String =
      Try(new String(headers.lastHeader(name).value(), java.nio.charset.StandardCharsets.UTF_8))
        .getOrElse(throw new ParserException(s"Extraction of `${name}` header field failed"))

    val recordSet = header("recordSetId")
    val institution = header("institutionId")
    val session = header("sessionId")

    Report(consumerRecord.key, id, step, timestamp, status, message, recordSet, institution, session)
  }
}