Unverified Commit 54e246cd authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

process from transactions


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent b1d1a557
Pipeline #22507 failed with stages
in 2 minutes and 1 second
......@@ -31,9 +31,9 @@ spec:
- name: CLIENT_ID
value: import-process-delete
- name: TOPIC_IN
value: postprocessing-reporting
value: import-process-transactions-records
- name: TOPIC_OUT
value: import-process-test-delete
value: delete-process-deletes
- name: POLL_TIMEOUT
value: "20000"
restartPolicy: Never
......
......@@ -26,10 +26,7 @@ import scala.util.Try
case class Report(msgKey: String,
recordId: String,
step: String,
timestamp: Date,
status: String,
message: String,
recordSetId: String,
institutionId: String,
sessionId: String)
......@@ -40,25 +37,11 @@ object Report {
def apply(consumerRecord: ConsumerRecord[String, String]): Try[Report] = Try {
val json = Try(ujson.read(consumerRecord.value())).getOrElse(throw new ParserException("JSON is not valid!"))
val step = Try(json.obj("step").str).getOrElse(throw new ParserException("No `step` field in JSON obj"))
if (step != "import-process-bridge") {
throw new ParserIgnore("No normalization-service message")
}
val status = Try(json.obj("status").str).getOrElse(throw new ParserException("No `status` field in JSON obj"))
if (status == "FATAL") {
throw new ParserIgnore("FATAL message: No record forwarded, therefore ignoring report")
}
val id = Try(json.obj("id").str).getOrElse(throw new ParserException("No `id` field in JSON obj"))
if (id.startsWith("https://memobase.ch/recordSet")) {
throw new ParserIgnore("Ignoring report because it is for a recordSet")
} else if (id.startsWith("https://memobase.ch/institution")) {
throw new ParserIgnore("Ignoring report because it is for an institution")
}
val timestampString = Try(json.obj("timestamp").str).getOrElse(throw new ParserException("No `timestamp` field in JSON obj"))
val timestamp = Try(dateFormatter.parse(timestampString))
.orElse(Try(shortDateFormatter.parse(timestampString)))
.getOrElse(throw new ParserException("No valid timestamp"))
val message = Try(json.obj("message").str).getOrElse(throw new ParserException("No `message` field in JSON obj"))
val headers = consumerRecord.headers()
val recordSet = Try(new String(headers.lastHeader("recordSetId").value()))
.getOrElse(throw new ParserException("Extraction of `recordSetId` header field failed"))
......@@ -66,7 +49,7 @@ object Report {
.getOrElse(throw new ParserException("Extraction of `institutionId` header field failed"))
val session = Try(new String(headers.lastHeader("sessionId").value()))
.getOrElse(throw new ParserException("Extraction of `sessionId` header field failed"))
Report(consumerRecord.key, id, step, timestamp, status, message, recordSet, institution, session)
Report(consumerRecord.key, id, timestamp, recordSet, institution, session)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment