Commit 2655471f authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Refactor media linker to self terminate.

parent 829ee147
......@@ -11,7 +11,7 @@ test:
tags:
- mbr
script:
- gradle --no-daemon --no-scan --no-build-cache test --fail-fast --tests "org.memobase.Tests"
- gradle --no-daemon --no-scan --no-build-cache test --fail-fast
.build-image:
......
......@@ -47,7 +47,8 @@ dependencies {
// JSON Parser
implementation 'com.beust:klaxon:5.2'
implementation 'org.memobase:memobase-service-utilities:1.4.0'
implementation 'org.memobase:memobase-service-utilities:1.12.1'
implementation 'ch.memobase:import-process-effects-registry_2.12:0.2.1'
implementation 'org.apache.jena:apache-jena:3.14.0'
......
......@@ -23,7 +23,7 @@ import org.apache.logging.log4j.LogManager
class App {
companion object {
private val log = LogManager.getLogger("TableDataTransformApp")
private val log = LogManager.getLogger("MediaLinkerApp")
@JvmStatic fun main(args: Array<String>) {
try {
Service().run()
......
package org.memobase
object Constant {
const val mediaFolderName = "media"
const val recordSetIdPropertyName = "recordSetId"
const val sftpBasePathPropertyName = "sftp.basePath"
const val topicReportingSuffix = "reporting"
const val rdfParserLang = "NTRIPLES"
const val digitalObject = "digitalObject"
const val identifierType = "original"
const val sftpPathPrefix = "sftp:"
}
\ No newline at end of file
......@@ -18,7 +18,6 @@
package org.memobase
import com.beust.klaxon.Klaxon
import java.io.StringReader
import java.io.StringWriter
import kotlin.system.exitProcess
......@@ -28,10 +27,10 @@ import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.Statement
import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.rdf.EBUCORE
import org.memobase.rdf.RDF
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
......@@ -42,104 +41,108 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("MediaLinker")
private val sftpClient = SftpClient(settings.sftpSettings)
private val sftpBasePath = settings.appSettings.getProperty("sftp.basePath")
private val sftpBasePath = settings.appSettings.getProperty(Constant.sftpBasePathPropertyName)
private val files = createMediaFileList(settings.appSettings.getProperty("recordSetId"))
private val files = createMediaFileList(settings.appSettings.getProperty(Constant.recordSetIdPropertyName))
private fun createMediaFileList(setting: String): List<String> {
return try {
val list = sftpClient.listFiles("$sftpBasePath/$setting/media")
val list = sftpClient.listFiles("$sftpBasePath/$setting/${Constant.mediaFolderName}")
log.info("Files found on sftp server: $list.")
list
} catch (ex: Exception) {
ex.printStackTrace()
log.error(ex.localizedMessage + ": $sftpBasePath/$setting/media")
log.error(ex.localizedMessage + ": $sftpBasePath/$setting/${Constant.mediaFolderName}")
exitProcess(1)
}
}
private val reportingTopic = settings.outputTopic + "-reporting"
private val reportingTopic = "${settings.outputTopic}-${Constant.topicReportingSuffix}"
fun build(): Topology {
fun prepare(): StreamsBuilder {
val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic)
val instantiationBranch = stream
.mapValues { value -> createModel(value) }
.mapValues { value -> extractSubjects(value) }
.branch(
Predicate { _, value -> isDigitalObjectWithoutLocator(value.second) },
Predicate { _, _ -> true }
)
.mapValues { value -> createModel(value) }
.mapValues { value -> extractSubjects(value) }
.branch(
Predicate { _, value -> containsDigitalObjectWithoutLocator(value.second) },
Predicate { _, _ -> true }
)
val updateDigitalObjects = instantiationBranch[0]
.mapValues { value -> enrichSftpLocator(value) }
.mapValues { value ->
val out = StringWriter()
value.first.write(out, "NTRIPLES")
out.toString().trim()
}
.mapValues { value -> enrichSftpLocator(value) }
.mapValues { value ->
val out = StringWriter()
value.first.write(out, Constant.rdfParserLang)
out.toString().trim()
}
updateDigitalObjects
.to(settings.outputTopic)
.to(settings.outputTopic)
updateDigitalObjects
.mapValues { readOnlyKey, _ ->
Klaxon().toJsonString(
Report(
readOnlyKey,
ReportStatus.success,
"Added sftp locator link to digital resource."
)
)
}
.to(reportingTopic)
.mapValues { readOnlyKey, _ ->
Report(
readOnlyKey,
ReportStatus.success,
"Create sftp file path to instantiation."
).toJson()
}
.to(reportingTopic)
val unchangedWrittenResources = instantiationBranch[1]
.mapValues { value ->
val out = StringWriter()
value.first.write(out, "NTRIPLES")
out.toString().trim()
}
.mapValues { value ->
val out = StringWriter()
value.first.write(out, Constant.rdfParserLang)
out.toString().trim()
}
unchangedWrittenResources
.to(settings.outputTopic)
.to(settings.outputTopic)
unchangedWrittenResources
.mapValues { key, _ -> Klaxon().toJsonString(Report(key, ReportStatus.success, "No changes made.")) }
.to(reportingTopic)
.mapValues { key, _ -> Report(key, ReportStatus.failure, "No linkable instantiation found.").toJson() }
.to(reportingTopic)
return builder.build()
return builder
}
private fun extractSubjects(model: Model): Pair<Model, List<Resource>> {
return Pair(model, model.listSubjects().toList())
}
private fun isDigitalObjectWithoutLocator(res: List<Resource>): Boolean {
return res.any { it.hasProperty(RICO.type, "digitalObject") && !it.hasProperty(EBUCORE.locator) }
private fun containsDigitalObjectWithoutLocator(res: List<Resource>): Boolean {
return res.any { it.hasProperty(RICO.type, Constant.digitalObject) && !it.hasProperty(EBUCORE.locator) }
}
private fun getOriginalIdentifiers(res: List<Resource>): List<Resource> {
return res.filterNot { it.hasProperty(RDF.type, RICO.Identifier) && it.hasProperty(RICO.type, Constant.identifierType) }
}
private fun createModel(data: String): Model {
val model = ModelFactory.createDefaultModel()
model.read(StringReader(data), "", "NTRIPLES")
model.read(StringReader(data), "", Constant.rdfParserLang)
return model
}
private fun enrichSftpLocator(data: Pair<Model, List<Resource>>): Pair<Model, Report> {
var link = ""
val digitalObject = data.second.first { it.hasProperty(RICO.type, "digitalObject") && !it.hasProperty(EBUCORE.locator) }
val digitalObject =
data.second.first { it.hasProperty(RICO.type, Constant.digitalObject) && !it.hasProperty(EBUCORE.locator) }
val addedStatements = mutableListOf<Statement>()
for (stmt in digitalObject.listProperties(RICO.identifiedBy)) {
if (stmt != null) {
if (stmt.`object`.isResource) {
val resource = stmt.`object`.asResource()
if (resource.hasLiteral(RICO.type, "original")) {
if (resource.hasLiteral(RICO.type, Constant.identifierType)) {
val value = resource.getProperty(RICO.identifier).literal
for (file in files) {
if (file.contains(value.string)) {
link = "sftp:$file"
link = "${Constant.sftpPathPrefix}$file"
val literal = data.first.createLiteral(link)
addedStatements.add(StatementImpl(digitalObject, EBUCORE.locator, literal))
}
......@@ -151,21 +154,21 @@ class KafkaTopology(private val settings: SettingsLoader) {
if (addedStatements.size > 0) {
data.first.add(addedStatements)
return Pair(
data.first,
Report(
digitalObject.uri,
ReportStatus.success,
ReportMessages.reportSuccess(digitalObject.uri, link)
)
data.first,
Report(
digitalObject.uri,
ReportStatus.success,
ReportMessages.reportSuccess(digitalObject.uri, link)
)
)
} else {
return Pair(
data.first,
Report(
digitalObject.id.labelString,
ReportStatus.failure,
ReportMessages.reportFailure(digitalObject.id.labelString)
)
data.first,
Report(
digitalObject.id.labelString,
ReportStatus.failure,
ReportMessages.reportFailure(digitalObject.id.labelString)
)
)
}
}
......
......@@ -18,8 +18,14 @@
package org.memobase
import com.beust.klaxon.Klaxon
data class Report(
val id: String,
val status: String,
val message: String
)
) {
fun toJson(): String {
return Klaxon().toJsonString(this)
}
}
......@@ -17,27 +17,45 @@
*/
package org.memobase
import ch.memobase.Effect
import ch.memobase.EffectsRegistry
import ch.memobase.ShutdownMessage
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import scala.Some
import scala.runtime.BoxedUnit
import kotlin.system.exitProcess
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("MediaLinkerService")
val settings = SettingsLoader(
listOf(
"recordSetId",
"sftp.basePath"
),
file,
useStreamsConfig = true,
readSftpSettings = true
listOf(
Constant.recordSetIdPropertyName,
Constant.sftpBasePathPropertyName
),
file,
useStreamsConfig = true,
readSftpSettings = true
)
val topology = KafkaTopology(settings).build()
private val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
private val appId = settings.kafkaStreamsSettings.getProperty("application.id")
val builder = KafkaTopology(settings).prepare()
private val registry = EffectsRegistry()
private val shutdownEffect = Effect("shutdown", this::exit, Some("Shutting down application"))
fun run() {
registry.register(
ShutdownMessage(
appId.replace("-normalization-service", ""),
"normalization-service",
"termination"
), shutdownEffect
)
registry.run(builder, "import-process-admin")
val stream = KafkaStreams(builder.build(), settings.kafkaStreamsSettings)
stream.use {
it.start()
while (stream.state().isRunning) {
......@@ -46,4 +64,8 @@ class Service(file: String = "app.yml") {
}
}
}
private fun exit(): BoxedUnit {
exitProcess(0)
}
}
package org.memobase.reports
object ReportMessages {
fun reportSuccess(id: String, value: String): String {
return "Successfully enriched a sftp locator for resource $id with path $value"
}
fun reportFailure(id: String): String {
return "Could not enrich sftp locator. Did no find a valid file for resource $id."
}
}
\ No newline at end of file
package org.memobase.reports
object ReportStatus {
const val success = "SUCCESS"
const val failure = "FAILURE"
}
\ No newline at end of file
/*
* text-file-validation
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.reports
object Formats {
const val csv = "CSV"
const val tsv = "TSV"
const val xlsx = "XLSX"
const val xls = "XLS"
const val ods = "ODS"
const val invalid = "INVALID"
const val error = "ERROR"
}
object Extensions {
const val csv = "csv"
const val tsv = "tsv"
const val xlsx = "xlsx"
const val xls = "xls"
const val ods = "ods"
}
object ReportStatus {
const val success = "SUCCESS"
const val failure = "FAILURE"
}
object ReportMessages {
fun processFailure(fileName: String, message: String): String {
return "Could not process file $fileName, because $message"
}
fun processSuccess(count: Int): String {
return "Transformed table data into $count records."
}
fun invalidFile(fileName: String, message: String): String {
return "Invalid Input Error: $message for file $fileName."
}
fun reportSuccess(id: String, value: String): String {
return "Successfully enriched a sftp locator for resource $id with path $value"
}
fun reportFailure(id: String): String {
return "Could not enrich sftp locator. Did no find a valid file for resource $id."
}
}
......@@ -35,7 +35,7 @@ import org.junit.jupiter.params.provider.MethodSource
import org.memobase.testing.EmbeddedSftpServer
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class Tests {
class TestKafkaTopology {
private val log = LogManager.getLogger("TestLogger")
private val regex = Regex("(_:B[A-Za-z0-9]+)")
......@@ -67,7 +67,7 @@ class Tests {
@MethodSource("testParams")
fun `test inputs`(params: TestParams) {
val service = Service(params.settingsFileName)
val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings)
val testDriver = TopologyTestDriver(service.builder.build(), service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(StringSerializer(), StringSerializer())
val value = readFile(params.inputFileName)
testDriver.pipeInput(
......
{
"id": "https://memobase.ch/instantiation/digital/BAZ-MEI_49885-1",
"status": "SUCCESS",
"message": "Added sftp locator link to digital resource."
"message": "Create sftp file path to instantiation."
}
\ No newline at end of file
{
"id": "https://memobase.ch/instantiation/physical/BAZ-MEI_49885-0",
"status": "SUCCESS",
"message": "No changes made."
"status": "FAILURE",
"message": "No linkable instantiation found."
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment