Unverified Commit 5c4e0a7b authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

first commit

parents
Pipeline #16867 passed with stages
in 10 minutes and 7 seconds
.bloop/
.idea/
.metals/
target/
stages:
- test
- build
- publish
variables:
DOCKER_TLS_CERTDIR: ""
include:
- project: 'memoriav/memobase-2020/utilities/ci-templates'
file: 'helm-chart/helm-chart.yml'
- project: 'memoriav/memobase-2020/utilities/ci-templates'
file: 'sbt-build/sbt-build.yml'
- project: 'memoriav/memobase-2020/utilities/ci-templates'
file: 'docker-image/docker-image.yml'
\ No newline at end of file
version = "2.6.4"
FROM openjdk:8-jre-alpine
RUN apk add --no-cache ffmpeg
ADD target/scala-2.12/app.jar /app/app.jar
CMD java -jar /app/app.jar
This diff is collapsed.
# Media Metadata Preprocessor
The Media Metadata Preprocessor has two tasks.
First, it acts as a filter for
separating records which can't be enriched from those which can. This helps to
accelerate the media metadata enrichment by not "congesting" the enrichment
pipeline with unsuitable records.
Second, it adds the `ebucore:isDistributedOn` property. It can have the
values `audio`, `image` and `video` for locally held or at least remotely
accessible media files and `youtube`, `vimeo`, `srfaudio`, `srfvideo` and
eventually `zem` for media files which are delivered by an external player
(and therefore are not directly accessible for further analysing).
\ No newline at end of file
import Dependencies._
ThisBuild / scalaVersion := "2.12.11"
ThisBuild / organization := "ch.memobase"
ThisBuild / organizationName := "Memoriav"
ThisBuild / git.gitTagToVersionNumber := { tag: String =>
if (tag matches "[0-9]+\\..*") {
Some(tag)
} else {
None
}
}
lazy val root = (project in file("."))
.enablePlugins(GitVersioning)
.settings(
name := "Media Metadata Preprocessor",
assemblyJarName in assembly := "app.jar",
test in assembly := {},
assemblyMergeStrategy in assembly := {
case "log4j.properties" => MergeStrategy.first
case "log4j2.xml" => MergeStrategy.first
case "module-info.class" => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
},
mainClass in assembly := Some("ch.memobase.App"),
resolvers ++= Seq(
"Memobase Utils" at "https://dl.bintray.com/memoriav/memobase"
),
libraryDependencies ++= Seq(
kafkaStreams,
log4jApi,
log4jCore,
log4jSlf4j,
log4jScala,
memobaseServiceUtils excludeAll (ExclusionRule(organization =
"org.slf4j"
)),
scalatic,
scalaUri,
uPickle,
kafkaStreamsTestUtils % Test,
scalaTest % Test
)
)
apiVersion: v2
name: media-metadata-preprocessor
description: Preprocesses records for a potential enrichment with media metadata
type: application
version: 0.0.0
appVersion: 0.0.0
maintainers:
- name: Sebastian Schüpbach
email: sebastian.schuepbach@unibas.ch
deploymentName: ip-media-metadata-preprocessor
inputTopicName: import-process-media-metadata
outputTopicAVName: import-process-av-enrichment
outputTopicIgnoreName: import-process-normalization
outputTopicImageName: import-process-image-enrichment
reportingTopicName: import-process-reporting
\ No newline at end of file
apiVersion: v1
kind: ConfigMap
metadata:
name: "{{ .Values.deploymentName }}-config"
namespace: memobase
data:
APPLICATION_ID: "{{ .Values.applicationId }}"
TOPIC_IN: "{{ .Values.inputTopicName }}"
TOPIC_OUT_AV: "{{ .Values.outputTopicAVName }}"
TOPIC_OUT_IMAGE: "{{ .Values.outputTopicImageName }}"
TOPIC_OUT_IGNORE: "{{ .Values.outputTopicIgnoreName }}"
TOPIC_PROCESS: "{{ .Values.reportingTopicName }}"
apiVersion: apps/v1
kind: Deployment
metadata:
name: "{{ .Values.deploymentName }}"
namespace: memobase
labels:
jobType: "import-process-deployment"
spec:
selector:
matchLabels:
app: {{ .Values.deploymentName }}
replicas: 3
template:
metadata:
labels:
app: {{ .Values.deploymentName }}
tier: import-process
spec:
restartPolicy: Always
containers:
- name: "{{ .Values.deploymentName }}-container"
image: "{{ .Values.registry }}/{{ .Values.image }}:{{ .Values.tag }}"
imagePullPolicy: Always
command: [ "java" ]
args: [ "-Xms64m", "-Xmx256g", "-jar", "/app/app.jar" ]
resources:
requests:
cpu: "0.2"
memory: "128Mi"
limits:
cpu: "0.5"
memory: "300Mi"
envFrom:
- configMapRef:
name: "{{ .Values.bootstrapServers }}"
- configMapRef:
name: "{{ .Values.deploymentName }}-config"
############################################
## Values in this section are the same for #
## all jobs #
############################################
#image values
registry: "cr.gitlab.switch.ch"
image: "memoriav/memobase-2020/services/import-process/media-metadata-extractor"
tag: "latest"
deploymentName: ip-media-metadata-extractor
applicationId: media-metadata-extractor-app
bootstrapServers: prod-kafka-bootstrap-servers
inputTopicName: import-process-media-metadata
outputTopicAVName: import-process-av-enrichment
outputTopicIgnoreName: import-process-normalization
outputTopicImageName: import-process-image-enrichment
reportingTopicName: import-process-reporting
/*
* media-metadata-preprocessor
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
import sbt._
object Dependencies {
lazy val kafkaV = "2.3.1"
lazy val log4jV = "2.11.2"
lazy val scalatestV = "3.1.2"
lazy val kafkaStreams = "org.apache.kafka" %% "kafka-streams-scala" % kafkaV
lazy val kafkaStreamsTestUtils = "org.apache.kafka" % "kafka-streams-test-utils" % kafkaV
lazy val log4jApi = "org.apache.logging.log4j" % "log4j-api" % log4jV
lazy val log4jCore = "org.apache.logging.log4j" % "log4j-core" % log4jV
lazy val log4jScala = "org.apache.logging.log4j" %% "log4j-api-scala" % "11.0"
lazy val log4jSlf4j = "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4jV
lazy val memobaseServiceUtils = "org.memobase" % "memobase-service-utilities" % "1.12.2"
lazy val scalatic = "org.scalactic" %% "scalactic" % scalatestV
lazy val scalaTest = "org.scalatest" %% "scalatest" % scalatestV
lazy val scalaUri = "io.lemonlabs" %% "scala-uri" % "2.2.3"
lazy val uPickle = "com.lihaoyi" %% "upickle" % "0.9.5"
}
#
# media-metadata-preprocessor
# Copyright (C) 2020 Memoriav
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# suppress inspection "UnusedProperty"
sbt.version=1.3.10
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
\ No newline at end of file
<scalastyle>
<name>Scalastyle standard configuration</name>
<check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true">
<parameters>
<parameter name="maxFileLength"><![CDATA[800]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
<parameters>
<parameter name="header"><![CDATA[/*
* media-metadata-preprocessor
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
<parameters>
<parameter name="maxLineLength"><![CDATA[160]]></parameter>
<parameter name="tabSize"><![CDATA[4]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
<parameters>
<parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
<parameters>
<parameter name="maxParameters"><![CDATA[8]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true">
<parameters>
<parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[println]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
<parameters>
<parameter name="maxTypes"><![CDATA[30]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
<parameters>
<parameter name="maximum"><![CDATA[10]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
<parameters>
<parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
<parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
<parameters>
<parameter name="maxLength"><![CDATA[50]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
<parameters>
<parameter name="maxMethods"><![CDATA[30]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"/>
</scalastyle>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ media-metadata-preprocessor
~ Copyright (C) 2020 Memoriav
~
~ This program is free software: you can redistribute it and/or modify
~ it under the terms of the GNU Affero General Public License as
~ published by the Free Software Foundation, either version 3 of the
~ License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU Affero General Public License for more details.
~
~ You should have received a copy of the GNU Affero General Public License
~ along with this program. If not, see <https://www.gnu.org/licenses/>.
~
-->
<Configuration status="info" name="media-metadata-preprocessor" packages="">
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="[%-5p] [%d{yyyy-MM-dd HH:mm:ss}] [%c{1.}:%L] %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
</Configuration>
/*
* media-metadata-preprocessor
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase
import java.time.Duration
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.scala.Logging
import scala.util.{Failure, Success, Try}
object App extends scala.App with Logging {
val topology = new KafkaTopology
val streams = new KafkaStreams(
topology.build,
SettingsFromFile.kafkaStreamsSettings
)
val shutdownGracePeriodMs = 10000
logger.trace("Starting stream processing")
Try(
streams.start()
) match {
case Success(_) =>
logger.info("Kafka Streams workflow successfully started.")
case Failure(f) =>
logger.error(s"Aborting due to errors: ${f.getMessage}")
sys.exit(1)
}
sys.ShutdownHookThread {
streams.close(Duration.ofMillis(shutdownGracePeriodMs))
}
}
/*
* media-metadata-preprocessor
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase
final case class UrlParseException(private val message: String = "",
private val cause: Throwable = None.orNull)
extends Exception(message, cause)
/*
* media-metadata-preprocessor
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase
import ch.memobase.models._
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.{StreamsBuilder, _}
import org.apache.logging.log4j.scala.Logging
class KafkaTopology extends Logging {
import KafkaTopologyUtils._
import Serdes._
def build: Topology = {
val builder = new StreamsBuilder
val source =
builder.stream[String, String](SettingsFromFile.inputTopic)
val Array(recordWithLocator, locatorlessRecord) = checkForLocators(source)
val Array(enrichableRecordImage, enrichableRecordAV, unenrichableRecord) =
checkIfEnrichable(
recordWithLocator
)
sendRecordsDownstream(
unenrichableRecord,
SettingsFromFile.outputTopicIgnore
)
sendRecordsDownstream(
enrichableRecordAV,
SettingsFromFile.outputTopicAV
)
sendRecordsDownstream(
enrichableRecordImage,
SettingsFromFile.outputTopicImage
)
reportEnrichableRecords(enrichableRecordAV)
reportEnrichableRecords(enrichableRecordImage)
reportLocatorlessRecords(locatorlessRecord)
reportUnenrichableRecords(unenrichableRecord)
builder.build()
}
private def checkIfEnrichable(
stream: KStream[String, (String, List[ResourceWithLocator])]
) =
stream
.mapValues(v => setMediaPlayerType(v))
.mapValues(v => (v._1, filterEnrichableResources(v._2)))
.branch(
(_, v) => v._2.nonEmpty && v._2.forall(_.player == ImageViewer),
(_, v) => v._2.nonEmpty,
(_, v) => v._2.isEmpty
)
private def checkForLocators(stream: KStream[String, String]) =
stream
.mapValues((k, v) => getEbucoreLocators(k, v))
.branch(
(_, v) => v._2.nonEmpty,
(_, v) => v._2.isEmpty
)
private def setMediaPlayerType(
value: (String, List[ResourceWithLocator])
): (String, List[ResourceWithLocator]) = {
val newProperties = value._2.foldLeft("")((agg, x) =>
agg + "\n" + NtriplesStatement(
x.resource,
"http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#isDistributedOn",
Literal(x.player.name)
).toString
)
(value._1 + newProperties, value._2)
}
private def sendRecordsDownstream[T](
records: KStream[String, (String, T)],
outputTopic: String
): Unit =
records
.mapValues(_._1)
.to(outputTopic)
private def reportEnrichableRecords(
enrichableRecord: KStream[String, (String, List[ResourceWithLocator])]
): Unit =
enrichableRecord
.mapValues((k, _) =>
ReportingObject(k, ProcessingSuccess, "Record is enrichable").toString
)
.to(SettingsFromFile.reportTopic)
private def reportUnenrichableRecords(
unenrichableRecord: KStream[String, (String, List[ResourceWithLocator])]
): Unit =
unenrichableRecord