Unverified Commit 09ef55b4 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

refactor code and extend reporting

parent 48286f37
Pipeline #14600 passed with stages
in 16 minutes and 22 seconds
......@@ -70,32 +70,16 @@ object App extends scala.App with Logging {
record <- records
processed <- recordProcessor.process(record)
} yield processed match {
case ProcessSuccess(id, fT, a) if a == "Delete" =>
val msg = s"Deleting of object $id with type $fT successful"
case ProcessSuccess(id, msg) =>
logger.debug(msg)
reporter.send(Report(id, ProcessingIsSuccess, msg))
case ProcessSuccess(id, fT, _) =>
val msg = s"Copying of object $id with type $fT successful"
logger.debug(msg)
reporter.send(Report(id, ProcessingIsSuccess, msg))
case ProcessFailure(id, fT, a, ex) if a == "Delete" =>
val msg = s"Deleting of object $id with type $fT failed: ${ex.getMessage}"
logger.error(msg)
logger.info(ex.getStackTrace.mkString("\n"))
case ProcessWarn(id, msg) =>
logger.warn(msg)
reporter.send(Report(id, ProcessingIsFailure, msg))
case ProcessFailure(id, fT, _, ex) =>
val msg = s"Copying of object $id with type $fT failed: ${ex.getMessage}"
case ProcessFailure(id, msg, ex) =>
logger.error(msg)
logger.info(ex.getStackTrace.mkString("\n"))
logger.debug(ex.getStackTrace.mkString("\n"))
reporter.send(Report(id, ProcessingIsFailure, msg))
case ProcessIgnore(id, reason, warn) if warn =>
val msg = s"Ignoring object $id: $reason"
logger.warn(msg)
reporter.send(Report(id, ProcessingIsFailure, msg))
case ProcessIgnore(id, reason, _) =>
val msg = s"Ignoring object $id: $reason"
logger.info(msg)
reporter.send(Report(id, ProcessingIsSuccess, msg))
}
}
} catch {
......
......@@ -51,6 +51,21 @@ class DisseminationCopyHandler(audioDestPath: String, imageDestPath: String, vid
}
}
/**
* Removes existing file
*
* @param destFile Path to file
* @return true if removed, false otherwise
*/
private def removeExistingFile(destFile: Path): Boolean = {
if (destFile.toFile.exists()) {
destFile.toFile.delete()
true
} else {
false
}
}
/**
* Creates dissemination copy of audio file
......@@ -58,17 +73,21 @@ class DisseminationCopyHandler(audioDestPath: String, imageDestPath: String, vid
* @param data binary data as [[java.io.ByteArrayOutputStream]] instance
* @param destId Filename of dissemination copy without extension
* @param sourceFileType File type of input data
* @return
* @param isSnippet Process data as snippet
* @return true if both files were overwritten, false otherwise
*/
def createAudioCopy(data: ByteArrayOutputStream, destId: String, sourceFileType: MimeType): Try[Path] = Try {
def createAudioCopy(data: ByteArrayOutputStream, destId: String, sourceFileType: MimeType, isSnippet: Boolean = false): Try[Boolean] = Try {
val tempFilePath = Files.createTempFile("media-", "." + Conversions.getFileTypeExtension(sourceFileType).get)
val destFile = Paths.get(audioDestPath, destId + ".mp4")
val snippetFile = Paths.get(audioDestPath, destId + "-intro." + Conversions.getFileTypeExtension(sourceFileType).get)
val destFile = Paths.get(audioDestPath, destId + (if (isSnippet) ".mp3" else ".mp4"))
writeData(data, tempFilePath)
MediaTransformations.audioToMp4(tempFilePath.toString, destFile.toString).get
MediaTransformations.createAudioSnippet(tempFilePath.toString, snippetFile.toString, audioSnippetDuration)
val copyRemoved = removeExistingFile(destFile)
if (isSnippet) {
MediaTransformations.createAudioSnippet(tempFilePath.toString, destFile.toString, audioSnippetDuration)
} else {
MediaTransformations.audioToMp4(tempFilePath.toString, destFile.toString).get
}
Files.delete(tempFilePath)
destFile
copyRemoved
}
/**
......@@ -77,15 +96,16 @@ class DisseminationCopyHandler(audioDestPath: String, imageDestPath: String, vid
* @param data binary data as [[java.io.ByteArrayOutputStream]] instance
* @param destId Filename of dissemination copy without extension
* @param sourceFileType File type of input data
* @return
* @return true if copy was overwritten, false otherwise
*/
def createImageCopy(data: ByteArrayOutputStream, destId: String, sourceFileType: MimeType): Try[Path] = Try {
def createImageCopy(data: ByteArrayOutputStream, destId: String, sourceFileType: MimeType): Try[Boolean] = Try {
val tempFilePath = Files.createTempFile("media-", "." + Conversions.getFileTypeExtension(sourceFileType).get)
val destFile = Paths.get(imageDestPath, destId + ".jp2")
writeData(data, tempFilePath)
val copyRemoved = removeExistingFile(destFile)
MediaTransformations.imageToJp2(tempFilePath.toString, destFile.toString).get
Files.delete(tempFilePath)
destFile
copyRemoved
}
/**
......@@ -94,30 +114,32 @@ class DisseminationCopyHandler(audioDestPath: String, imageDestPath: String, vid
* @param data binary data as [[java.io.ByteArrayOutputStream]] instance
* @param destId Filename of dissemination copy without extension
* @param sourceFileType File type of input data
* @return
* @return true if copy was overwritten, false otherwise
*/
def createVideoCopy(data: ByteArrayOutputStream, destId: String, sourceFileType: MimeType): Try[Path] = {
def createVideoCopy(data: ByteArrayOutputStream, destId: String, sourceFileType: MimeType): Try[Boolean] = Try {
val destFile = Paths.get(videoDestPath, s"$destId.${Conversions.getFileTypeExtension(sourceFileType).get}")
val copyRemoved = removeExistingFile(destFile)
writeData(data, destFile)
copyRemoved
}
/**
* Deletes dissemination copy of audio file
*
* @param destId Filename of dissemination copy without extension
* @return
* @return true if copy and snippet were deleted successfully, false otherwise
*/
def deleteAudioCopy(destId: String): Try[Boolean] =
Try {
Paths.get(audioDestPath, destId + ".mp4").toFile.delete()
Paths.get(audioDestPath, destId + "-intro.mp3").toFile.delete()
val res = Paths.get(audioDestPath, destId + ".mp4").toFile.delete()
Paths.get(audioDestPath, destId + "-intro.mp3").toFile.delete() && res
}
/**
* Deletes dissemination copy of image file
*
* @param destId Filename of dissemination copy without extension
* @return
* @return true if copy was deleted successfully, false otherwise
*/
def deleteImageCopy(destId: String): Try[Boolean] =
Try {
......@@ -129,7 +151,7 @@ class DisseminationCopyHandler(audioDestPath: String, imageDestPath: String, vid
*
* @param destId Filename of dissemination copy without extension
* @param fileType File type of media
* @return
* @return true if copy was deleted successfully, false otherwise
*/
def deleteVideoCopy(destId: String, fileType: MimeType): Try[Boolean] =
Try {
......
......@@ -47,14 +47,14 @@ object MediaTransformations extends Logging {
* Repacks audio files in a MP4 container and adds a moov atom at the beginning of the file
*
* @param sourceFilePath Path to the source file
* @param destFilePath Path to the final file
* @param destFile Path to the final file
* @return
*/
def audioToMp4(sourceFilePath: String, destFilePath: String): Try[String] = {
val externalCommand = s"ffmpeg -i $sourceFilePath -acodec copy -loglevel warning -hide_banner -y -movflags faststart $destFilePath"
def audioToMp4(sourceFilePath: String, destFile: String): Try[String] = {
val externalCommand = s"ffmpeg -i $sourceFilePath -acodec copy -loglevel warning -hide_banner -y -movflags faststart $destFile"
Try {
executeCommand(externalCommand).get
destFilePath
destFile
}
}
......@@ -62,19 +62,19 @@ object MediaTransformations extends Logging {
* Creates an audio snippet used as a base for producing sonograms
*
* @param sourceFilePath Path to the source file
* @param destFilePath Path to the final file
* @param destFile Path to the final file
* @param duration Duration of snippet (counts from beginning of track)
* @return
*/
def createAudioSnippet(sourceFilePath: String, destFilePath: String, duration: Int): Try[String] = {
def createAudioSnippet(sourceFilePath: String, destFile: String, duration: Int): Try[String] = {
val minutes = (duration / 60) % 60
val hours = (duration / 60 / 60) % 24
val time = "%02d:%02d:%02d".format(hours, minutes, duration % 60)
val copyStream = sourceFilePath.endsWith(".mp3")
val externalCommand = s"ffmpeg -i $sourceFilePath ${if (copyStream) "-acodec copy"} -loglevel warning -hide_banner -y -t $time $destFilePath"
val externalCommand = s"ffmpeg -i $sourceFilePath ${if (copyStream) "-acodec copy"} -loglevel warning -hide_banner -y -t $time $destFile"
Try {
executeCommand(externalCommand)
destFilePath
destFile
}
}
......@@ -82,14 +82,14 @@ object MediaTransformations extends Logging {
* Converts image file to jpeg2000
*
* @param sourceFilePath Path to the source file
* @param destFilePath Path to the final file
* @param destFile Path to the final file
* @return
*/
def imageToJp2(sourceFilePath: String, destFilePath: String): Try[String] = Try {
def imageToJp2(sourceFilePath: String, destFile: String): Try[String] = Try {
val intermediaryFile = Files.createTempFile("image-", ".tif")
val imagemagickCommand = s"convert -format tif -compress none $sourceFilePath ${intermediaryFile.toString}"
// scalastyle:off
val kduCompressCommand = s"""kdu_compress -i ${intermediaryFile.toString} -o $destFilePath -rate 3 -flush_period 1024 -quiet Creversible=no Clevels=6 Clayers=6 Cprecincts={256,256},{256,256},{128,128} Corder=RPCL Cuse_sop=yes Cuse_eph=yes Cblk={64,64} ORGgen_plt=yes ORGtparts=R Stiles={512,512}"""
val kduCompressCommand = s"""kdu_compress -i ${intermediaryFile.toString} -o $destFile -rate 3 -flush_period 1024 -quiet Creversible=no Clevels=6 Clayers=6 Cprecincts={256,256},{256,256},{128,128} Corder=RPCL Cuse_sop=yes Cuse_eph=yes Cblk={64,64} ORGgen_plt=yes ORGtparts=R Stiles={512,512}"""
// scalastyle:on
try {
executeCommand(imagemagickCommand).get
......@@ -97,7 +97,7 @@ object MediaTransformations extends Logging {
} finally {
Files.delete(intermediaryFile)
}
destFilePath
destFile
}
}
......@@ -20,75 +20,93 @@
package ch.memobase
import java.io.{ByteArrayOutputStream, IOException}
import java.io.ByteArrayOutputStream
import ch.memobase.models._
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Success}
trait ProcessOutcome
case class ProcessSuccess(id: String, fileType: MimeType, action: String) extends ProcessOutcome
case class ProcessSuccess(id: String, msg: String) extends ProcessOutcome
case class ProcessFailure(id: String, fileType: MimeType, action: String, ex: Throwable) extends ProcessOutcome
case class ProcessFailure(id: String, msg: String, ex: Throwable) extends ProcessOutcome
case class ProcessIgnore(id: String, reason: String, warn: Boolean = false) extends ProcessOutcome
case class ProcessWarn(id: String, msg: String) extends ProcessOutcome
class RecordProcessor(fileHandler: DisseminationCopyHandler, fedoraClientWrapper: FedoraClientWrapper, externalBaseUrl: String) {
private def errorHandler(fun: Try[_], objId: String, fileType: MimeType, action: String): ProcessOutcome = fun match {
case Success(_) => ProcessSuccess(objId, fileType, action)
case Failure(ex) => ProcessFailure(objId, fileType, action, ex)
}
def process(record: ConsumerRecord[String, String]): List[ProcessOutcome] = {
BinaryResourceMetadata.build(record.value(), externalBaseUrl) map {
case Success(rm) => fedoraClientWrapper.fetchBinaryResource(rm.filePath) match {
case Success(data) =>
createProcessResult(rm.id, rm.eventType, rm.mimeType, rm.instantiationType, data)
case Failure(ex) => ProcessFailure(record.key(), rm.mimeType, "", ex)
}
case Failure(ex) => ex match {
case e: NoLocalBinary => ProcessIgnore(record.key(), e.getMessage)
case e: NoDigitalObject => ProcessIgnore(record.key(), e.getMessage)
case e: UnmanageableMediaFileType => ProcessIgnore(record.key(), e.getMessage)
case e: Exception => ProcessFailure(record.key(), UnknownFileType, "", e)
}
BinaryResourceMetadata.build(record.value(), externalBaseUrl) flatMap {
case Success(binaryResource) =>
handleBinaryResource(binaryResource, record.key())
case Failure(ex) => List(ex match {
case e: NoLocalBinary => ProcessWarn(record.key(), e.getMessage)
case e: NoDigitalObject => ProcessWarn(record.key(), e.getMessage)
case e: UnmanageableMediaFileType => ProcessWarn(record.key(), e.getMessage)
case e: Exception => ProcessFailure(record.key(), e.getMessage, e)
})
}
}
//noinspection ScalaStyle
private def createProcessResult(id: String,
fedoraEvent: Event,
mimeType: MimeType,
instantiationType: Instantiation,
data: ByteArrayOutputStream) =
(id, fedoraEvent, mimeType, instantiationType, data) match {
case (id, Create | Update, af: AudioFile, _, data) =>
errorHandler(fileHandler.createAudioCopy(data, id, af), id, af, "Create/Update")
case (id, Delete, af: AudioFile, _, _) =>
errorHandler(fileHandler.deleteAudioCopy(id), id, af, "Delete")
case (id, Create | Update, vf: VideoFile, _, data) =>
errorHandler(fileHandler.createVideoCopy(data, id, vf), id, vf, "Create/Update")
case (id, Delete, vf: VideoFile, _, _) =>
errorHandler(fileHandler.deleteVideoCopy(id, vf), id, vf, "Delete")
case (id, Create | Update, i: ImageFile, DigitalObject, data) =>
errorHandler(fileHandler.createImageCopy(data, id, i), id, i, "Create/Update")
case (id, Create | Update, i: ImageFile, Thumbnail, data) =>
errorHandler(fileHandler.createImageCopy(data, id + "-poster", i), id, i, "Create/Update thumbnail")
case (id, Delete, i: ImageFile, DigitalObject, _) =>
errorHandler(fileHandler.deleteImageCopy(id), id, i, "Delete")
case (id, Delete, i: ImageFile, Thumbnail, _) =>
fileHandler.deleteImageCopy(id + "-poster") match {
case Success(_) => ProcessSuccess(id, i, "Delete thumbnail")
case Failure(ex) => ex match {
case _: IOException => ProcessIgnore(id, "Thumbnail does not exist")
case e => ProcessFailure(id, i, "Delete thumbnail", e)
}
}
case (id, event, ft, _, _) =>
ProcessFailure(id, ft, event.toString, new Exception)
private def handleBinaryResource(binaryResource: BinaryResourceMetadata, recordKey: String): List[ProcessOutcome] = {
fedoraClientWrapper.fetchBinaryResource(binaryResource.filePath) match {
case Success(_) if binaryResource.eventType == Delete =>
deleteResource(binaryResource.id, binaryResource.mimeType, binaryResource.instantiationType)
case Success(data) =>
createResource(
binaryResource.id,
binaryResource.mimeType,
binaryResource.instantiationType,
data)
case Failure(ex) => List(ProcessFailure(recordKey, "Failed to retrieve binary from Fedora", ex))
}
}
private def deleteResource(id: String,
mimeType: MimeType,
instantiationType: Instantiation): List[ProcessOutcome] = {
(mimeType match {
case _: AudioFile =>
(fileHandler.deleteAudioCopy(id), id) ::
List((fileHandler.deleteImageCopy(id + "-intro"), id))
case mT: VideoFile =>
List((fileHandler.deleteVideoCopy(id, mT), id))
case _: ImageFile if instantiationType == DigitalObject =>
List((fileHandler.deleteImageCopy(id), id))
case _: ImageFile if instantiationType == Thumbnail =>
val posterId = id + "-poster"
List((fileHandler.deleteImageCopy(posterId), posterId))
}) flatMap(res => List(res._1 match {
case Success(true) => ProcessSuccess(res._2, s"Deletion of object $id with type $mimeType successful")
case Success(false) => ProcessSuccess(res._2, s"No deletion of object $id with type $mimeType because object does not exist")
case Failure(ex) => ProcessFailure(res._2, "Deletion of object $id with type $mimeType failed", ex)
}))
}
private def createResource(id: String,
mimeType: MimeType,
instantiationType: Instantiation,
data: ByteArrayOutputStream): List[ProcessOutcome] = {
(mimeType match {
case mT: AudioFile =>
(fileHandler.createAudioCopy(data, id, mT), id) ::
List((fileHandler.createAudioCopy(data, id + "-intro", mT), id))
case mT: VideoFile =>
List((fileHandler.createVideoCopy(data, id, mT), id))
case mT: ImageFile if instantiationType == DigitalObject =>
List((fileHandler.createImageCopy(data, id, mT), id))
case mT: ImageFile if instantiationType == Thumbnail =>
val posterId = id + "-poster"
List((fileHandler.createImageCopy(data, posterId, mT), posterId))
}) flatMap(res => List(res._1 match {
case Success(true) => ProcessSuccess(res._2, s"Updating of object $id with type $mimeType successful")
case Success(false) => ProcessSuccess(res._2, s"Creation of object $id with type $mimeType successful")
case Failure(ex) => ProcessFailure(res._2, "Creation of object $id with type $mimeType failed", ex)
}))
}
}
......@@ -50,7 +50,7 @@ class DisseminationCopyHandlerTest extends AnyFunSuite with BeforeAndAfter {
destFileName: String,
fileType: MimeType,
copyFun: (ByteArrayOutputStream, String, MimeType)
=> Try[Path]): Assertion = {
=> Try[Boolean]): Assertion = {
val file = Paths.get(pathToTmpDir, sourceFileName).toFile
val data = new ByteArrayOutputStream(file.length().toInt)
val buffer = new Array[Byte](1024)
......@@ -65,6 +65,28 @@ class DisseminationCopyHandlerTest extends AnyFunSuite with BeforeAndAfter {
assert(Paths.get(pathToTmpDir, destFileName).toFile.exists())
}
private def testAudioCopy(pathToTmpDir: String,
sourceFileName: String,
destFileName: String,
fileType: MimeType,
isSnippet: Boolean,
copyFun: (ByteArrayOutputStream, String, MimeType, Boolean)
=> Try[Boolean]): Assertion = {
val file = Paths.get(pathToTmpDir, sourceFileName).toFile
val data = new ByteArrayOutputStream(file.length().toInt)
val buffer = new Array[Byte](1024)
val in = new FileInputStream(file)
var len = in.read(buffer)
while (len != -1) {
data.write(buffer, 0, len)
len = in.read(buffer)
}
Paths.get(pathToTmpDir, destFileName).toFile.deleteOnExit()
copyFun(data, destFileName.split('.')(0), fileType, isSnippet)
assert(Paths.get(pathToTmpDir, destFileName).toFile.exists())
}
test("calling the deleteAudio function should delete temporary audio file") {
val f = fixture
val testFile = Files.createFile(Paths.get(f.resPath, "test.mp4"))
......@@ -74,14 +96,25 @@ class DisseminationCopyHandlerTest extends AnyFunSuite with BeforeAndAfter {
assert(!testSnippetFile.toFile.exists())
}
/**
* ATTENTION: Requires that ffmpeg is properly installed!
*/
test("calling the copyAudio function should create temporary snippet") {
runWithFFmpeg {
val f = fixture
testAudioCopy(f.resPath, "sample.mp3", "test-intro.mp3", Mp3File, isSnippet = true, f.fileHandler.createAudioCopy)
deleteFiles("src/test/resources/test-intro.mp3")
}
}
/**
* ATTENTION: Requires that ffmpeg is properly installed!
*/
test("calling the copyAudio function should create temporary file") {
runWithFFmpeg {
val f = fixture
testCopy(f.resPath, "sample.mp3", "test.mp4", Mp3File, f.fileHandler.createAudioCopy)
deleteFiles("src/test/resources/test.mp4", "src/test/resources/test-intro.mp3")
testAudioCopy(f.resPath, "sample.mp3", "test.mp4", Mp3File, isSnippet = false, f.fileHandler.createAudioCopy)
deleteFiles("src/test/resources/test.mp4")
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment