Unverified Commit 30b8ba5f authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

big refactoring...



Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent 849191c9
Pipeline #11771 failed with stages
in 3 minutes and 12 seconds
# Fedora Media File Extractor
# Media Converter
The Fedora Media File Extractor is an intermediary service which takes care of media file postprocessing
The __Media Converter__ is an intermediary service which takes care of media file postprocessing
upon receiving notifications on changes in the Fedora repository. More precisely, the service
1. extracts the path to the media file from the Fedora API based on the received ID
2. transcodes this file if required (e.g. jpg -> jp2),
......
......@@ -12,7 +12,7 @@ ThisBuild / git.gitTagToVersionNumber := { tag: String =>
lazy val root = (project in file("."))
.enablePlugins(GitVersioning)
.settings(
name := "fedora-media-extractor",
name := "Media Converter",
assemblyJarName in assembly := "app.jar",
test in assembly := {},
assemblyMergeStrategy in assembly := {
......@@ -28,15 +28,15 @@ lazy val root = (project in file("."))
"Memobase Utils" at "https://dl.bintray.com/memoriav/memobase"
),
libraryDependencies ++= Seq(
fedoraClient exclude("org.slf4j", "slf4j-simple"),
kafkaClients,
log4jApi,
log4jCore,
log4jSlf4j,
log4jScala,
memobaseServiceUtils,
requests,
scalatic,
sprayJson,
uPickle,
scalaTest % Test)
)
apiVersion: apps/v1
kind: Deployment
metadata:
name: fedora-media-extractor-deployment
name: media-converter-deployment
namespace: memobase
labels:
app: fedora-media-extractor-app
app: media-converter-app
spec:
selector:
matchLabels:
app: fedora-media-extractor-app
app: media-converter-app
replicas: 1
template:
metadata:
labels:
app: fedora-media-extractor-app
app: media-converter-app
tier: web
spec:
containers:
- name: fedora-media-extractor-container
image: cr.gitlab.switch.ch/memoriav/memobase-2020/services/postprocessing/fedora-media-extractor:latest
imagePullPolicy: Always
volumeMounts:
- name: av-volume
mountPath: /data
env:
- name: JOB_ID
value: fedora-media-extractor
- name: AUDIO_SINK_DIR
value: /data
- name: IMAGE_SINK_DIR
value: /data
- name: VIDEO_SINK_DIR
value: /data
- name: FEDORA_HOST
value: mb-fed1.memobase.unibas.ch:8080/fcrepo
- name: KAFKA_BOOTSTRAP_SERVERS
value: mb-ka1.memobase.unibas.ch:9092,mb-ka2.memobase.unibas.ch:9092,mb-ka3.memobase.unibas.ch:9092
- name: APPLICATION_ID
value: fedora-media-extractor
- name: TOPIC_IN
value: metadata-all
- name: media-converter-container
image: cr.gitlab.switch.ch/memoriav/memobase-2020/services/postprocessing/media-converter:latest
imagePullPolicy: Always
volumeMounts:
- name: av-volume
mountPath: /data
env:
- name: JOB_ID
value: media-converter
- name: AUDIO_SINK_DIR
value: /data
- name: IMAGE_SINK_DIR
value: /data
- name: VIDEO_SINK_DIR
value: /data
- name: INTERNAL_BASE_URL
value: "http://mb-fd1.memobase.unibas.ch:8080/fcrepo/rest/"
- name: EXTERNAL_BASE_URL
value: "https://memobase.ch/"
- name: FEDORA_USER
valueFrom:
secretKeyRef:
name: fedora-admin-credentials
key: FEDORA_USER
- name: FEDORA_PASSWORD
valueFrom:
secretKeyRef:
name: fedora-admin-credentials
key: FEDORA_PASSWORD
- name: KAFKA_BOOTSTRAP_SERVERS
value: mb-ka1.memobase.unibas.ch:9092,mb-ka2.memobase.unibas.ch:9092,mb-ka3.memobase.unibas.ch:9092
- name: APPLICATION_ID
value: media-converter
- name: TOPIC_IN
value: metadata-all
restartPolicy: Always
volumes:
- name: av-volume
......
......@@ -23,14 +23,15 @@ object Dependencies {
lazy val log4jV = "2.11.2"
lazy val scalatestV = "3.1.2"
lazy val fedoraClient = "org.memobase" % "fedora-client" % "0.3.0"
lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaV
lazy val log4jApi = "org.apache.logging.log4j" % "log4j-api" % log4jV
lazy val log4jCore = "org.apache.logging.log4j" % "log4j-core" % log4jV
lazy val log4jScala = "org.apache.logging.log4j" %% "log4j-api-scala" % "11.0"
lazy val log4jSlf4j = "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4jV
lazy val memobaseServiceUtils = "org.memobase" % "memobase-service-utilities" % "1.4.1"
lazy val requests = "com.lihaoyi" %% "requests" % "0.5.1"
lazy val scalatic = "org.scalactic" %% "scalactic" % scalatestV
lazy val scalaTest = "org.scalatest" %% "scalatest" % scalatestV
lazy val sprayJson = "io.spray" %% "spray-json" % "1.3.5"
lazy val sprayJson = "io.spray" %% "spray-json" % "1.3.5"
lazy val uPickle = "com.lihaoyi" %% "upickle" % "0.9.5"
}
......@@ -2,7 +2,10 @@ app:
audioSinkDir: ${AUDIO_SINK_DIR:?system}
imageSinkDir: ${IMAGE_SINK_DIR:?system}
videoSinkDir: ${VIDEO_SINK_DIR:?system}
fedoraHost: ${FEDORA_HOST:?system}
internalBaseUrl: ${INTERNAL_BASE_URL:?system}
externalBaseUrl: ${EXTERNAL_BASE_URL:?system}
fedoraUser: ${FEDORA_USER:?system}
fedoraPassword: ${FEDORA_PASSWORD:?system}
kafka:
consumer:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
......@@ -16,7 +16,7 @@
~ along with this program. If not, see <https://www.gnu.org/licenses/>.
-->
<Configuration status="info" name="skeleton-app" packages="">
<Configuration status="info" name="media-converter" packages="">
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="[%-5level] [%c{1}] %m%n"/>
......
......@@ -32,7 +32,10 @@ object App extends scala.App with Logging {
"audioSinkDir",
"imageSinkDir",
"videoSinkDir",
"fedoraHost"
"internalBaseUrl",
"externalBaseUrl",
"fedoraUser",
"fedoraPassword"
).asJava,
"app.yml",
false,
......@@ -40,12 +43,17 @@ object App extends scala.App with Logging {
true,
false)
val consumer = new KafkaConsumer[String, String](settings.getKafkaConsumerSettings)
val fileHandler = new FileHandlerImpl(
val fileHandler = new DisseminationCopyHandler(
settings.getAppSettings.getProperty("audioSinkDir"),
settings.getAppSettings.getProperty("imageSinkDir"),
settings.getAppSettings.getProperty("videoSinkDir"))
val fedoraHandler = new FedoraHandler
val recordProcessor = new RecordProcessor(fileHandler, fedoraHandler)
val fCWrapper = FedoraClientWrapper(
settings.getAppSettings.getProperty("interalBaseUrl"),
settings.getAppSettings.getProperty("externalBaseUrl"),
settings.getAppSettings.getProperty("fedoraUser"),
settings.getAppSettings.getProperty("fedoraPassword")
)
val recordProcessor = new RecordProcessor(fileHandler, fCWrapper, settings.getAppSettings.getProperty("externalBaseUrl"))
try {
logger.debug(s"Subscribing to topic ${settings.getInputTopic}")
......@@ -53,19 +61,20 @@ object App extends scala.App with Logging {
while (true) {
val records = consumer.poll(Duration.ofMillis(100)).asScala
for (record <- records) {
// TODO: Filter for objectTypes: Only records, not institutions, recordSets
recordProcessor.process(record) match {
case ProcessSuccess(id, fT, a) if a == "Delete" =>
logger.debug(s"Deleting of object with type $fT and $id successful")
case ProcessSuccess(id, fT, _) =>
logger.debug(s"Copying of object with type $fT and $id successful")
logger.debug(s"Deleting of object $id with type $fT successful")
case ProcessSuccess(id, fT, _) =>
logger.debug(s"Copying of object $id with type $fT successful")
case ProcessFailure(id, fT, a, ex) if a == "Delete" =>
logger.error(s"Deleting of object with type $fT and $id failed: ${ex.getMessage}")
logger.error(s"Deleting of object $id with type $fT failed: ${ex.getMessage}")
case ProcessFailure(id, fT, _, ex) =>
logger.error(s"Copying of object with type $fT and $id failed: ${ex.getMessage}")
logger.error(s"Copying of object $id with type $fT failed: ${ex.getMessage}")
case ProcessIgnore(id, reason, warn) if warn =>
logger.warn(s"Ignoring object with id $id: $reason")
logger.warn(s"Ignoring object $id: $reason")
case ProcessIgnore(id, reason, _) =>
logger.info(s"Ignoring object with id $id: $reason")
logger.info(s"Ignoring object $id: $reason")
}
}
......
......@@ -18,29 +18,33 @@
package ch.memobase
import java.io.{ByteArrayOutputStream, FileOutputStream, IOException}
import java.nio.file.{Files, Path, Paths}
import scala.util.Try
import scala.util.{Failure, Success, Try}
trait FileHandler {
def copyAudio(data: Array[Byte], destId: String): Try[Path]
def copyImage(data: Array[Byte], destId: String): Try[Path]
def copyVideo(data: Array[Byte], destId: String): Try[Path]
def deleteAudio(destId: String): Try[Boolean]
def deleteImage(destId: String): Try[Boolean]
def deleteVideo(destId: String): Try[Boolean]
}
class FileHandlerImpl(audioDestPath: String, imageDestPath: String, videoDestPath: String) extends FileHandler {
private def copy(data: Array[Byte], destFile: Path): Try[Path] = {
Try {
Files.write(destFile, data)
/**
* Manages dissemination copies of media files
*
* @param audioDestPath Path to folder containing audio dissemination copies
* @param imageDestPath Path to folder containing image dissemination copies
* @param videoDestPath Path to folder containing video dissemination copies
*/
class DisseminationCopyHandler(audioDestPath: String, imageDestPath: String, videoDestPath: String) {
private def writeData(data: ByteArrayOutputStream, destFile: Path): Try[Path] = {
Try(new FileOutputStream(destFile.toFile)) match {
case Success(fos) =>
try {
data.writeTo(fos)
Success(destFile)
} catch {
case e: IOException => Failure(e)
} finally {
fos.close()
data.close()
}
case Failure(ex) => Failure(ex)
}
}
......@@ -58,39 +62,78 @@ class FileHandlerImpl(audioDestPath: String, imageDestPath: String, videoDestPat
}
override def copyAudio(data: Array[Byte], destId: String): Try[Path] = {
/**
* Creates dissemination copy of audio file
*
* @param data binary data as [[java.io.ByteArrayOutputStream]] instance
* @param destId Filename of dissemination copy without extension
* @return
*/
def createAudioCopy(data: ByteArrayOutputStream, destId: String): Try[Path] = {
Try {
val tempFilePath = Files.createTempFile("media-", null)
val destFile = Paths.get(audioDestPath, destId + ".mp4")
Files.write(tempFilePath, data)
writeData(data, tempFilePath)
Transformations.mp3ToMp4(tempFilePath.toString, destFile.toString).get
Files.delete(tempFilePath)
destFile
}
}
override def copyImage(data: Array[Byte], destId: String): Try[Path] = {
/**
* Creates dissemination copy of image file
*
* @param data binary data as [[java.io.ByteArrayOutputStream]] instance
* @param destId Filename of dissemination copy without extension
* @return
*/
def createImageCopy(data: ByteArrayOutputStream, destId: String): Try[Path] = {
// TODO: Implement transcoding
val destFile = Paths.get(imageDestPath, destId + ".jp2")
copy(data, destFile)
writeData(data, destFile)
}
override def copyVideo(data: Array[Byte], destId: String): Try[Path] = {
/**
* Create dissemination copy of video file
*
* @param data binary data as [[java.io.ByteArrayOutputStream]] instance
* @param destId Filename of dissemination copy without extension
* @return
*/
def createVideoCopy(data: ByteArrayOutputStream, destId: String): Try[Path] = {
val destFile = Paths.get(videoDestPath, destId + ".mp4")
copy(data, destFile)
writeData(data, destFile)
}
override def deleteAudio(destId: String): Try[Boolean] =
/**
* Deletes dissemination copy of audio file
*
* @param destId Filename of dissemination copy without extension
* @return
*/
def deleteAudioCopy(destId: String): Try[Boolean] =
Try {
Paths.get(audioDestPath, destId + ".mp4").toFile.delete()
}
override def deleteImage(destId: String): Try[Boolean] =
/**
* Deletes dissemination copy of image file
*
* @param destId Filename of dissemination copy without extension
* @return
*/
def deleteImageCopy(destId: String): Try[Boolean] =
Try {
Paths.get(imageDestPath, destId + ".jp2").toFile.delete()
}
override def deleteVideo(destId: String): Try[Boolean] =
/**
* Deletes dissemination copy of video file
*
* @param destId Filename of dissemination copy without extension
* @return
*/
def deleteVideoCopy(destId: String): Try[Boolean] =
Try {
Paths.get(videoDestPath, destId + ".mp4").toFile.delete()
}
......
/*
* Extracts media files from Fedora repository
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package ch.memobase
import java.io.ByteArrayOutputStream
import java.net.URI
import ch.memobase.models._
import org.memobase.fedora.{BinaryResource, FedoraClient, FedoraClientImpl}
import scala.util.{Failure, Success, Try}
/**
* Acts as a thin wrapper around a [[org.memobase.fedora.FedoraClient]] instance
*
* @param fc Wrapped [[org.memobase.fedora.FedoraClient]] instance
*/
class FedoraClientWrapper(fc: FedoraClient) {
import FedoraClientWrapper.{copyDataAndCloseResource, getMediaFileType}
/**
* Fetches a binary resource from Fedora and returns, if successful, a [[FileWithMetadata]] instance
*
* @param url The URL to the resource
* @return
*/
def fetchBinaryResource(url: String): Try[FileWithMetadata] = {
for {
u <- Try(new URI(url))
resource <- Try(fc.fetchBinaryResource(u))
mediaFileType <- getMediaFileType(resource.getMimetype)
outputStream <- copyDataAndCloseResource(resource)
} yield models.FileWithMetadata(outputStream, mediaFileType)
}
}
object FedoraClientWrapper {
/**
* Creates a [[FedoraClientWrapper]] instance
*
* @param interalBaseUrl Base URL used internally by Fedora
* @param externalBaseUrl Base URL used outside of Fedora
* @param fedoraUsername Username used to query Fedora
* @param fedoraPassword Password used to query Fedora
* @return
*/
def apply(interalBaseUrl: String,
externalBaseUrl: String,
fedoraUsername: String,
fedoraPassword: String): FedoraClientWrapper = {
val builder = FedoraClientImpl.builder()
builder.urls(interalBaseUrl, externalBaseUrl)
builder.credentials(fedoraUsername, fedoraPassword)
val fc = builder.build()
new FedoraClientWrapper(fc)
}
private def getMediaFileType(mimeType: String): Try[MediaFileType] = mimeType match {
case "audio/mpeg" => Success(Mp3File)
// case "audio/ogg" => OggFile
case "image/jpg" => Success(JpegFile)
case "video/mp4" => Success(VideoMpeg4File)
// TODO: Other filetypes...
case mT => Failure(new UnmanageableMediaFileType(s"Can't handle files of type $mT"))
}
private def copyDataAndCloseResource(binaryResource: BinaryResource): Try[ByteArrayOutputStream] = {
val outputStream: Try[ByteArrayOutputStream] = Try {
val oS = new ByteArrayOutputStream()
val data = new Array[Byte](2097152)
Iterator.continually(binaryResource.getData.read(data))
.takeWhile(-1 !=)
.foreach(oS.write)
oS
}
binaryResource.close()
outputStream
}
}
......@@ -19,6 +19,7 @@
package ch.memobase
import ch.memobase.models._
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.util.{Failure, Success, Try}
......@@ -31,12 +32,8 @@ case class ProcessFailure(id: String, fileType: MediaFileType, action: String, e
case class ProcessIgnore(id: String, reason: String, warn: Boolean = false) extends ProcessOutcome
trait ProcessError
case object Process
class RecordProcessor(fh: FileHandler, ah: ApiHandler) {
class RecordProcessor(fileHandler: DisseminationCopyHandler, fedoraClientWrapper: FedoraClientWrapper, externalBaseUrl: String) {
private def errorHandler(fun: Try[_], objId: String, fileType: MediaFileType, action: String): ProcessOutcome = fun match {
case Success(_) => ProcessSuccess(objId, fileType, action)
......@@ -44,42 +41,26 @@ class RecordProcessor(fh: FileHandler, ah: ApiHandler) {
}
def process(record: ConsumerRecord[String, String]): ProcessOutcome = {
val msg = KafkaMessageLoader.load(record.value())
if (msg.isBinaryObject) {
ah.getFile(msg.objectPath) match {
case Success(res) =>
res.fileType match {
case fT: AudioFile =>
msg.eventType match {
case action@("Create" | "Update") =>
errorHandler(fh.copyAudio(res.data.array, msg.objectId), msg.objectId, fT, action)
case action@"Delete" =>
errorHandler(fh.deleteAudio(msg.objectId), msg.objectId, fT, action)
}
case fT: ImageFile =>
msg.eventType match {
case action@("Create" | "Update") =>
errorHandler(fh.copyImage(res.data.array, msg.objectId), msg.objectId, fT, action)
case action@"Delete" =>
errorHandler(fh.deleteImage(msg.objectId), msg.objectId, fT, action)
}
case fT: VideoFile =>
msg.eventType match {
case action@("Create" | "Update") =>
errorHandler(fh.copyVideo(res.data.array, msg.objectId), msg.objectId, fT, action)
case action@"Delete" =>
errorHandler(fh.deleteVideo(msg.objectId), msg.objectId, fT, action)
}
case OtherFile(fT) =>
ProcessIgnore(msg.objectId, s"files of type $fT cannot be handled", warn = true)
case UnknownFile =>
ProcessIgnore(msg.objectId, "no content type defined", warn = true)
}
case Failure(ex) => ProcessFailure(msg.objectId, UnknownFile, msg.eventType, ex)
}
// case et => logger.warn(s"event type $et is unknown: ignoring message")
} else {
ProcessIgnore(msg.objectId, "is no binary object")
(for {
kafkaMsg <- BinaryResourceMetadata.build(record.value(), externalBaseUrl)
fileWithMetadata <- fedoraClientWrapper.fetchBinaryResource(kafkaMsg.filePath)
} yield (kafkaMsg.id, kafkaMsg.eventType, fileWithMetadata.fileType, fileWithMetadata.data) match {
case (id, Create | Update, af: AudioFileType, data) =>
errorHandler(fileHandler.createAudioCopy(data, id), id, af, "Create/Update")
case (id, Delete, af: AudioFileType, _) =>
errorHandler(fileHandler.deleteAudioCopy(id), id, af, "Delete")
case (id, Create | Update, vf: VideoFileType, data) =>
errorHandler(fileHandler.createImageCopy(data, id), id, vf, "Create/Update")
case (id, Delete, vf: VideoFileType, _) =>
errorHandler(fileHandler.deleteImageCopy(id), id, vf, "Delete")
case (id, Create | Update, i: ImageFileType, data) =>
errorHandler(fileHandler.createVideoCopy(data, id), id, i, "Create/Update")
case (id, Delete, i: ImageFileType, _) =>
errorHandler(fileHandler.deleteVideoCopy(id), id, i, "Delete")
case (id, event, ft, _) =>
ProcessFailure(id, ft, event.toString, new Exception)
}).recover {
case e: Exception => ProcessFailure(record.key(), UnknownFileType, "", e)
}
}
}.get
}
......@@ -24,6 +24,9 @@ import org.apache.logging.log4j.scala.Logging
import scala.util.Try
/**
* Contains functions used to transform specific media files
*/
object Transformations extends Logging {
import sys.process._
......@@ -38,11 +41,18 @@ object Transformations extends Logging {
}
}
def mp3ToMp4(sourceFile: String, destFile: String): Try[String] = {
val externalCommand = s"ffmpeg -i $sourceFile -acodec copy -loglevel warning -hide_banner -y -movflags faststart $destFile"
/**
* Repacks MP3 files in a MP4 container and adds a moov atom at the beginning of the file
*
* @param sourceFilePath Path to the temporary file
* @param destFilePath Path to the final file
* @return
*/
def mp3ToMp4(sourceFilePath: String, destFilePath: String): Try[String] = {
val externalCommand = s"ffmpeg -i $sourceFilePath -acodec copy -loglevel warning -hide_banner -y -movflags faststart $destFilePath"
Try {
wrapCommand(externalCommand).get
destFile
destFilePath
}
}
......