Unverified Commit 79be74b3 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

write recordSetId and institution to report

parent 9735720b
Pipeline #16131 passed with stages
in 11 minutes and 31 seconds
......@@ -29,7 +29,7 @@ import org.memobase.settings.SettingsLoader
import scala.collection.JavaConverters._
object App extends scala.App with Logging {
object App extends scala.App with Logging with RecordUtils {
val settings = new SettingsLoader(List(
"audioSnippetDuration",
"internalBaseUrl",
......@@ -63,17 +63,17 @@ object App extends scala.App with Logging {
for {
record <- records
processed <- recordProcessor.process(record)
} yield processed match {
case ProcessSuccess(id, msg) =>
} yield (record, processed) match {
case (rec, ProcessSuccess(id, msg)) =>
logger.debug(msg)
reporter.send(Report(id, ProcessingIsSuccess, msg))
case ProcessWarn(id, msg) =>
reporter.send(Report(id, ProcessingIsSuccess, msg, getInstitutionAndRecordSet(rec.value)))
case (rec, ProcessWarn(id, msg)) =>
logger.warn(msg)
reporter.send(Report(id, ProcessingIsFailure, msg))
case ProcessFailure(id, msg, ex) =>
reporter.send(Report(id, ProcessingIsFailure, msg, getInstitutionAndRecordSet(rec.value)))
case (rec, ProcessFailure(id, msg, ex)) =>
logger.error(msg)
logger.debug(ex.getStackTrace.mkString("\n"))
reporter.send(Report(id, ProcessingIsFailure, msg))
reporter.send(Report(id, ProcessingIsFailure, msg, getInstitutionAndRecordSet((rec.value))))
}
}
} catch {
......@@ -86,3 +86,4 @@ object App extends scala.App with Logging {
reporter.close()
}
}
......@@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.util.{Failure, Success, Try}
trait ProcessOutcome
sealed trait ProcessOutcome
case class ProcessSuccess(id: String, msg: String) extends ProcessOutcome
......
/*
* Media Converter
* Extracts media files from Fedora repository
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package ch.memobase
import ch.memobase.models._
import ujson.{Obj, Str, Value}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.{Success, Try}
/**
  * Helpers for inspecting the JSON-LD messages handled by this service.
  *
  * A message is expected to be a JSON object with a top-level `@graph` array.
  * The entry of that array whose `@type` is the RiC-O `Record` IRI is treated
  * as the record; the remaining predicates classify other graph entries by
  * their `type`, `hasMimeType` and `locator` string values.
  */
trait RecordUtils {

  /**
    * Parses a message and returns the entries of its `@graph` array.
    *
    * @param msg JSON-LD document as a string
    * @return the values contained in the top-level `@graph` array
    * @note throws if `msg` is not valid JSON, is not a JSON object, has no
    *       `@graph` member, or if `@graph` is not an array
    */
  protected def getJsonldGraph(msg: String): ArrayBuffer[Value] =
    ujson.read(msg).obj("@graph").arr

  /**
    * Extracts the institution and record set identifiers of the record
    * contained in a message.
    *
    * This is used while reporting processing outcomes, so it never throws:
    * a message that cannot be parsed, or that contains no record, yields
    * `(None, None)`.
    *
    * @param msg JSON-LD document as a string
    * @return `(institution, recordSet)`; each is the last `/`-separated
    *         segment of the record's `heldBy` / `isPartOf` value, if present
    */
  protected def getInstitutionAndRecordSet(msg: String): (Option[String], Option[String]) =
    Try(getJsonldGraph(msg)) match {
      case Success(graph) =>
        graph.collectFirst {
          case res: Obj if isRecord(res) =>
            (getInstitution(res.obj), getRecordSet(res.obj))
        }.getOrElse((None, None))
      // Unparseable message or missing/ill-typed `@graph`: nothing to extract.
      case _ => (None, None)
    }

  /** Maps the textual event type of a record onto the `Event` model. */
  private def chooseEventType(eventAsString: String): Event = eventAsString match {
    case "Create" => Create
    case "Update" => Update
    case "Delete" => Delete
    case s => UnknownEvent(s)
  }

  /** True if the string value under `type` maps to `DigitalObject`. */
  protected def isDigitalObject(obj: ujson.Obj): Boolean =
    hasKeyValue(obj, "type") {
      Instantiation(_) == DigitalObject
    }

  /** True if the string value under `type` maps to `Thumbnail`. */
  protected def isPreviewImage(obj: ujson.Obj): Boolean =
    hasKeyValue(obj, "type") {
      Instantiation(_) == Thumbnail
    }

  /** True if `Conversions.getMediaFileType` recognises the `hasMimeType` value. */
  protected def isProcessableMimeType(obj: ujson.Obj): Boolean =
    hasKeyValue(obj, "hasMimeType") {
      value => Conversions.getMediaFileType(value).isDefined
    }

  /** True if the `locator` value starts with `externalBaseUrl`. */
  protected def isLocalRecord(obj: ujson.Obj, externalBaseUrl: String): Boolean =
    hasKeyValue(obj, "locator") {
      value => value.startsWith(externalBaseUrl)
    }

  /**
    * Applies `valueFun` to the value stored under `key` if, and only if, that
    * value is a JSON string.
    *
    * @return the result of `valueFun`, or `false` when the key is absent or
    *         its value is not a string
    */
  private def hasKeyValue(obj: ujson.Obj, key: String)(valueFun: String => Boolean): Boolean =
    obj.value.get(key) match {
      case Some(Str(text)) => valueFun(text)
      case _ => false
    }

  /**
    * Finds the event type of the first record in the graph that carries a
    * string-valued `eventType`.
    *
    * Graph entries that are not JSON objects are skipped instead of raising
    * an error.
    *
    * @param objList entries of a JSON-LD `@graph`
    * @return the event type, or `None` if no record provides one
    */
  protected def getEventType(objList: ArrayBuffer[Value]): Option[Event] =
    objList.iterator
      .collect { case rec: Obj if isRecord(rec) => rec.value.get("eventType") }
      .collectFirst { case Some(Str(eventType)) => chooseEventType(eventType) }

  /** Identifier of the holding institution, taken from `heldBy`. */
  private def getInstitution(record: mutable.LinkedHashMap[String, Value]): Option[String] =
    lastPathSegment(record, "heldBy")

  /** Identifier of the record set, taken from `isPartOf`. */
  private def getRecordSet(record: mutable.LinkedHashMap[String, Value]): Option[String] =
    lastPathSegment(record, "isPartOf")

  /**
    * Returns the last `/`-separated segment of the string stored under `key`.
    *
    * Yields `None` when the key is absent, when its value is not a JSON
    * string, or when the string has no segment at all (e.g. `"/"`, for which
    * `split` returns an empty array).
    */
  private def lastPathSegment(record: mutable.LinkedHashMap[String, Value], key: String): Option[String] =
    record.get(key).flatMap {
      case Str(uri) => uri.split("/").lastOption
      case _ => None
    }

  /** True if the object's `@type` is the RiC-O `Record` IRI. */
  private def isRecord(obj: ujson.Obj): Boolean =
    hasKeyValue(obj, "@type") {
      _ == "https://www.ica.org/standards/RiC/ontology#Record"
    }
}
......@@ -23,6 +23,9 @@ import java.util.Properties
import java.util.concurrent.{Future => JavaFuture}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.header.Header
import scala.collection.JavaConverters._
/**
......@@ -36,7 +39,24 @@ class Reporter(props: Properties, reportingTopic: String) {
private val producer = new KafkaProducer[String, String](props)
def send(report: models.Report): JavaFuture[RecordMetadata] = {
producer.send(new ProducerRecord(reportingTopic, report.id, report.toString))
val headers = createHeaders(report.institutionRecordSet._2.getOrElse(""), report.institutionRecordSet._1.getOrElse(""))
val producerRecord = new ProducerRecord[String, String](reportingTopic, 0, report.id, report.toString, headers)
producer.send(producerRecord)
}
/**
  * Builds the Kafka record headers attached to every report.
  *
  * @param recordSetId   value of the `recordSetId` header
  * @param institutionId value of the `institutionId` header
  * @return the two headers, in the order `recordSetId`, `institutionId`
  */
private def createHeaders(recordSetId: String, institutionId: String): java.lang.Iterable[Header] = {
  // Encode explicitly as UTF-8: the no-argument `getBytes` uses the platform
  // default charset, so the header bytes would depend on the host's locale.
  def header(name: String, content: String): Header = {
    val bytes = content.getBytes(java.nio.charset.StandardCharsets.UTF_8)
    new Header {
      override def key(): String = name
      override def value(): Array[Byte] = bytes
    }
  }

  List(
    header("recordSetId", recordSetId),
    header("institutionId", institutionId)
  ).asJava
}
/** Closes the underlying Kafka producer. */
def close(): Unit = {
  producer.close()
}
......
......@@ -19,6 +19,7 @@
package ch.memobase.models
import ch.memobase.RecordUtils
import ujson.{Str, Value}
import scala.collection.mutable.ArrayBuffer
......@@ -41,7 +42,7 @@ case class BinaryResourceMetadata(id: String,
eventType: Event) {
}
object BinaryResourceMetadata {
object BinaryResourceMetadata extends RecordUtils {
/**
* Builds a `BinaryResourceMetadata` object from a JSON-LD object pulled from Kafka topic
......@@ -51,17 +52,10 @@ object BinaryResourceMetadata {
* @return
*/
def build(msg: String, externalBaseUrl: String): List[Try[BinaryResourceMetadata]] = {
val jsonldGraph = ujson.read(msg).obj("@graph").arr
val jsonldGraph = getJsonldGraph(msg)
extractBinaryResourceMetadata(jsonldGraph, externalBaseUrl)
}
private def chooseEventType(eventAsString: String): Event = eventAsString match {
case "Create" => Create
case "Update" => Update
case "Delete" => Delete
case s => UnknownEvent(s)
}
//noinspection ScalaStyle
private def extractBinaryResourceMetadata(jsonldGraph: ArrayBuffer[Value], baseUrl: String): List[Try[BinaryResourceMetadata]] =
jsonldGraph.value
......@@ -92,50 +86,4 @@ object BinaryResourceMetadata {
}
}.toList
private def isDigitalObject(obj: ujson.Obj): Boolean = {
hasKeyValue(obj, "type") {
Instantiation(_) == DigitalObject
}
}
private def isPreviewImage(obj: ujson.Obj): Boolean = {
hasKeyValue(obj, "type") {
Instantiation(_) == Thumbnail
}
}
private def isRecord(obj: ujson.Obj): Boolean = {
hasKeyValue(obj, "@type") {
_ == "https://www.ica.org/standards/RiC/ontology#Record"
}
}
private def isProcessableMimeType(obj: ujson.Obj): Boolean = {
hasKeyValue(obj, "hasMimeType") {
value => Conversions.getMediaFileType(value).isDefined
}
}
private def isLocalRecord(obj: ujson.Obj, externalBaseUrl: String): Boolean = {
hasKeyValue(obj, "locator") {
value => value.startsWith(externalBaseUrl)
}
}
private def hasKeyValue(obj: ujson.Obj, key: String)(valueFun: String => Boolean): Boolean = {
Try(obj.value(key)) match {
case Success(id: Str) => valueFun(id.value)
case _ => false
}
}
private def getEventType(objList: ArrayBuffer[Value]): Option[Event] = {
objList
.collectFirst {
case v if isRecord(v.obj) && v.obj.contains("eventType") => chooseEventType(v.obj("eventType").str)
}
}
}
......@@ -27,7 +27,7 @@ case object ProcessingIsSuccess extends ProcessingStatus
case object ProcessingIsFailure extends ProcessingStatus
case class Report(id: String, status: ProcessingStatus, message: String) {
case class Report(id: String, status: ProcessingStatus, message: String, institutionRecordSet: (Option[String], Option[String])) {
override def toString: String =
s"""{"id": "$id", "status": "${
status match {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment