Unverified Commit cd69142f authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

unify reporting

parent 9911f417
Pipeline #19113 passed with stages
in 9 minutes and 7 seconds
......@@ -19,13 +19,12 @@
package ch.memobase
import java.time.Duration
import ch.memobase.models._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.logging.log4j.scala.Logging
import org.memobase.settings.SettingsLoader
import java.time.Duration
import scala.collection.JavaConverters._
......@@ -61,23 +60,25 @@ object App extends scala.App with Logging with RecordUtils {
consumer.subscribe(List(settings.getInputTopic).asJava)
while (true) {
val records = consumer.poll(Duration.ofMillis(consumerPollTimeoutMs)).asScala
for {
record <- records
processed <- recordProcessor.process(record)
} yield (record, processed) match {
case (rec, ProcessSuccess(id, msg)) =>
logger.info(s"$id: $msg")
reporter.send(ReportingObject(id, ProcessingSuccess, msg, getInstitutionAndRecordSet(rec.value)))
case (rec, ProcessIgnore(id, msg)) =>
logger.info(s"$id: $msg")
reporter.send(ReportingObject(id, ProcessingIgnore, msg, getInstitutionAndRecordSet(rec.value)))
case (rec, ProcessWarning(id, msg)) =>
logger.warn(s"$id: $msg")
reporter.send(ReportingObject(id, ProcessingWarning, msg, getInstitutionAndRecordSet(rec.value)))
case (rec, ProcessFatal(id, msg, ex)) =>
logger.error(s"$id: $msg: ${ex.getMessage}")
logger.debug(ex.getStackTrace.mkString("\n"))
reporter.send(ReportingObject(id, ProcessingFatal, s"$msg: ${ex.getMessage}", getInstitutionAndRecordSet(rec.value)))
records.foreach {
record => {
val reportingObject = recordProcessor.process(record).foldLeft(ReportingObject(record.key()))((reportingObject, outcome) => outcome match {
case ProcessSuccess(id, resource, msg) =>
logger.info(s"$id: $msg")
ReportingObject.addResourceOutcome(reportingObject, (resource, msg), ProcessingSuccess)
case ProcessIgnore(id, resource, msg) =>
logger.info(s"$id: $msg")
ReportingObject.addResourceOutcome(reportingObject, (resource, msg), ProcessingIgnore)
case ProcessWarning(id, resource, msg) =>
logger.warn(s"$id: $msg")
ReportingObject.addResourceOutcome(reportingObject, (resource, msg), ProcessingWarning)
case ProcessFatal(id, resource, msg, ex) =>
logger.error(s"$id: $msg: ${ex.getMessage}")
logger.debug(ex.getStackTrace.mkString("\n"))
ReportingObject.addResourceOutcome(reportingObject, (resource, s"$msg: ${ex.getMessage}"), ProcessingFatal)
})
reporter.send(reportingObject)
}
}
}
} catch {
......
......@@ -19,14 +19,12 @@
package ch.memobase
import java.io.ByteArrayOutputStream
import java.net.URI
import ch.memobase.models._
import org.memobase.fedora.{BinaryResource, FedoraClient, FedoraClientImpl}
import java.io.ByteArrayOutputStream
import java.net.URI
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
import scala.util.Try
/**
......@@ -74,13 +72,6 @@ object FedoraClientWrapper {
new FedoraClientWrapper(fc)
}
private def getMediaFileType(mimeType: String): Try[MimeType] =
Conversions.getMediaFileType(mimeType) match {
case Some(ft) => Success(ft)
case None =>
Failure(new UnmanageableMediaFileType(s"Can't handle files of type $mimeType"))
}
private def copyDataAndCloseResource(binaryResource: BinaryResource): Try[ByteArrayOutputStream] = {
val outputStream: Try[ByteArrayOutputStream] = Try {
val oS = new ByteArrayOutputStream()
......
......@@ -20,23 +20,22 @@
package ch.memobase
import java.io.ByteArrayOutputStream
import java.util.Properties
import ch.memobase.models._
import org.apache.kafka.clients.consumer.ConsumerRecord
import java.io.ByteArrayOutputStream
import java.util.Properties
import scala.util.{Failure, Success, Try}
sealed trait ProcessOutcome
case class ProcessSuccess(id: String, msg: String) extends ProcessOutcome
case class ProcessSuccess(id: String, resource: MemobaseResource, msg: String) extends ProcessOutcome
case class ProcessFatal(id: String, msg: String, ex: Throwable) extends ProcessOutcome
case class ProcessFatal(id: String, resource: MemobaseResource, msg: String, ex: Throwable) extends ProcessOutcome
case class ProcessWarning(id: String, msg: String) extends ProcessOutcome
case class ProcessWarning(id: String, resource: MemobaseResource, msg: String) extends ProcessOutcome
case class ProcessIgnore(id: String, msg: String) extends ProcessOutcome
case class ProcessIgnore(id: String, resource: MemobaseResource, msg: String) extends ProcessOutcome
class RecordProcessor(fileHandler: DisseminationCopyHandler,
......@@ -50,10 +49,10 @@ class RecordProcessor(fileHandler: DisseminationCopyHandler,
case Success(binaryResource) =>
handleBinaryResource(binaryResource, record.key())
case Failure(ex) => List(ex match {
case e: NoLocalBinary => ProcessIgnore(record.key(), e.getMessage)
case e: NoDigitalObject => ProcessIgnore(record.key(), e.getMessage)
case e: UnmanageableMediaFileType => ProcessWarning(record.key(), e.getMessage)
case e: Exception => ProcessFatal(record.key(), e.getMessage, e)
case e: NoLocalBinary => ProcessIgnore(record.key(), e.resource, e.getMessage)
case e: NoDigitalObject => ProcessIgnore(record.key(), DigitalObject, e.getMessage)
case e: UnmanageableMediaFileType => ProcessWarning(record.key(), e.resource, e.getMessage)
case e: Exception => ProcessFatal(record.key(), Record, e.getMessage, e)
})
}
}
......@@ -61,70 +60,71 @@ class RecordProcessor(fileHandler: DisseminationCopyHandler,
private def handleBinaryResource(binaryResource: BinaryResourceMetadata, recordKey: String): List[ProcessOutcome] = {
fedoraClientWrapper.fetchBinaryResource(binaryResource.filePath) match {
case Success(_) if binaryResource.eventType == Delete =>
deleteResource(binaryResource.id, binaryResource.mimeType, binaryResource.instantiationType)
deleteResource(binaryResource.id, binaryResource.mimeType, binaryResource.resource)
case Success(data) =>
createResource(
binaryResource.id,
binaryResource.mimeType,
binaryResource.instantiationType,
binaryResource.resource,
data)
case Failure(ex) => List(ProcessFatal(recordKey, s"Failed to retrieve binary from Fedora on ${binaryResource.filePath}", ex))
case Failure(ex) => List(ProcessFatal(recordKey, binaryResource.resource,
s"Failed to retrieve binary from Fedora on ${binaryResource.filePath}", ex))
}
}
private def createOutcome(res: Try[Boolean], id: String, destFile: String): List[ProcessOutcome] = List(res match {
case Success(true) => ProcessSuccess(id, s"Updating of file $destFile successful")
case Success(false) => ProcessSuccess(id, s"Creation of file $destFile successful")
case Failure(ex) => ProcessFatal(id, s"Creation of file $destFile failed", ex)
private def createOutcome(res: Try[Boolean], id: String, resource: MemobaseResource, destFile: String): List[ProcessOutcome] = List(res match {
case Success(true) => ProcessSuccess(id, resource, s"Updating of file $destFile successful")
case Success(false) => ProcessSuccess(id, resource, s"Creation of file $destFile successful")
case Failure(ex) => ProcessFatal(id, resource, s"Creation of file $destFile failed", ex)
})
private def deleteOutcome(res: Try[Boolean], id: String, destFile: String): List[ProcessOutcome] = List(res match {
case Success(true) => ProcessSuccess(id, s"Deletion of file $destFile successful")
case Success(false) => ProcessSuccess(id, s"No deletion of file $destFile because object does not exist")
case Failure(ex) => ProcessFatal(id, s"Deletion of file $destFile failed", ex)
private def deleteOutcome(res: Try[Boolean], id: String, resource: MemobaseResource, destFile: String): List[ProcessOutcome] = List(res match {
case Success(true) => ProcessSuccess(id, resource, s"Deletion of file $destFile successful")
case Success(false) => ProcessSuccess(id, resource, s"No deletion of file $destFile because object does not exist")
case Failure(ex) => ProcessFatal(id, resource, s"Deletion of file $destFile failed", ex)
})
private def deleteResource(id: String,
mimeType: MimeType,
instantiationType: Instantiation): List[ProcessOutcome] = mimeType match {
resource: MemobaseResource): List[ProcessOutcome] = mimeType match {
case _: AudioFile =>
List(audioFilePath(id), audioSnippetPath(id))
.map(path => (fileHandler.deleteAudioCopy(path), path))
.flatMap(x => deleteOutcome(x._1, id, x._2))
List((audioFilePath(id), DigitalObject), (audioSnippetPath(id), AudioSnippet))
.map(obj => (fileHandler.deleteAudioCopy(obj._1), obj._2, obj._1))
.flatMap(x => deleteOutcome(x._1, id, x._2, x._3))
case mT: VideoFile =>
val destFile = videoFilePath(id, mT)
val res = fileHandler.deleteVideoCopy(destFile)
deleteOutcome(res, id, destFile)
case _: ImageFile if instantiationType == DigitalObject =>
deleteOutcome(res, id, DigitalObject, destFile)
case _: ImageFile if resource == DigitalObject =>
val destFile = imageFilePath(id)
val res = fileHandler.deleteImageCopy(destFile)
deleteOutcome(res, id, destFile)
case _: ImageFile if instantiationType == Thumbnail =>
deleteOutcome(res, id, DigitalObject, destFile)
case _: ImageFile if resource == Thumbnail =>
val destFile = videoPosterPath(id)
val res = fileHandler.deleteImageCopy(destFile)
deleteOutcome(res, id, destFile)
deleteOutcome(res, id, Thumbnail, destFile)
}
private def createResource(id: String,
mimeType: MimeType,
instantiationType: Instantiation,
resource: MemobaseResource,
data: ByteArrayOutputStream): List[ProcessOutcome] = mimeType match {
case mT: AudioFile =>
List((audioFilePath(id), false), (audioSnippetPath(id), true))
.map(path => (fileHandler.createAudioCopy(data, path._1, mT, path._2), path._1))
.flatMap(x => createOutcome(x._1, id, x._2))
case mT: VideoFile =>
val destFile = videoFilePath(id, mT)
val res = fileHandler.createVideoCopy(data, destFile, mT)
createOutcome(res, id, destFile)
case mT: ImageFile if instantiationType == DigitalObject =>
val destFile = imageFilePath(id)
val res = fileHandler.createImageCopy(data, destFile, mT)
createOutcome(res, id, destFile)
case mT: ImageFile if instantiationType == Thumbnail =>
val destFile = videoPosterPath(id)
val res = fileHandler.createImageCopy(data, destFile, mT)
createOutcome(res, id, destFile)
case mT: AudioFile =>
List((audioFilePath(id), false, DigitalObject), (audioSnippetPath(id), true, AudioSnippet))
.map(path => (fileHandler.createAudioCopy(data, path._1, mT, path._2), path._1, path._3))
.flatMap(x => createOutcome(x._1, id, x._3, x._2))
case mT: VideoFile =>
val destFile = videoFilePath(id, mT)
val res = fileHandler.createVideoCopy(data, destFile, mT)
createOutcome(res, id, DigitalObject, destFile)
case mT: ImageFile if resource == DigitalObject =>
val destFile = imageFilePath(id)
val res = fileHandler.createImageCopy(data, destFile, mT)
createOutcome(res, id, DigitalObject, destFile)
case mT: ImageFile if resource == Thumbnail =>
val destFile = videoPosterPath(id)
val res = fileHandler.createImageCopy(data, destFile, mT)
createOutcome(res, id, Thumbnail, destFile)
}
}
......@@ -20,9 +20,8 @@
package ch.memobase
import ch.memobase.models._
import ujson.{Obj, Str, Value}
import ujson.{Str, Value}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.{Success, Try}
......@@ -32,12 +31,6 @@ trait RecordUtils {
ujson.read(msg).obj("@graph").arr
}
protected def getInstitutionAndRecordSet(msg: String): (Option[String], Option[String]) =
getJsonldGraph(msg).collectFirst {
case res: Obj if isRecord(res) =>
(getInstitution(res.obj), getRecordSet(res.obj))
}.getOrElse((None, None))
private def chooseEventType(eventAsString: String): Event = eventAsString match {
case "Create" => Create
case "Update" => Update
......@@ -47,13 +40,13 @@ trait RecordUtils {
protected def isDigitalObject(obj: ujson.Obj): Boolean = {
hasKeyValue(obj, "type") {
Instantiation(_) == DigitalObject
MemobaseResource(_) == DigitalObject
}
}
protected def isPreviewImage(obj: ujson.Obj): Boolean = {
hasKeyValue(obj, "type") {
Instantiation(_) == Thumbnail
MemobaseResource(_) == Thumbnail
}
}
......@@ -83,13 +76,6 @@ trait RecordUtils {
}
}
private def getInstitution(record: mutable.LinkedHashMap[String, Value]): Option[String] =
record.obj.get("heldBy").flatMap(v => Some(v.str.split("/").last))
private def getRecordSet(record: mutable.LinkedHashMap[String, Value]): Option[String] =
record.obj.get("isPartOf").flatMap(v => Some(v.str.split("/").last))
private def isRecord(obj: ujson.Obj): Boolean = {
hasKeyValue(obj, "@type") {
_ == "https://www.ica.org/standards/RiC/ontology#Record"
......
......@@ -39,8 +39,8 @@ class Reporter(props: Properties, reportingTopic: String) {
private val producer = new KafkaProducer[String, String](props)
def send(report: models.ReportingObject): JavaFuture[RecordMetadata] = {
val headers = createHeaders(report.institutionRecordSet._2.getOrElse(""), report.institutionRecordSet._1.getOrElse(""))
val producerRecord = new ProducerRecord[String, String](reportingTopic, 0, report.id, report.toString, headers)
// val headers = createHeaders(report.institutionRecordSet._2.getOrElse(""), report.institutionRecordSet._1.getOrElse(""))
val producerRecord = new ProducerRecord[String, String](reportingTopic, 0, report.id, report.toString)
producer.send(producerRecord)
}
......
......@@ -20,25 +20,25 @@
package ch.memobase.models
import ch.memobase.RecordUtils
import ujson.{Str, Value}
import ujson.Value
import scala.collection.mutable.ArrayBuffer
import scala.util.{Success, Try}
import scala.util.Try
/**
* Essential information on a binary file residing in Fedora
*
* @param id Identifier of the binary file
* @param filePath File path (URL) to resource
* @param mimeType MIME type
* @param instantiationType Type of instantiation
* @param eventType Action to be performed on the copy of the file
* @param id Identifier of the binary file
* @param filePath File path (URL) to resource
* @param mimeType MIME type
* @param resource Type of instantiation
* @param eventType Action to be performed on the copy of the file
*/
case class BinaryResourceMetadata(id: String,
filePath: String,
mimeType: MimeType,
instantiationType: Instantiation,
resource: MemobaseResource,
eventType: Event) {
}
......@@ -64,23 +64,31 @@ object BinaryResourceMetadata extends RecordUtils {
}
.map { v => {
val id = v.obj.getOrElse("@id", Value("<unknown id>")).str
Try (
Try(
v.obj match {
case v if isLocalRecord(v, baseUrl) && isProcessableMimeType(v) =>
getEventType(jsonldGraph) match {
case Some(UnknownEvent(e)) => throw new UnknownEventType(s"Event type `$e` for $id not known")
case Some(eventType) =>
val instantiation = Instantiation(v("type").str)
val instantiation = MemobaseResource(v("type").str)
BinaryResourceMetadata(
v("@id").str.substring(s"$baseUrl/digital/".length - 1),
v("locator").str,
if (instantiation == Thumbnail) {JpegFile} else {Conversions.getMediaFileType(v("hasMimeType").str).get},
if (instantiation == Thumbnail) {
JpegFile
} else {
Conversions.getMediaFileType(v("hasMimeType").str).get
},
instantiation,
eventType)
case None => throw new NoEventType(id)
}
case v if isLocalRecord(v, baseUrl) => throw new UnmanageableMediaFileType(s"Media file type for $id unknown")
case _ => throw new NoLocalBinary(id)
case v if isLocalRecord(v, baseUrl) =>
val resource = MemobaseResource(v("type").str)
throw new UnmanageableMediaFileType(s"Media file type for $id unknown", resource)
case v =>
val resource = MemobaseResource(v("type").str)
throw new NoLocalBinary(id, resource)
}
)
......
......@@ -25,11 +25,11 @@ package ch.memobase.models
* @param msg dedicated error message
*/
//noinspection ScalaFileName
class UnmanageableMediaFileType(msg: String) extends Exception(msg)
class UnmanageableMediaFileType(msg: String, val resource: MemobaseResource) extends Exception(msg)
class NoDigitalObject extends Exception("No digital object found")
class NoLocalBinary(id: String) extends Exception(s"No reference to local binary in $id found")
class NoLocalBinary(id: String, val resource: MemobaseResource) extends Exception(s"No reference to local binary in $id found")
class UnknownEventType(msg: String) extends Exception(msg)
......
......@@ -22,37 +22,50 @@ package ch.memobase.models
/**
* Represents an instantiation type as defined in the Memobase internal data model
*/
sealed trait Instantiation
sealed trait MemobaseResource
/**
* Represents a digital object
*/
case object DigitalObject extends Instantiation
case object DigitalObject extends MemobaseResource
/**
* Represents a physical object
*/
case object PhysicalObject extends Instantiation
case object PhysicalObject extends MemobaseResource
/**
* Represents a thumbnail / preview image / poster
*/
case object Thumbnail extends Instantiation
case object Thumbnail extends MemobaseResource
/**
* Represents a record
*/
case object Record extends MemobaseResource
/**
* Represents a audio snippet. This instantiation doesn't
* exist in the data model, however it is used internally
* by the service to differentiate between several derived
* types.
*/
case object AudioSnippet extends MemobaseResource
/**
* Unknown instantiation
*/
case object UnknownInstantiation extends Instantiation
case object UnknownResource extends MemobaseResource
/**
* Helper functions for Instantiation type
*/
object Instantiation {
def apply(i: String): Instantiation = i match {
object MemobaseResource {
def apply(i: String): MemobaseResource = i match {
case "digitalObject" => DigitalObject
case "thumbnail" => Thumbnail
case "physicalObject" => PhysicalObject
case _ => UnknownInstantiation
case _ => UnknownResource
}
}
\ No newline at end of file
......@@ -44,9 +44,12 @@ case object ProcessingFatal extends ProcessingStatus {
}
case class ReportingObject(id: String, status: ProcessingStatus, message: String, institutionRecordSet: (Option[String], Option[String])) {
case class ReportingObject(id: String,
digitalObject: Option[(ProcessingStatus, String)] = None,
poster: Option[(ProcessingStatus, String)] = None,
audioSnippet: Option[(ProcessingStatus, String)] = None) {
import ReportingObject.createTimestamp
import ReportingObject._
override def toString: String =
ujson.write(
......@@ -54,8 +57,8 @@ case class ReportingObject(id: String, status: ProcessingStatus, message: String
("step", "media-metadata-extractor"),
("timestamp", createTimestamp),
("id", id),
("status", status.value),
("message", message)
("status", mergeStatus(digitalObject, poster, audioSnippet)),
("message", mergeMessages(digitalObject, poster, audioSnippet))
)
)
}
......@@ -64,4 +67,35 @@ object ReportingObject {
private val dateFormatter = new SimpleDateFormat("YYYY-MM-dd'T'HH:mm:ss.SSS")
def createTimestamp: String = dateFormatter.format(Calendar.getInstance().getTime)
}
def mergeStatus(digitalObject: Option[(ProcessingStatus, String)],
poster: Option[(ProcessingStatus, String)],
audioSnippet: Option[(ProcessingStatus, String)]): String = {
val processStatus = List(digitalObject, poster, audioSnippet)
.flatMap(obj => obj.flatMap(o => Some(o._1)))
if (processStatus.contains(ProcessingFatal)) {
ProcessingFatal.value
} else if (processStatus.contains(ProcessingWarning)) {
ProcessingWarning.value
} else if (processStatus.contains(ProcessingSuccess)) {
ProcessingSuccess.value
} else {
ProcessingIgnore.value
}
}
def mergeMessages(digitalObject: Option[(ProcessingStatus, String)],
poster: Option[(ProcessingStatus, String)],
audioSnippet: Option[(ProcessingStatus, String)]): String =
s"""DIGITAL OBJECT: ${digitalObject.flatMap(dO => Some(dO._2)).getOrElse("not available")}
| -- POSTER: ${poster.flatMap(p => Some(p._2)).getOrElse("not available")}
| -- AUDIO SNIPPET:${audioSnippet.flatMap(aS => Some(aS._2)).getOrElse("not available")}""".stripMargin
def addResourceOutcome(report: ReportingObject, resource: (MemobaseResource, String), status: ProcessingStatus): ReportingObject =
resource._1 match {
case DigitalObject => report.copy(digitalObject = Some(status, resource._2))
case Thumbnail => report.copy(poster = Some(status, resource._2))
case AudioSnippet => report.copy(audioSnippet = Some(status, resource._2))
case _ => report
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment