Verified Commit 4c8319cf authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

provide working poc

parent b429c48e
...@@ -21,16 +21,14 @@ cache: ...@@ -21,16 +21,14 @@ cache:
test: test:
stage: test stage: test
# TODO: Change to mbr tags: [mbr]
tags: [mbr4]
script: script:
- sbt test - sbt test
assembly: assembly:
stage: build stage: build
# TODO: Change to mbr tags: [mbr]
tags: [mbr4]
script: script:
- sbt assembly - sbt assembly
artifacts: artifacts:
......
...@@ -25,8 +25,8 @@ lazy val root = (project in file(".")) ...@@ -25,8 +25,8 @@ lazy val root = (project in file("."))
log4jCore, log4jCore,
log4jSlf4j, log4jSlf4j,
log4jScala, log4jScala,
mariadbJdbcDriver,
memobaseServiceUtils, memobaseServiceUtils,
requests,
scalatic, scalatic,
sprayJson, sprayJson,
scalaTest % Test) scalaTest % Test)
......
...@@ -28,8 +28,8 @@ object Dependencies { ...@@ -28,8 +28,8 @@ object Dependencies {
lazy val log4jCore = "org.apache.logging.log4j" % "log4j-core" % log4jV lazy val log4jCore = "org.apache.logging.log4j" % "log4j-core" % log4jV
lazy val log4jScala = "org.apache.logging.log4j" %% "log4j-api-scala" % "11.0" lazy val log4jScala = "org.apache.logging.log4j" %% "log4j-api-scala" % "11.0"
lazy val log4jSlf4j = "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4jV lazy val log4jSlf4j = "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4jV
lazy val mariadbJdbcDriver = "org.mariadb.jdbc" % "mariadb-java-client" % "2.6.0"
lazy val memobaseServiceUtils = "org.memobase" % "memobase-service-utilities" % "1.2.1" lazy val memobaseServiceUtils = "org.memobase" % "memobase-service-utilities" % "1.2.1"
lazy val requests = "com.lihaoyi" %% "requests" % "0.5.1"
lazy val scalatic = "org.scalactic" %% "scalactic" % scalatestV lazy val scalatic = "org.scalactic" %% "scalactic" % scalatestV
lazy val scalaTest = "org.scalatest" %% "scalatest" % scalatestV lazy val scalaTest = "org.scalatest" %% "scalatest" % scalatestV
lazy val sprayJson = "io.spray" %% "spray-json" % "1.3.5" lazy val sprayJson = "io.spray" %% "spray-json" % "1.3.5"
......
id: ${JOB_ID:?system} id: ${JOB_ID:?system}
app: app:
audioSinkDir: ${} audioSinkDir: ${AUDIO_SINK_DIR:?system}
imageSinkDir: ${} imageSinkDir: ${IMAGE_SINK_DIR:?system}
videoSinkDir: ${} videoSinkDir: ${VIDEO_SINK_DIR:?system}
mediaserverDBHost: ${} fedoraHost: ${FEDORA_HOST:?system}
mediaserverDBPort: ${}
mediaserverDBUser: ${}
mediaserverDBPassword: ${}
kafka: kafka:
consumer: consumer:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system} bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
{
"timestamp": "20200602T083300+02:00",
"object_id": 3242123,
"event": "update"
}
\ No newline at end of file
/*
* Extracts media files from Fedora repository
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import java.io.IOException
import scala.util.Try
trait MediaFileType
trait AudioFile extends MediaFileType
trait ImageFile extends MediaFileType
trait VideoFile extends MediaFileType
case class OtherFile(fileType: Seq[String]) extends MediaFileType
case object UnknownFile extends MediaFileType
case object Mp3File extends AudioFile
case object JpegFile extends ImageFile
case object VideoMpeg4File extends VideoFile
case class FileWithMetadata(data: Array[Byte], fileType: MediaFileType)
trait ApiHandler {
def getFile(uri: String): Try[FileWithMetadata]
}
class FedoraHandler extends ApiHandler {
override def getFile(uri: String): Try[FileWithMetadata] = {
Try {
val res = requests.get(uri)
if (res.is2xx || res.is3xx) {
val mediaFileType = res.headers.get("Content-Type") match {
case Some(mT) if mT.contains("audio/mpeg") => Mp3File
case Some(mT) if mT.contains("image/jpg") => JpegFile
case Some(mT) if mT.contains("video/mp4") => VideoMpeg4File
case Some(mT) => OtherFile(mT)
case None => UnknownFile
}
FileWithMetadata(res.data.array, mediaFileType)
} else {
throw new IOException("")
}
}
}
}
...@@ -22,9 +22,7 @@ import java.time.Duration ...@@ -22,9 +22,7 @@ import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.logging.log4j.scala.Logging import org.apache.logging.log4j.scala.Logging
import org.memobase.models.KafkaMessageLoader
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
import org.memobase.utils.DBHandler
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
...@@ -34,11 +32,7 @@ object App extends scala.App with Logging { ...@@ -34,11 +32,7 @@ object App extends scala.App with Logging {
"audioSinkDir", "audioSinkDir",
"imageSinkDir", "imageSinkDir",
"videoSinkDir", "videoSinkDir",
"mediaserverDBHost", "fedoraHost"
"mediaserverDBPort",
"mediaserverDBUser",
"mediaserverDBPassword",
"mediaserverDBTable"
).asJava, ).asJava,
"app.yml", "app.yml",
false, false,
...@@ -46,13 +40,12 @@ object App extends scala.App with Logging { ...@@ -46,13 +40,12 @@ object App extends scala.App with Logging {
true, true,
false) false)
val consumer = new KafkaConsumer[String, String](settings.getKafkaConsumerSettings) val consumer = new KafkaConsumer[String, String](settings.getKafkaConsumerSettings)
val dbHandle = DBHandler( val fileHandler = new FileHandlerImpl(
settings.getAppSettings.getProperty("mediaserverDBHost"), settings.getAppSettings.getProperty("audioSinkDir"),
settings.getAppSettings.getProperty("mediaserverDBPort").asInstanceOf[Int], settings.getAppSettings.getProperty("imageSinkDir"),
settings.getAppSettings.getProperty("mediaserverDBUser"), settings.getAppSettings.getProperty("videoSinkDir"))
settings.getAppSettings.getProperty("mediaserverDBPassword"), val fedoraHandler = new FedoraHandler
settings.getAppSettings.getProperty("mediaserverDBTable") val recordProcessor = new RecordProcessor(fileHandler, fedoraHandler)
)
try { try {
logger.debug(s"Subscribing to topic ${settings.getInputTopic}") logger.debug(s"Subscribing to topic ${settings.getInputTopic}")
...@@ -60,14 +53,21 @@ object App extends scala.App with Logging { ...@@ -60,14 +53,21 @@ object App extends scala.App with Logging {
while (true) { while (true) {
val records = consumer.poll(Duration.ofMillis(100)).asScala val records = consumer.poll(Duration.ofMillis(100)).asScala
for (record <- records) { for (record <- records) {
val msg = KafkaMessageLoader.load(record.value()) recordProcessor.process(record) match {
msg.event match { case ProcessSuccess(id, fT, a) if a == "Delete" =>
case "new" || "update" => logger.debug(s"Deleting of object with type $fT and $id successful")
case ProcessSuccess(id, fT, _) =>
logger.debug(s"Copying of object with type $fT and $id successful")
case ProcessFailure(id, fT, a, ex) if a == "Delete" =>
logger.error(s"Deleting of object with type $fT and $id failed: ${ex.getMessage}")
case ProcessFailure(id, fT, _, ex) =>
logger.error(s"Copying of object with type $fT and $id failed: ${ex.getMessage}")
case ProcessIgnore(id, reason, warn) if warn =>
logger.warn(s"Ignoring object with id $id: $reason")
case ProcessIgnore(id, reason, _) =>
logger.info(s"Ignoring object with id $id: $reason")
case "delete" =>
} }
// TODO: Implement real logic
println(record.key())
} }
} }
} catch { } catch {
...@@ -75,6 +75,5 @@ object App extends scala.App with Logging { ...@@ -75,6 +75,5 @@ object App extends scala.App with Logging {
} finally { } finally {
logger.info("Shutting down application") logger.info("Shutting down application")
consumer.close() consumer.close()
dbHandle.close()
} }
} }
/*
* Extracts media files from Fedora repository
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import java.nio.file.{Files, Path, Paths}
import scala.util.Try
trait FileHandler {
def copyAudio(data: Array[Byte], destId: String): Try[Path]
def copyImage(data: Array[Byte], destId: String): Try[Path]
def copyVideo(data: Array[Byte], destId: String): Try[Path]
def deleteAudio(destId: String): Try[Boolean]
def deleteImage(destId: String): Try[Boolean]
def deleteVideo(destId: String): Try[Boolean]
}
class FileHandlerImpl(audioDestPath: String, imageDestPath: String, videoDestPath: String) extends FileHandler {
private def copy(data: Array[Byte], destFile: Path): Try[Path] = {
Try {
Files.write(destFile, data)
}
}
/**
* Creates destination filename
*
* @param id id (signature) of record to which the media resource belongs to
* @param collectionId id of collection to which the media resource belongs to
* @param institutionId id of institution to which the media resource belongs to
* @return generated filename
*/
private def createDestFilename(id: String, collectionId: String, institutionId: String): String = {
// TODO: Create real filename producing function
s"$institutionId-$collectionId-$id"
}
override def copyAudio(data: Array[Byte], destId: String): Try[Path] = {
Try {
val tempFilePath = Files.createTempFile("media-", null)
val destFile = Paths.get(audioDestPath, destId + ".mp4")
Files.write(tempFilePath, data)
Transformations.mp3ToMp4(tempFilePath.toString, destFile.toString).get
Files.delete(tempFilePath)
destFile
}
}
override def copyImage(data: Array[Byte], destId: String): Try[Path] = {
// TODO: Implement transcoding
val destFile = Paths.get(imageDestPath, destId + ".jp2")
copy(data, destFile)
}
override def copyVideo(data: Array[Byte], destId: String): Try[Path] = {
val destFile = Paths.get(videoDestPath, destId + ".mp4")
copy(data, destFile)
}
override def deleteAudio(destId: String): Try[Boolean] =
Try {
Paths.get(audioDestPath, destId + ".mp4").toFile.delete()
}
override def deleteImage(destId: String): Try[Boolean] =
Try {
Paths.get(imageDestPath, destId + ".jp2").toFile.delete()
}
override def deleteVideo(destId: String): Try[Boolean] =
Try {
Paths.get(videoDestPath, destId + ".mp4").toFile.delete()
}
}
...@@ -16,21 +16,30 @@ ...@@ -16,21 +16,30 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.memobase.models package org.memobase
import spray.json._ import spray.json._
case class KafkaMessage(id: String,
event: String, case class KafkaMessage(objectId: String,
timestamp: String) objectPath: String,
objectTypes: Seq[String],
eventId: String,
eventType: String,
eventTimestamp: String) {
// TODO: Implement real check
def isBinaryObject: Boolean = objectTypes.contains("https://binaryObject")
}
//noinspection TypeAnnotation //noinspection TypeAnnotation
object KafkaMessageProtocol extends DefaultJsonProtocol { object KafkaMessageProtocol extends DefaultJsonProtocol {
implicit val kafkaMessageFormat = jsonFormat3(KafkaMessage) implicit val kafkaMessageFormat = jsonFormat6(KafkaMessage)
} }
object KafkaMessageLoader { object KafkaMessageLoader {
import KafkaMessageProtocol._ import KafkaMessageProtocol._
def load(msg: String): KafkaMessage = { def load(msg: String): KafkaMessage = {
......
/*
* Extracts media files from Fedora repository
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.util.{Failure, Success, Try}
trait ProcessOutcome
case class ProcessSuccess(id: String, fileType: MediaFileType, action: String) extends ProcessOutcome
case class ProcessFailure(id: String, fileType: MediaFileType, action: String, ex: Throwable) extends ProcessOutcome
case class ProcessIgnore(id: String, reason: String, warn: Boolean = false) extends ProcessOutcome
trait ProcessError
case object Process
class RecordProcessor(fh: FileHandler, ah: ApiHandler) {
private def errorHandler(fun: Try[_], objId: String, fileType: MediaFileType, action: String): ProcessOutcome = fun match {
case Success(_) => ProcessSuccess(objId, fileType, action)
case Failure(ex) => ProcessFailure(objId, fileType, action, ex)
}
def process(record: ConsumerRecord[String, String]): ProcessOutcome = {
val msg = KafkaMessageLoader.load(record.value())
if (msg.isBinaryObject) {
ah.getFile(msg.objectPath) match {
case Success(res) =>
res.fileType match {
case fT: AudioFile =>
msg.eventType match {
case action@("Create" | "Update") =>
errorHandler(fh.copyAudio(res.data.array, msg.objectId), msg.objectId, fT, action)
case action@"Delete" =>
errorHandler(fh.deleteAudio(msg.objectId), msg.objectId, fT, action)
}
case fT: ImageFile =>
msg.eventType match {
case action@("Create" | "Update") =>
errorHandler(fh.copyImage(res.data.array, msg.objectId), msg.objectId, fT, action)
case action@"Delete" =>
errorHandler(fh.deleteImage(msg.objectId), msg.objectId, fT, action)
}
case fT: VideoFile =>
msg.eventType match {
case action@("Create" | "Update") =>
errorHandler(fh.copyVideo(res.data.array, msg.objectId), msg.objectId, fT, action)
case action@"Delete" =>
errorHandler(fh.deleteVideo(msg.objectId), msg.objectId, fT, action)
}
case OtherFile(fT) =>
ProcessIgnore(msg.objectId, s"files of type $fT cannot be handled", warn = true)
case UnknownFile =>
ProcessIgnore(msg.objectId, "no content type defined", warn = true)
}
case Failure(ex) => ProcessFailure(msg.objectId, UnknownFile, msg.eventType, ex)
}
// case et => logger.warn(s"event type $et is unknown: ignoring message")
} else {
ProcessIgnore(msg.objectId, "is no binary object")
}
}
}
...@@ -16,29 +16,34 @@ ...@@ -16,29 +16,34 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.memobase.utils package org.memobase
import java.sql.{Connection, DriverManager} import java.io.IOException
import org.apache.logging.log4j.scala.Logging import org.apache.logging.log4j.scala.Logging
class DBHandler(connection: Connection, table: String) { import scala.util.Try
def insert(sig: String, uri: String): Boolean = { object Transformations extends Logging {
val st = connection.createStatement()
val res = st.executeQuery(s"UPDATE $table SET uri=$uri WHERE sig=$sig")
}
def close(): Unit = { import sys.process._
connection.close()
}
} private def wrapCommand(command: String): Try[Int] = Try {
val stderr = StringBuilder.newBuilder
val errorCode = command ! ProcessLogger(logger.debug(_), stderr append _)
if (errorCode > 0) {
throw new IOException(s"application ${command.split(' ')(0)} exited with code $errorCode: ${stderr.toString()}")
} else {
errorCode
}
}
object DBHandler extends Logging { def mp3ToMp4(sourceFile: String, destFile: String): Try[String] = {
def apply(host: String, port: Int, user: String, password: String, table: String): DBHandler = { val externalCommand = s"ffmpeg -i $sourceFile -acodec copy -loglevel warning -hide_banner -y -movflags faststart $destFile"
logger.info(s"Connecting to mediaserver database on $host:$port") Try {
val connection = DriverManager.getConnection(s"jdbc:mariadb://$host:$port/DB?user=$user&password=$password"); wrapCommand(externalCommand).get
new DBHandler(connection, table) destFile
}
} }
} }
...@@ -21,22 +21,15 @@ package org.memobase ...@@ -21,22 +21,15 @@ package org.memobase
import org.apache.kafka.clients.producer.{MockProducer, ProducerRecord} import org.apache.kafka.clients.producer.{MockProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.serialization.StringSerializer
import org.scalatest.funsuite.AnyFunSuite import org.scalatest.funsuite.AnyFunSuite
import sys.process._
class AppTest extends AnyFunSuite { class AppTest extends AnyFunSuite {
test("An empty Set should be empty") {
assert(Set.empty.isEmpty)
}
test("A mock producer should produce mock data") {
val mp = new MockProducer[String, String]() test("a mock producer should produce mock data") {
val mp = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val pr = new ProducerRecord("test", "gugus", "data") val pr = new ProducerRecord("test", "gugus", "data")
val fut = mp.send(pr) mp.send(pr)
assert (mp.commitCount() == 1) assert(mp.history().size() == 1)
} }
test("Blabla") {
val test = Seq("echo", "-n", "hello").!!.trim
assert(test == "hello")
}