Unverified Commit 9109e222 authored by Sebastian Schüpbach
Browse files

more logging; get reports from normalization-service


Signed-off-by: Sebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent 6411b2b5
Pipeline #22290 passed with stages
in 4 minutes and 59 seconds
...@@ -35,12 +35,15 @@ object App ...@@ -35,12 +35,15 @@ object App
parse(args) match { parse(args) match {
case Some((sessionId, filters, dryRun)) => case Some((sessionId, filters, dryRun)) =>
var counter = 0
var matchCounter = 0
try { try {
Stream Stream
.continually(poll) .continually(poll)
.takeWhile(records => records.nonEmpty) .takeWhile(records => records.nonEmpty)
.flatMap(records => { .flatMap(records => {
logger.info("Processing records batch with size ${records.size}") logger.info(s"Processing records batch with size ${records.size}; ${counter} processed so far")
counter += records.size
records records
.flatMap { record => .flatMap { record =>
Report(record) match { Report(record) match {
...@@ -55,6 +58,11 @@ object App ...@@ -55,6 +58,11 @@ object App
} }
} }
.filter(record => filters.forall(f => f(record))) .filter(record => filters.forall(f => f(record)))
.map(record => {
matchCounter += 1
logger.info(s"${matchCounter} matches so far")
record
})
.foldLeft(HashSet[DeleteMessage]())((agg, record) => { .foldLeft(HashSet[DeleteMessage]())((agg, record) => {
logger.info(s"Size of delete messages: ${agg.size}") logger.info(s"Size of delete messages: ${agg.size}")
agg + DeleteMessage(record, sessionId) agg + DeleteMessage(record, sessionId)
......
/* /*
* Import Process Delete * Import Process Delete
* Copyright (C) 2020 Memoriav * Copyright (C) 2021 Memoriav
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by * it under the terms of the GNU Affero General Public License as published by
...@@ -18,11 +18,10 @@ ...@@ -18,11 +18,10 @@
package ch.memobase.models package ch.memobase.models
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import java.text.SimpleDateFormat
import java.util.Date
import scala.util.Try import scala.util.Try
case class Report(msgKey: String, case class Report(msgKey: String,
...@@ -42,8 +41,8 @@ object Report { ...@@ -42,8 +41,8 @@ object Report {
def apply(consumerRecord: ConsumerRecord[String, String]): Try[Report] = Try { def apply(consumerRecord: ConsumerRecord[String, String]): Try[Report] = Try {
val json = Try(ujson.read(consumerRecord.value())).getOrElse(throw new ParserException("JSON is not valid!")) val json = Try(ujson.read(consumerRecord.value())).getOrElse(throw new ParserException("JSON is not valid!"))
val step = Try(json.obj("step").str).getOrElse(throw new ParserException("No `step` field in JSON obj")) val step = Try(json.obj("step").str).getOrElse(throw new ParserException("No `step` field in JSON obj"))
if (step != "fedora-ingest") { if (step != "normalization-service") {
throw new ParserIgnore("No fedora-ingest message") throw new ParserIgnore("No normalization-service message")
} }
val status = Try(json.obj("status").str).getOrElse(throw new ParserException("No `status` field in JSON obj")) val status = Try(json.obj("status").str).getOrElse(throw new ParserException("No `status` field in JSON obj"))
if (status != "SUCCESS") { if (status != "SUCCESS") {
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment