Unverified Commit d53895cb authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

use import-process-bridge reports


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent eae23e26
Pipeline #22374 passed with stages
in 5 minutes and 15 seconds
...@@ -35,9 +35,9 @@ spec: ...@@ -35,9 +35,9 @@ spec:
- name: CLIENT_ID - name: CLIENT_ID
value: import-process-delete value: import-process-delete
- name: TOPIC_IN - name: TOPIC_IN
value: import-process-reporting value: postprocessing-reporting
- name: TOPIC_OUT - name: TOPIC_OUT
value: import-process-test value: import-process-test-delete
- name: POLL_TIMEOUT - name: POLL_TIMEOUT
value: "60000" value: "60000"
restartPolicy: Never restartPolicy: Never
......
...@@ -21,7 +21,6 @@ package ch.memobase ...@@ -21,7 +21,6 @@ package ch.memobase
import ch.memobase.models.{DeleteMessage, ParserIgnore, Report} import ch.memobase.models.{DeleteMessage, ParserIgnore, Report}
import org.apache.logging.log4j.scala.Logging import org.apache.logging.log4j.scala.Logging
import scala.collection.mutable
import scala.util.{Failure, Success} import scala.util.{Failure, Success}
object App object App
...@@ -36,7 +35,6 @@ object App ...@@ -36,7 +35,6 @@ object App
parse(args) match { parse(args) match {
case Some((sessionId, filters, dryRun)) => case Some((sessionId, filters, dryRun)) =>
try { try {
val recordSetIds: mutable.HashSet[String] = mutable.HashSet()
val matches = Iterator.continually(poll) val matches = Iterator.continually(poll)
.takeWhile(_.nonEmpty) .takeWhile(_.nonEmpty)
.flatten .flatten
...@@ -50,10 +48,6 @@ object App ...@@ -50,10 +48,6 @@ object App
logger.info("Match found") logger.info("Match found")
List(DeleteMessage(rep, sessionId)) List(DeleteMessage(rep, sessionId))
case Success(r) => case Success(r) =>
if (!recordSetIds.contains(r.recordSetId)) {
recordSetIds += r.recordSetId
logger.info(r.recordSetId)
}
List() List()
case Failure(_: ParserIgnore) => case Failure(_: ParserIgnore) =>
logger.debug("Ignoring irrelevant message") logger.debug("Ignoring irrelevant message")
......
...@@ -41,12 +41,12 @@ object Report { ...@@ -41,12 +41,12 @@ object Report {
def apply(consumerRecord: ConsumerRecord[String, String]): Try[Report] = Try { def apply(consumerRecord: ConsumerRecord[String, String]): Try[Report] = Try {
val json = Try(ujson.read(consumerRecord.value())).getOrElse(throw new ParserException("JSON is not valid!")) val json = Try(ujson.read(consumerRecord.value())).getOrElse(throw new ParserException("JSON is not valid!"))
val step = Try(json.obj("step").str).getOrElse(throw new ParserException("No `step` field in JSON obj")) val step = Try(json.obj("step").str).getOrElse(throw new ParserException("No `step` field in JSON obj"))
if (step != "normalization-service") { if (step != "import-process-bridge") {
throw new ParserIgnore("No normalization-service message") throw new ParserIgnore("No normalization-service message")
} }
val status = Try(json.obj("status").str).getOrElse(throw new ParserException("No `status` field in JSON obj")) val status = Try(json.obj("status").str).getOrElse(throw new ParserException("No `status` field in JSON obj"))
if (status != "SUCCESS") { if (status == "FATAL") {
throw new ParserIgnore("No SUCCESS message") throw new ParserIgnore("FATAL message: No record forwarded, therefore ignoring report")
} }
val id = Try(json.obj("id").str).getOrElse(throw new ParserException("No `id` field in JSON obj")) val id = Try(json.obj("id").str).getOrElse(throw new ParserException("No `id` field in JSON obj"))
val timestampString = Try(json.obj("timestamp").str).getOrElse(throw new ParserException("No `timestamp` field in JSON obj")) val timestampString = Try(json.obj("timestamp").str).getOrElse(throw new ParserException("No `timestamp` field in JSON obj"))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment