Commit 0ecc8c59 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Fixed some bugs in media linker.

Expand report to include step & timestamp.
Improves reporting messages.
Failes if there is no record, digital object or original identifier.
Add configuration for extensions.
Update tests.
Remove unused code.
parent 3dd6c7cf
Pipeline #15878 passed with stages
in 9 minutes and 1 second
......@@ -6,6 +6,7 @@ metadata:
data:
APPLICATION_ID: {{ .Values.applicationId }}
SFTP_BASE_PATH: {{ .Values.sftpBasePath }}
EXTENSTIONS: {{ .Values.extensions }}
TOPIC_IN: {{ .Values.inputTopicName }}
TOPIC_OUT: {{ .Values.outputTopicName }}
TOPIC_PROCESS: {{ .Values.reportingTopicName }}
\ No newline at end of file
......@@ -14,6 +14,7 @@ kafkaConfigs: prod-kafka-bootstrap-servers
sftpConfigs: internal-sftp-config
sftpBasePath: /swissbib_index/mb_sftp
extensions: jpg,jpeg,png,mp3,mp4
inputTopicName: import-process-mapper
outputTopicName: import-process-media-linker
......
......@@ -3,15 +3,14 @@ package org.memobase
object Constant {
const val mediaFolderName = "media"
const val thumbnailFolderName ="thumbnails"
const val recordSetIdPropertyName = "recordSetId"
const val sftpBasePathPropertyName = "sftp.basePath"
const val topicReportingSuffix = "reporting"
const val hasThumbnailsPropertyName = "hasThumbnails"
const val extensionsPropertyName = "extensions"
const val rdfParserLang = "NTRIPLES"
const val digitalObject = "digitalObject"
const val identifierType = "original"
const val thumbnailRicoType = "thumbnail"
const val sftpPathPrefix = "sftp:"
......
package org.memobase
class FolderNotFoundException(folder: String) : Exception("Folder $folder does not exist on sftp server.")
\ No newline at end of file
......@@ -41,6 +41,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val sftpClient = SftpClient(settings.sftpSettings)
private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)
private val fileExtensions = appSettings.getProperty(Constant.extensionsPropertyName).split(",")
private val reportingTopic = settings.processReportTopic
fun prepare(): StreamsBuilder {
......@@ -52,11 +53,11 @@ class KafkaTopology(private val settings: SettingsLoader) {
.transformValues(HeaderExtractionTransformSupplier<String>())
.mapValues { value -> createModel(value) }
.mapValues { value -> extractSubjects(value) }
.mapValues { value ->
.mapValues { readOnlyKey, value ->
enrichSftpLocator(
"key",
Triple(value.first, value.second, Report("", "", "")),
"thumbnails"
readOnlyKey,
Triple(value.first, value.second, Report("", status = "", message = "")),
Constant.thumbnailFolderName
)
}
.branch(
......@@ -65,38 +66,34 @@ class KafkaTopology(private val settings: SettingsLoader) {
)
val updateDigitalObjects = instantiationBranch[0]
.mapValues { readOnlyKey, value -> enrichSftpLocator(readOnlyKey, value) }
.mapValues { readOnlyKey, value -> enrichSftpLocator(readOnlyKey, value, Constant.mediaFolderName) }
updateDigitalObjects
.filterNot { _, value -> value.third.status == ReportStatus.failure } // failed records are deleted.
.mapValues { value ->
val out = StringWriter()
value.first.first.write(out, Constant.rdfParserLang)
out.toString().trim()
}
updateDigitalObjects
.to(settings.outputTopic)
// report
updateDigitalObjects
.mapValues { readOnlyKey, _ ->
Report(
readOnlyKey,
ReportStatus.success,
"Create sftp file path to instantiation."
).toJson()
}
.mapValues { value -> value.third.toJson() }
.to(reportingTopic)
val unchangedWrittenResources = instantiationBranch[1]
instantiationBranch[1]
.filterNot { _, value -> value.third.status == ReportStatus.failure } // failed records are deleted.
.mapValues { value ->
val out = StringWriter()
value.first.first.write(out, Constant.rdfParserLang)
out.toString().trim()
}
unchangedWrittenResources
.to(settings.outputTopic)
unchangedWrittenResources
.mapValues { key, _ -> Report(key, ReportStatus.failure, "No linkable instantiation found.").toJson() }
instantiationBranch[1]
.mapValues { _, value -> value.third.toJson() }
.to(reportingTopic)
return builder
......@@ -110,13 +107,14 @@ class KafkaTopology(private val settings: SettingsLoader) {
return res.any { it.hasProperty(RICO.type, Constant.digitalObject) && !it.hasProperty(EBUCORE.locator) }
}
private fun getOriginalIdentifiers(res: List<Resource>): List<Resource> {
return res.filter {
it.hasProperty(RDF.type, RICO.Identifier) && it.hasProperty(
RICO.type,
Constant.identifierType
)
}
private fun getOriginalIdentifiers(record: Resource): List<Resource> {
return record.listProperties(RICO.identifiedBy).toList().map { statement -> statement.`object`.asResource() }
.filter { resource ->
resource.hasProperty(RDF.type, RICO.Identifier) && resource.hasProperty(
RICO.type,
Constant.identifierType
)
}
}
private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata> {
......@@ -128,67 +126,84 @@ class KafkaTopology(private val settings: SettingsLoader) {
private fun enrichSftpLocator(
key: String,
data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
type: String = "media"
type: String
): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
var link = ""
val digitalObject =
data.second.first { it.hasProperty(RICO.type, Constant.digitalObject) && !it.hasProperty(EBUCORE.locator) }
// This assumes that there is only one, and always one original identifier present!
val originalIdentifier = try {
getOriginalIdentifiers(data.second)[0]
} catch (ex: IndexOutOfBoundsException) {
return Triple(
data.first,
data.second,
Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key))
)
}
val value = originalIdentifier.getProperty(RICO.identifier).string
val record =
data.second.first { it.hasProperty(RDF.type, RICO.Record) }
val fileExtensions = arrayOf("jpg", "jpeg", "png", "mp3", "mp4")
for (extension in fileExtensions) {
val filePath = "$sftpBasePath/${data.first.second.recordSetId}/$type/$value.$extension"
if (sftpClient.exists(filePath)) {
link = "${Constant.sftpPathPrefix}$filePath"
if (type == "media") {
val literal = ResourceFactory.createPlainLiteral(link)
digitalObject.addLiteral(EBUCORE.locator, literal)
data.first.first.createLiteral(digitalObject.toString(), true)
} else if (type == "thumbnails") {
val thumbnail = data.first.first.createResource(
"https://memobase.ch/digital/${digitalObject.uri.substringAfterLast("/")}/derived"
)
val literal = ResourceFactory.createPlainLiteral(link)
thumbnail.addProperty(RDF.type, RICO.Instantiation)
thumbnail.addProperty(RICO.type, "thumbnail")
thumbnail.addProperty(EBUCORE.locator, literal)
digitalObject.addProperty(RICO.hasDerivedInstantiation, thumbnail)
thumbnail.addProperty(RICO.isDerivedFromInstantiation, digitalObject)
record.addProperty(RICO.hasInstantiation, thumbnail)
thumbnail.addProperty(RICO.instantiates, record)
return data.second.firstOrNull { it.hasProperty(RDF.type, RICO.Record) }.let { record ->
if (record != null) {
data.second.firstOrNull { it.hasProperty(RICO.type, Constant.digitalObject) }.let { digitalObject ->
if (digitalObject != null) {
val originalIdentifier = try {
getOriginalIdentifiers(record)[0]
} catch (ex: IndexOutOfBoundsException) {
return Triple(
data.first,
data.second,
Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key))
)
}
val value = originalIdentifier.getProperty(RICO.identifier).string
for (extension in fileExtensions) {
val filePath = "$sftpBasePath/${data.first.second.recordSetId}/$type/$value.$extension"
if (sftpClient.exists(filePath)) {
link = "${Constant.sftpPathPrefix}$filePath"
if (type == Constant.mediaFolderName) {
val literal = ResourceFactory.createPlainLiteral(link)
digitalObject.addLiteral(EBUCORE.locator, literal)
data.first.first.createLiteral(digitalObject.toString(), true)
} else if (type == Constant.thumbnailFolderName) {
val thumbnail = data.first.first.createResource(
"https://memobase.ch/digital/${digitalObject.uri.substringAfterLast("/")}/derived"
)
val literal = ResourceFactory.createPlainLiteral(link)
thumbnail.addProperty(RDF.type, RICO.Instantiation)
thumbnail.addProperty(RICO.type, Constant.thumbnailRicoType)
thumbnail.addProperty(EBUCORE.locator, literal)
digitalObject.addProperty(RICO.hasDerivedInstantiation, thumbnail)
thumbnail.addProperty(RICO.isDerivedFromInstantiation, digitalObject)
record.addProperty(RICO.hasInstantiation, thumbnail)
thumbnail.addProperty(RICO.instantiates, record)
}
return Triple(
data.first,
data.second,
Report(
record.uri,
ReportStatus.success,
"${data.third.message}\n${
ReportMessages.reportSuccess(
digitalObject.uri,
link,
type
)
}".trim()
)
)
}
}
Triple(
data.first,
data.second,
Report(
record.uri,
ReportStatus.failure,
"${data.third.message}\n${ReportMessages.reportFailure(digitalObject.uri, type)}".trim()
)
)
} else // digital object is null!
{
Triple(
data.first,
data.second,
Report(key, ReportStatus.failure, "No digital object present in model.")
)
}
}
return Triple(
data.first,
data.second,
Report(
record.uri,
ReportStatus.success,
data.third.message + "\n" + ReportMessages.reportSuccess(digitalObject.uri, link)
)
)
} else // record is null
{
Triple(data.first, data.second, Report(key, ReportStatus.failure, "No record present in model."))
}
}
return Triple(
data.first,
data.second,
Report(
record.uri,
ReportStatus.failure,
data.third.message + "\n" + ReportMessages.reportSuccess(digitalObject.uri, link)
)
)
}
}
......@@ -19,13 +19,39 @@
package org.memobase
import com.beust.klaxon.Klaxon
import java.time.LocalDateTime
data class Report(
val id: String,
val status: String, // success even if thumbnail is failue
val message: String // "ok, thumbnail created" oder "ok, no thumbnail"
val status: String, // success even if thumbnail is failure
val message: String, // "ok, thumbnail created" oder "ok, no thumbnail"
val step: String = "media-linker",
val timestamp: String = LocalDateTime.now().toString()
) {
fun toJson(): String {
return Klaxon().toJsonString(this)
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as Report
if (id != other.id) return false
if (status != other.status) return false
if (message != other.message) return false
if (step != other.step) return false
return true
}
override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + status.hashCode()
result = 31 * result + message.hashCode()
result = 31 * result + step.hashCode()
return result
}
}
......@@ -27,16 +27,14 @@ class Service(file: String = "app.yml") {
val settings = SettingsLoader(
listOf(
Constant.sftpBasePathPropertyName
Constant.sftpBasePathPropertyName,
Constant.extensionsPropertyName
),
file,
useStreamsConfig = true,
readSftpSettings = true
)
private val appId = settings.kafkaStreamsSettings.getProperty("application.id")
val builder = KafkaTopology(settings).prepare()
private val builder = KafkaTopology(settings).prepare()
fun run() {
val stream = KafkaStreams(builder.build(), settings.kafkaStreamsSettings)
stream.use {
......
package org.memobase.reports
import org.memobase.Constant
object ReportMessages {
fun reportSuccess(id: String, value: String): String {
return "Successfully enriched a sftp locator for resource $id with path $value"
fun reportSuccess(id: String, value: String, type: String): String {
return if (type == Constant.mediaFolderName)
"Successfully enriched a sftp locator for resource $id with path $value"
else
"Successfully created a thumbnail with id $id and locator $value"
}
fun reportFailure(id: String): String {
return "Could not enrich sftp locator. Did no find a valid file for resource $id."
fun reportFailure(id: String, type: String): String {
return "No valid file found for id $id in folder $type."
}
fun noOriginalIdentifier(key: String): String {
......
......@@ -6,6 +6,7 @@ sftp:
app:
sftp:
basePath: ${SFTP_BASE_PATH:?system}
extensions: ${EXTENSIONS:?system}
kafka:
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
......@@ -18,13 +18,12 @@
package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import java.io.File
import java.io.FileInputStream
import java.nio.charset.Charset
import java.nio.file.Paths
import java.util.stream.Stream
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
......@@ -32,124 +31,150 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.rdf.NS
import org.memobase.testing.EmbeddedSftpServer
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.nio.charset.Charset
import java.nio.file.Paths
import java.util.stream.Stream
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestKafkaTopology {
private val log = LogManager.getLogger("TestLogger")
private val regex = Regex("(_:B[A-Za-z0-9]+)")
private val resourcePath = "src/test/resources/data"
private fun readFile(fileName: String): String {
return File("$resourcePath/$fileName").readText(Charset.defaultCharset())
}
private fun readByteArray(fileName: String): ByteArray {
return File("$resourcePath/$fileName").readBytes()
}
private val sftpServer = EmbeddedSftpServer(22000, "user", "password")
init {
val media = listOf(
Pair("/base/test_record_set_1/media", "MEI_49884.jpg"),
Pair("/base/test_record_set_1/media", "MEI_49885.jpg"),
Pair("/base/test_record_set_1/media", "MEI_49886.jpg")
Pair("/base/test-record-set-id/media", "MEI_49884.jpg"),
Pair("/base/test-record-set-id/media", "MEI_49885.jpg"),
Pair("/base/test-record-set-id/media", "MEI_49886.jpg")
)
val thumbnails = listOf(
Pair("/base/test_record_set_1/thumbnails", "MEI_49884.jpg"),
Pair("/base/test_record_set_1/thumbnails", "MEI_49885.jpg"),
Pair("/base/test_record_set_1/thumbnails", "MEI_49886.jpg")
Pair("/base/test-record-set-id/thumbnails", "MEI_49884.jpg"),
Pair("/base/test-record-set-id/thumbnails", "MEI_49885.jpg"),
Pair("/base/test-record-set-id/thumbnails", "MEI_49886.jpg")
)
for (pair in media) {
sftpServer.putFile(Paths.get(pair.first, pair.second).toString(), FileInputStream(Paths.get("src/test/resources/data", pair.second).toFile()))
sftpServer.putFile(
Paths.get(pair.first, pair.second).toString(),
FileInputStream(Paths.get("src/test/resources/data", pair.second).toFile())
)
}
for (pair in thumbnails) {
sftpServer.putFile(Paths.get(pair.first, pair.second).toString(), FileInputStream(Paths.get("src/test/resources/data/thumbnails", pair.second).toFile()))
sftpServer.putFile(
Paths.get(pair.first, pair.second).toString(),
FileInputStream(Paths.get("src/test/resources/data/thumbnails", pair.second).toFile())
)
}
}
private val regex = Regex("(_:B[A-Za-z0-9]+)")
private val regexTime = Regex("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}")
private fun sort(source: List<String>): String {
return source.map {
var replacedString = it
for (matchResult in regex.findAll(it)) {
replacedString = replacedString.replace(matchResult.groups[0]?.value.orEmpty(), "_:B")
}
for (matchResult in regexTime.findAll(it)) {
replacedString = replacedString.replace(matchResult.groups[0]?.value.orEmpty(), "2020-10-10T09:10:22")
}
replacedString
}.sorted().reduce { acc, s -> acc + "\n" + s }.trim()
}
@ParameterizedTest
@MethodSource("testParams")
fun `test inputs`(params: TestParams) {
val service = Service(params.settingsFileName)
val testDriver = TopologyTestDriver(service.builder.build(), service.settings.kafkaStreamsSettings)
val testDriver = TopologyTestDriver(KafkaTopology(service.settings).prepare().build(), service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(StringSerializer(), StringSerializer())
val value = readFile(params.inputFileName)
val inputValue = readFile("input${params.count}.nt")
val headers = RecordHeaders()
headers.add(RecordHeader("sessionId", "test-session-id".toByteArray()))
headers.add(RecordHeader("recordSetId", "test-record-set-id".toByteArray()))
headers.add(RecordHeader("institutionId", "test-institution-id".toByteArray()))
testDriver.pipeInput(
factory.create(service.settings.inputTopic, params.key, value, headers)
factory.create(service.settings.inputTopic, params.key, inputValue, headers)
)
var record = testDriver.readOutput(
service.settings.outputTopic,
StringDeserializer(),
StringDeserializer()
val record = testDriver.readOutput(
service.settings.outputTopic,
StringDeserializer(),
StringDeserializer()
)
assertThat(record).isNotNull
var count = 0
while (record != null) {
count += 1
val sortedResult = record.value().lines().map {
var replacedString = it
for (matchResult in regex.findAll(it)) {
replacedString = replacedString.replace(matchResult.groups[0]?.value.orEmpty(), "_:B")
}
replacedString
}.sorted().reduce { acc, s -> acc + "\n" + s }
assertThat(record)
.isNotNull
.hasFieldOrPropertyWithValue("key", params.expectedOutputKey)
assertThat(sortedResult).isEqualTo(readFile(params.expectedOutputFile))
val reportedRecord = testDriver.readOutput(
"${service.settings.outputTopic}-reporting",
StringDeserializer(),
StringDeserializer()
)
val keyValue = if (record != null) {
val resultValue = record.value()
val resultKey = record.key()
if (reportedRecord != null) {
val data = reportedRecord.value()
val report = Klaxon().parse<Report>(data)
assertThat(report)
.isEqualTo(Klaxon().parse<Report>(readFile(params.expectedProcessReport)))
} else {
log.error("No report for record $record.")
val model = ModelFactory.createDefaultModel()
NS.prefixMapping.map {
model.setNsPrefix(it.key, it.value)
}
record = testDriver.readOutput(
service.settings.outputTopic,
StringDeserializer(),
StringDeserializer()
RDFDataMgr.read(model, resultValue.byteInputStream(), Lang.NTRIPLES)
RDFDataMgr.write(
FileOutputStream("$resourcePath/turtle-output${params.count}.ttl"),
model,
RDFFormat.TURTLE_PRETTY
)
Pair(resultKey, resultValue)
} else {
Pair("", "")
}
val reportedRecord = testDriver.readOutput(
service.settings.processReportTopic,
StringDeserializer(),
StringDeserializer()
)
val data = reportedRecord.value()
val report = Klaxon().parse<Report>(data)
assertAll(
"",
{
assertThat(keyValue.first)
.isEqualTo(params.expectedOutputKey)
},
{
assertThat(sort(keyValue.second.lines())).isEqualTo(
readFile("output${params.count}.nt")
)
},
{
assertThat(report)
.isEqualTo(Klaxon().parse<Report>(readFile("output${params.count}.json")))
}
)
}
private fun testParams() = Stream.of(
TestParams(
"test1.yml",