Unverified Commit 039217e6 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

small improvements in reporting


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent 8d7ab27a
...@@ -18,16 +18,13 @@ ...@@ -18,16 +18,13 @@
package org.memobase package org.memobase
import java.io.StringReader
import java.io.StringWriter
import java.net.MalformedURLException
import java.net.URL
import org.apache.jena.rdf.model.Model import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.ResourceFactory import org.apache.jena.rdf.model.ResourceFactory
import org.apache.jena.riot.RiotException import org.apache.jena.riot.RiotException
import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.rdf.EBUCORE import org.memobase.rdf.EBUCORE
...@@ -39,6 +36,10 @@ import org.memobase.settings.SettingsLoader ...@@ -39,6 +36,10 @@ import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient import org.memobase.sftp.SftpClient
import settings.HeaderExtractionTransformSupplier import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata import settings.HeaderMetadata
import java.io.StringReader
import java.io.StringWriter
import java.net.MalformedURLException
import java.net.URL
class KafkaTopology(private val settings: SettingsLoader) { class KafkaTopology(private val settings: SettingsLoader) {
private val appSettings = settings.appSettings private val appSettings = settings.appSettings
...@@ -130,9 +131,7 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -130,9 +131,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
} }
.to(reportingTopic) .to(reportingTopic)
requiredFieldsAvailable[1] sendDownstream(requiredFieldsAvailable[1])
.mapValues { value -> serializeModel(value.first.first) }
.to(settings.outputTopic)
val hasLocatorBranch = requiredFieldsAvailable[2] val hasLocatorBranch = requiredFieldsAvailable[2]
.mapValues { readOnlyKey, value -> .mapValues { readOnlyKey, value ->
...@@ -143,7 +142,7 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -143,7 +142,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
} }
.branch( .branch(
Predicate { _, value -> hasDigitalObjectWithoutLocator(value.second) }, // Indicates a local media file Predicate { _, value -> hasDigitalObjectWithoutLocator(value.second) }, // Indicates a local media file
Predicate { _, _ -> true } // Indicates a remote media file; check for youtube / vimeo thumbnail fetching Predicate { _, _ -> true } // Indicates a possible remote media file
) )
val updateDigitalObjects = hasLocatorBranch[0] val updateDigitalObjects = hasLocatorBranch[0]
...@@ -155,22 +154,32 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -155,22 +154,32 @@ class KafkaTopology(private val settings: SettingsLoader) {
enrichedModel enrichedModel
} }
updateDigitalObjects
.mapValues { value -> serializeModel(value.first.first) }
.to(settings.outputTopic)
updateDigitalObjects val remoteThumbnails = hasLocatorBranch[1]
.mapValues { value -> value.third.toJson() } .mapValues { value ->
.to(reportingTopic) updateRecord(
value,
value.third.status,
digitalObjectMessage = "no local media files"
)
}
.branch(
Predicate { _, value -> value.third.status == "SUCCESS" }, // Local thumbnail linked; don't try to fetch others
Predicate { _, _ -> true } // Indicates a remote media file; check for youtube / vimeo thumbnail fetching
)
hasLocatorBranch[1] sendDownstream(remoteThumbnails[0])
.mapValues { value -> fetchThumbnailForYoutubeOrVimeoFile(value) } report(remoteThumbnails[0])
.mapValues { value -> serializeModel(value.first.first) }
.to(settings.outputTopic)
hasLocatorBranch[1] val remotelyEnrichedThumbnails =
.mapValues { _, value -> value.third.toJson() } remoteThumbnails[1]
.to(reportingTopic) .mapValues { value -> fetchThumbnailForYoutubeOrVimeoFile(value) }
sendDownstream(updateDigitalObjects)
report(updateDigitalObjects)
sendDownstream(remotelyEnrichedThumbnails)
report(remotelyEnrichedThumbnails)
return builder return builder
} }
...@@ -184,7 +193,7 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -184,7 +193,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
when { when {
isNoValidUrl(locator) -> { isNoValidUrl(locator) -> {
log.warn("No valid locator url found for ${value.third.id}") log.warn("No valid locator url found for ${value.third.id}")
return updateRecord(value, ReportStatus.warning, thumbnailMessage = "no valid locator url") return updateRecord(value, ReportStatus.warning, digitalObjectMessage = "no valid locator url")
} }
RemoteResourceHandler.isVimeoUrl(locator) -> { RemoteResourceHandler.isVimeoUrl(locator) -> {
log.info("Trying to download thumbnail file on vimeo for ${value.third.id}") log.info("Trying to download thumbnail file on vimeo for ${value.third.id}")
...@@ -216,8 +225,8 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -216,8 +225,8 @@ class KafkaTopology(private val settings: SettingsLoader) {
log.debug("Won't fetch thumbnail file for ${value.third.id} because no youtube/vimeo resource") log.debug("Won't fetch thumbnail file for ${value.third.id} because no youtube/vimeo resource")
return updateRecord( return updateRecord(
value, value,
ReportStatus.success, ReportStatus.ignore,
thumbnailMessage = "no additional thumbnails fetched" thumbnailMessage = "no thumbnails available"
) )
} }
}.let { h -> }.let { h ->
...@@ -261,7 +270,7 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -261,7 +270,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
) )
return updateRecord( return updateRecord(
value, value,
value.third.status, ReportStatus.success,
thumbnailMessage = "youtube / vimeo thumbnail fetched" thumbnailMessage = "youtube / vimeo thumbnail fetched"
) )
} else { } else {
...@@ -279,7 +288,6 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -279,7 +288,6 @@ class KafkaTopology(private val settings: SettingsLoader) {
oembedObject: OembedResponse oembedObject: OembedResponse
): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> { ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
val digitalObjectResource = getDigitalObjectResource(value.second)!! val digitalObjectResource = getDigitalObjectResource(value.second)!!
// TODO
if (oembedObject.width != null) { if (oembedObject.width != null) {
val width = ResourceFactory.createPlainLiteral(oembedObject.width.toString()) val width = ResourceFactory.createPlainLiteral(oembedObject.width.toString())
digitalObjectResource.addLiteral(EBUCORE.width, width) digitalObjectResource.addLiteral(EBUCORE.width, width)
...@@ -465,4 +473,16 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -465,4 +473,16 @@ class KafkaTopology(private val settings: SettingsLoader) {
) )
return updateRecord(Triple(value.first, value.second, report), status = status) return updateRecord(Triple(value.first, value.second, report), status = status)
} }
private fun report(kstream: KStream<String, Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>>) {
kstream
.mapValues { _, value -> value.third.toJson() }
.to(reportingTopic)
}
private fun sendDownstream(kstream: KStream<String, Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>>) {
kstream
.mapValues { value -> serializeModel(value.first.first) }
.to(settings.outputTopic)
}
} }
...@@ -53,7 +53,7 @@ data class FinalReport( ...@@ -53,7 +53,7 @@ data class FinalReport(
data class Report( data class Report(
val id: String, val id: String,
val status: String, // success even if thumbnail is failure val status: String,
val generalFailureMessage: String = "", val generalFailureMessage: String = "",
val digitalObjectMessage: String = "", // "ok, thumbnail created" oder "ok, no thumbnail" val digitalObjectMessage: String = "", // "ok, thumbnail created" oder "ok, no thumbnail"
val thumbnailMessage: String = "" val thumbnailMessage: String = ""
......
{ {
"id": "https://memobase.ch/record/Tanzarchiv-42858-43349", "id": "https://memobase.ch/record/Tanzarchiv-42858-43349",
"status": "IGNORE", "status": "SUCCESS",
"message": "No valid file found for id https://memobase.ch/digital/Tanzarchiv-42858-43349-1 in folder thumbnails." "message": "No valid file found for id https://memobase.ch/digital/Tanzarchiv-42858-43349-1 in folder thumbnails."
} }
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment