Commit 8bb963ed authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Initial commit

parent 1c680e1c
FROM gradle:6.3-jdk8
ADD . /
WORKDIR /
RUN gradle --no-daemon --no-scan --no-build-cache distTar
RUN cd /build/distributions && tar xf app.tar
FROM openjdk:8-jre-alpine
COPY --from=0 /build/distributions/app /app
CMD /app/bin/media-linker
......@@ -41,25 +41,15 @@ dependencies {
implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jV}"
// Kafka Imports
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
//implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation 'org.memobase:memobase-service-utilities:1.2.1'
// SFTP Client
implementation 'com.hierynomus:sshj:0.27.0'
// CSV Reader
implementation("com.github.doyaaaaaken:kotlin-csv-jvm:0.7.3")
// XSLX / XSL Reader
implementation 'org.apache.poi:poi:4.1.2'
implementation 'org.apache.poi:poi-ooxml:4.1.2'
// ODS Reader
implementation 'org.odftoolkit:odftoolkit:1.0.0-BETA1'
// JSON Parser
implementation 'com.beust:klaxon:5.2'
// Compression
//implementation "org.apache.commons:commons-compress:1.19"
implementation 'org.memobase:memobase-service-utilities:1.4.0'
implementation 'org.apache.jena:apache-jena:3.14.0'
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
implementation "org.jetbrains.kotlin:kotlin-script-runtime:1.3.71"
......
apiVersion: v2
name: media-linker
description: The media linker service job for the import process.
type: application
version: 0.0.0
appVersion: 0.0.0
maintainers:
- name: Jonas Waeber
email: jonas.waeber@unibas.ch
apiVersion: v1
kind: ConfigMap
metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-config"
namespace: memobase
data:
APPLICATION_ID: "{{ .Values.processId }}-{{ .Values.jobName }}"
INSTITUTION_ID: "{{ .Values.institutionId }}"
RECORD_SET_ID: "{{ .Values.recordSetId }}"
TOPIC_IN: "{{ .Values.processId }}-{{ .Values.lastJobName }}"
TOPIC_OUT: "{{ .Values.processId }}-{{ .Values.jobName }}"
TOPIC_PROCESS: "{{ .Values.processId }}-reporting"
\ No newline at end of file
apiVersion: batch/v1
kind: Job
metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}"
namespace: memobase
labels:
institutionId: "{{ .Values.institutionId }}"
recordSetId: "{{ .Values.recordSetId }}"
jobType: "import-job"
spec:
template:
spec:
containers:
- name: "{{ .Values.processId }}-{{ .Values.jobName }}"
image: "{{ .Values.registry }}/{{ .Values.image }}:{{ .Values.tag }}"
envFrom:
- secretRef:
name: "{{ .Values.sftpConfigs }}"
- configMapRef:
name: "{{ .Values.kafkaConfigs }}"
- configMapRef:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-config"
restartPolicy: Never
backoffLimit: 0
############################################
## Values in this section are the same for #
## all jobs #
############################################
#image values
registry: "cr.gitlab.switch.ch"
image: "memoriav/memobase-2020/services/import-process/media-linker"
tag: "latest"
lastJobName: mapper-service
kafkaConfigs: prod-kafka-bootstrap-servers
sftpConfigs: internal-sftp-config
############################################
## Values below should be defined via the #
## User Interface (Drupal) #
############################################
jobName: media-linker
processId: p0001
institutionId: placeholder
recordSetId: placeholder
\ No newline at end of file
#!/usr/bin/env bash
echo Creating release: $1
sed -i "s/version: 0.0.0/version: $CI_COMMIT_TAG/g" ./chart/Chart.yaml
sed -i "s/appVersion: 0.0.0/appVersion: $CI_COMMIT_TAG/g" ./chart/Chart.yaml
sed -i "s/tag: \"latest\"/tag: $CI_COMMIT_TAG/g" ./chart/values.yaml
\ No newline at end of file
/*
* Media Linker
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import kotlin.system.exitProcess
import org.apache.logging.log4j.LogManager
class App {
companion object {
private val log = LogManager.getLogger("TableDataTransformApp")
@JvmStatic fun main(args: Array<String>) {
try {
Service().run()
} catch (ex: Exception) {
ex.printStackTrace()
log.error("Stopping application due to error: " + ex.message)
exitProcess(1)
}
}
}
}
/*
* Media Linker
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.Klaxon
import java.io.ByteArrayInputStream
import java.io.StringWriter
import kotlin.system.exitProcess
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.rdf.EBUCORE
import org.memobase.rdf.RDF
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("MediaLinker")
private val sftpClient = SftpClient(settings.sftpSettings)
private val files = createMediaFileList(settings.appSettings.getProperty("recordSetId"))
private fun createMediaFileList(setting: String): List<String> {
return try {
sftpClient.listFiles("/$setting/media")
} catch (ex: Exception) {
ex.printStackTrace()
log.error(ex.localizedMessage + ": /$setting/media")
exitProcess(1)
}
}
private val reportingTopic = settings.outputTopic + "-reporting"
fun build(): Topology {
val builder = StreamsBuilder()
val stream = builder.stream<String, ByteArray>(settings.inputTopic)
val instantiationBranch = stream
.mapValues { value -> createModel(value) }
.mapValues { value -> extractSubjects(value) }
.branch(
Predicate { _, value -> isDigitalObjectWithoutLocator(value.second) },
Predicate { _, _ -> true }
)
val updateDigitalObjects = instantiationBranch[0]
.mapValues { value -> enrichSftpLocator(value) }
.mapValues { value ->
val out = StringWriter()
value.first.write(out)
out.toString()
}
updateDigitalObjects
.to(settings.outputTopic)
updateDigitalObjects
.mapValues { readOnlyKey, _ ->
Klaxon().toJsonString(
Report(
readOnlyKey,
ReportStatus.success,
"Added sftp locator link to digital resource."
)
)
}
.to(reportingTopic)
val unchangedWrittenResources = instantiationBranch[1]
.mapValues { value ->
val out = StringWriter()
value.first.write(out)
out.toString()
}
unchangedWrittenResources
.to(settings.outputTopic)
unchangedWrittenResources
.mapValues { key, _ -> Klaxon().toJsonString(Report(key, ReportStatus.success, "No changes made.")) }
.to(reportingTopic)
return builder.build()
}
private fun extractSubjects(model: Model): Pair<Model, Resource> {
return Pair(model, model.listSubjectsWithProperty(RDF.type, RICO.Instantiation).next())
}
private fun isDigitalObjectWithoutLocator(res: Resource): Boolean {
return res.hasProperty(RICO.type, "digitalObject") && !res.hasProperty(EBUCORE.locator)
}
private fun createModel(data: ByteArray): Model {
val model = ModelFactory.createDefaultModel()
model.read(ByteArrayInputStream(data), "", "NTRIPLES")
return model
}
private fun enrichSftpLocator(data: Pair<Model, Resource>): Pair<Model, Report> {
var isEnriched = false
var link = ""
for (stmt in data.second.listProperties(RICO.identifiedBy)) {
if (stmt != null) {
if (stmt.`object`.isResource) {
val resource = stmt.`object`.asResource()
if (resource.hasLiteral(RICO.type, "original")) {
val value = resource.getProperty(RICO.identifier).literal
for (file in files) {
if (file.contains(value.string)) {
link = "sftp://$file"
val literal = data.first.createLiteral(link)
data.first.add(StatementImpl(data.second, EBUCORE.locator, literal))
isEnriched = true
}
}
}
}
}
}
if (isEnriched) {
return Pair(
data.first,
Report(
data.second.id.labelString,
ReportStatus.success,
ReportMessages.reportSuccess(data.second.id.labelString, link)
)
)
} else {
return Pair(
data.first,
Report(
data.second.id.labelString,
ReportStatus.failure,
ReportMessages.reportFailure(data.second.id.labelString)
)
)
}
}
}
/*
* Media Linker
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
data class Report(
val id: String,
val status: String,
val message: String
)
/*
* Media Linker
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("TableDataService")
val settings = SettingsLoader(
listOf(
"recordSetId"
),
file,
useStreamsConfig = true,
readSftpSettings = true
)
val topology = KafkaTopology(settings).build()
private val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
fun run() {
stream.use {
it.start()
while (stream.state().isRunning) {
log.info("Service is running.")
Thread.sleep(10_000L)
}
}
}
}
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.rdf
import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.ResourceFactory
object EBUCORE {
val hasGenre = prop("hasGenre")
val hasFormat = prop("hasFormat")
val hasMedium = prop("hasMedium")
val duration = prop("duration")
val displayAspectRatio = prop("displayAspectRatio")
val audioTrackConfiguration = prop("audioTrackConfiguration")
val playbackSpeed = prop("playbackSpeed")
val hasStandard = prop("hasStandard")
val locator = prop("locator")
private fun prop(name: String): Property {
return ResourceFactory.createProperty(NS.ebucore, name)
}
private fun res(name: String): Resource {
return ResourceFactory.createResource(NS.ebucore + name)
}
}
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.rdf
object NS {
val rdf = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
val rdfs = "http://www.w3.org/2000/01/rdf-schema#"
val owl = "http://www.w3.org/2002/07/owl#"
val skos = "http://www.w3.org/2004/02/skos/core#"
val rico = "https://www.ica.org/standards/RiC/ontology#"
val ebucore = "http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#"
val memint = "https://memobase.ch/institution/"
val memrs = "https://memobase.ch/recordSet/"
val memr = "https://memobase.ch/record/"
val mempo = "https://memobase.ch/instantiation/physical/"
val memdo = "https://memobase.ch/instantiation/digital/"
val dce = "http://purl.org/dc/elements/1.1/"
val dct = "http://purl.org/dc/terms/"
val schema = "http://schema.org/"
val foaf = "http://xmlns.com/foaf/0.1/"
val wdt = "http://www.wikidata.org/prop/direct/"
val wdtn = "http://www.wikidata.org/prop/direct-normalized/"
val rdau = "http://rdaregistry.info/Elements/u/"
}
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.rdf
import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.ResourceFactory
object RDF {
val type: Property = prop("type")
private fun prop(name: String): Property {
return ResourceFactory.createProperty(NS.rdf, name)
}
private fun res(name: String): Resource {
return ResourceFactory.createResource(NS.rdf + name)
}
}
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.rdf
import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.ResourceFactory
object RICO {
// Classes
val Record = res("Record")
val Instantiation = res("Instantiation")
val Title = res("Title")
val Language = res("Language")
val Identifier = res("Identifier")
val Place = res("Place")
val Agent = res("Agent")
val Person = res("Person")
val CorporateBody = res("CorporateBody")
val Rule = res("Rule")
val CreationRelation = res("CreationRelation")
val SingleDate = res("SingleDate")
val DateRange = res("DateRange")
val DateSet = res("DateSet")
// datatype properties
val title: Property = prop("title")
val source: Property = prop("source")
val descriptiveNote: Property = prop("descriptiveNote")
val name: Property = prop("name")
val type: Property = prop("type")
val expressedDate: Property = prop("expressedDate")
val normalizedDateValue: Property = prop("normalizedDateValue")
val normalizedValue: Property = prop("normalizedValue")
val scopeAndContent: Property = prop("scopeAndContent")
val physicalCharacteristics: Property = prop("physicalCharacteristics")
// Object Properties
val hasTitle: Property = prop("hasTitle")
val isTitleOf: Property = prop("isTitleOf")
val hasLanguage: Property = prop("hasLanguage")
val isLanguageOf: Property = prop("isLanguageOf")
val identifiedBy: Property = prop("identifiedBy")
val identifies: Property = prop("identifies")
val identifier: Property = prop("identifier")
val hasSubject: Property = prop("hasSubject")
val publishedBy: Property = prop("publishedBy")