Unverified Commit f2ed626e authored by Sebastian Schüpbach
Browse files

refactor code

parent b5876503
Pipeline #16698 passed with stages
in 5 minutes and 5 seconds
.idea
.gradle
.metals
build
out
\ No newline at end of file
out
......@@ -18,8 +18,6 @@
package org.memobase
import java.io.StringReader
import java.io.StringWriter
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
......@@ -37,6 +35,10 @@ import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
import java.io.StringReader
import java.io.StringWriter
import java.net.MalformedURLException
import java.net.URL
class KafkaTopology(private val settings: SettingsLoader) {
private val appSettings = settings.appSettings
......@@ -63,6 +65,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
model[1]
.mapValues { key, _ ->
log.warn("Invalid input data. Check mapper service processing.")
Report(
key,
ReportStatus.failure,
......@@ -74,43 +77,52 @@ class KafkaTopology(private val settings: SettingsLoader) {
val hasLocatorBranch = model[0]
.mapValues { value -> extractSubjects(value!!) }
.mapValues { readOnlyKey, value ->
enrichSftpLocator(
addThumbnailSftpLocatorToModel(
readOnlyKey,
Triple(value.first, value.second, Report("", status = "", message = "")),
Constant.thumbnailFolderName
Triple(value.first, value.second, Report("", status = "", message = ""))
)
}
.branch(
Predicate { _, value -> containsDigitalObjectWithoutLocator(value.second) },
Predicate { _, _ -> true }
Predicate { _, value -> value.third.status == "FAILURE" },
Predicate { _, value -> hasDigitalObjectWithoutLocator(value.second) }, // Indicates a local media file
Predicate { _, _ -> true } // Indicates a remote media file; check for youtube / vimeo thumbnail fetching
)
val updateDigitalObjects = hasLocatorBranch[0]
.mapValues { readOnlyKey, value -> enrichSftpLocator(readOnlyKey, value, Constant.mediaFolderName) }
hasLocatorBranch[0]
.mapValues { _, value ->
log.warn("Record contains faulty data: ${value.third.message}")
value.third.toJson()
}
.to(reportingTopic)
updateDigitalObjects
.mapValues { value ->
val out = StringWriter()
value.first.first.write(out, Constant.rdfParserLang)
out.toString().trim()
// TODO: To be discussed: Should message with these failures eventually be forwarded?
hasLocatorBranch[0]
.mapValues { value -> serializeModel(value.first.first) }
.to(settings.outputTopic)
val updateDigitalObjects = hasLocatorBranch[1]
.mapValues { readOnlyKey, value ->
val enrichedModel = addMediaSftpLocatorToModel(readOnlyKey, value)
if (enrichedModel.third.status == "FAILURE") {
log.warn("A problem enriching the digital object occurred: ${enrichedModel.third.message}")
}
enrichedModel
}
updateDigitalObjects
.mapValues { value -> serializeModel(value.first.first) }
.to(settings.outputTopic)
// report
updateDigitalObjects
.mapValues { value -> value.third.toJson() }
.to(reportingTopic)
hasLocatorBranch[1]
hasLocatorBranch[2]
.mapValues { value -> fetchThumbnailForYoutubeOrVimeoFile(value) }
.mapValues { value ->
val out = StringWriter()
value.first.first.write(out, Constant.rdfParserLang)
out.toString().trim()
}
.mapValues { value -> serializeModel(value.first.first) }
.to(settings.outputTopic)
hasLocatorBranch[1]
hasLocatorBranch[2]
.mapValues { _, value -> value.third.toJson() }
.to(reportingTopic)
......@@ -119,31 +131,66 @@ class KafkaTopology(private val settings: SettingsLoader) {
private fun fetchThumbnailForYoutubeOrVimeoFile(value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
if (noThumbnailAttached(value.second)) {
val record = value.second.firstOrNull { it.hasProperty(RDF.type, RICO.Record) }
val digitalObject = value.second.firstOrNull { it.hasProperty(RICO.type, Constant.digitalObject) }
if (record != null && digitalObject != null) {
val locator = digitalObject.getProperty(EBUCORE.locator).string
val recordResource = getRecordResource(value.second)
val digitalObjectResource = getDigitalObjectResource(value.second)
if (recordResource != null && digitalObjectResource != null) {
val locator = digitalObjectResource.getProperty(EBUCORE.locator).string
when {
isNoValidUrl(locator) -> {
log.warn("No valid locator url found for ${value.third.id}")
val amendedReport = Report(
value.third.id,
ReportStatus.failure,
value.third.message + "; no valid locator url"
)
return value.copy(third = amendedReport)
}
PreviewImageHandler.isVimeoUrl(locator) -> {
log.info("Trying to download thumbnail file on vimeo for ${value.third.id}")
this.previewImageHandler.getFromVimeo(locator, Constant.vimeoThumbnailWidth)
?: return value.copy(
third = Report(
value.third.id,
ReportStatus.failure,
value.third.message + "; couldn't fetch vimeo thumbnail"
)
)
}
PreviewImageHandler.isYoutubeUrl(locator) -> {
log.info("Trying to download thumbnail file on youtube for ${value.third.id}")
this.previewImageHandler.getFromYoutube(locator)
?: return value.copy(
third = Report(
value.third.id,
ReportStatus.failure,
value.third.message + "; couldn't fetch youtube thumbnail"
)
)
}
else -> {
null
log.debug("Won't fetch thumbnail file for ${value.third.id} because no youtube/vimeo resource")
val amendedReport = Report(
value.third.id,
ReportStatus.success,
value.third.message + "; no additional youtube / vimeo thumbnails fetched"
)
return value.copy(third = amendedReport)
}
}?.let {
val pathOnSftpServer = previewImageHandler.moveFileToSFTP(
it,
"$sftpBasePath/${value.first.second.recordSetId}/${Constant.thumbnailFolderName}/${
record.uri.split(
"/"
).last()
}.jpg"
)
}.let {
val destPath = "$sftpBasePath/${value.first.second.recordSetId}/${Constant.thumbnailFolderName}/${
recordResource.uri.split(
"/"
).last()
}.jpg"
val pathOnSftpServer = previewImageHandler.moveFileToSFTP(it, destPath)
if (pathOnSftpServer != null) {
createThumbnailResource(value.first.first, record, digitalObject, pathOnSftpServer)
log.info("Move downloaded thumbnail file to $destPath for ${value.third.id}")
createThumbnailResource(
value.first.first,
recordResource,
digitalObjectResource,
pathOnSftpServer
)
val amendedReport = Report(
value.third.id,
value.third.status,
......@@ -151,6 +198,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
)
return value.copy(third = amendedReport)
} else {
log.warn("Couldn't move downloaded thumbnail file to $destPath for ${value.third.id}")
val amendedReport = Report(
value.third.id,
ReportStatus.failure,
......@@ -161,12 +209,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
}
}
}
val amendedReport = Report(
value.third.id,
ReportStatus.success,
value.third.message + "; no additional youtube / vimeo thumbnails fetched"
)
return value.copy(third = amendedReport)
return value
}
private fun noThumbnailAttached(resources: List<Resource>): Boolean {
......@@ -177,18 +220,18 @@ class KafkaTopology(private val settings: SettingsLoader) {
return Pair(input, input.first.listSubjects().toList())
}
/**
 * Tells whether [res] contains at least one digital object that has no locator.
 *
 * A resource qualifies when it has the property `RICO.type` with the value
 * [Constant.digitalObject] and carries no `EBUCORE.locator` property.
 *
 * @param res resources to inspect
 * @return `true` if such a resource exists, `false` otherwise (also for an empty list)
 */
private fun hasDigitalObjectWithoutLocator(res: List<Resource>): Boolean {
    return res.any { it.hasProperty(RICO.type, Constant.digitalObject) && !it.hasProperty(EBUCORE.locator) }
}
/**
 * Extracts the original identifier value attached to [record].
 *
 * Follows every `RICO.identifiedBy` statement of the record, picks the first target
 * resource that is typed as `RICO.Identifier` and has `RICO.type` equal to
 * [Constant.identifierType], and reads its `RICO.identifier` property as a string.
 *
 * @param record the record resource whose identifier is looked up
 * @return the identifier string, or `null` if no matching identifier resource exists
 *         or the matching resource has no `RICO.identifier` property
 */
private fun getOriginalIdentifier(record: Resource): String? {
    return record.listProperties(RICO.identifiedBy).toList()
        .map { statement -> statement.`object`.asResource() }
        .firstOrNull { resource ->
            resource.hasProperty(RDF.type, RICO.Identifier) &&
                resource.hasProperty(RICO.type, Constant.identifierType)
        }
        ?.getProperty(RICO.identifier)
        ?.string
}
private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata>? {
......@@ -220,78 +263,118 @@ class KafkaTopology(private val settings: SettingsLoader) {
thumbnail.addProperty(RICO.instantiates, record)
}
/**
 * Convenience wrapper around [addSftpLocatorToModel] for the thumbnail folder.
 *
 * @param key message key, passed through unchanged
 * @param data model, subject resources and report accumulated so far
 * @return whatever [addSftpLocatorToModel] returns for [Constant.thumbnailFolderName]
 */
private fun addThumbnailSftpLocatorToModel(
    key: String,
    data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
    return addSftpLocatorToModel(key, data, Constant.thumbnailFolderName)
}
/**
 * Convenience wrapper around [addSftpLocatorToModel] for the media folder.
 *
 * @param key message key, passed through unchanged
 * @param data model, subject resources and report accumulated so far
 * @return whatever [addSftpLocatorToModel] returns for [Constant.mediaFolderName]
 */
private fun addMediaSftpLocatorToModel(
    key: String,
    data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> =
    addSftpLocatorToModel(key, data, Constant.mediaFolderName)
/**
 * Searches the SFTP server for the file belonging to the record and links it in the model.
 *
 * The lookup needs a record resource, a digital object resource and the record's original
 * identifier; if any of them is missing the input is returned with a failure report keyed
 * by [key]. If no file is found on the server, the report is keyed by the record URI.
 *
 * On success the model is changed in place: for [Constant.mediaFolderName] a locator is
 * added to the digital object, for [Constant.thumbnailFolderName] a thumbnail resource is
 * created. Any other [type] leaves the model untouched but still reports success.
 *
 * @param key message key, used as report id while no record URI is known
 * @param data model with header metadata, the model's subject resources, and the report so far
 * @param type folder name on the SFTP server to search in
 * @return [data] with its report replaced by the outcome of this step
 */
private fun addSftpLocatorToModel(
    key: String,
    data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
    type: String
): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
    val recordResource = getRecordResource(data.second)
        ?: return data.copy(third = Report(key, ReportStatus.failure, "No record present in model."))

    val digitalObjectResource = getDigitalObjectResource(data.second)
        ?: return data.copy(third = Report(key, ReportStatus.failure, "No digital object present in model."))

    val originalIdentifierValue = getOriginalIdentifier(recordResource)
        ?: return data.copy(third = Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key)))

    val link = getLinkToResourceOnSFTPServer(data.first.second.recordSetId, type, originalIdentifierValue)
        ?: return data.copy(
            third = Report(
                recordResource.uri,
                // A missing thumbnail is tolerated, a missing media file is not.
                // NOTE(review): compares against the literal "thumbnails" rather than
                // Constant.thumbnailFolderName - confirm both hold the same value.
                if (type == "thumbnails") ReportStatus.success else ReportStatus.failure,
                "${data.third.message}\n${ReportMessages.reportFailure(digitalObjectResource.uri, type)}".trim()
            )
        )

    when (type) {
        Constant.mediaFolderName ->
            addLocatorToDigitalObjectResource(data.first.first, link, digitalObjectResource)
        Constant.thumbnailFolderName ->
            createThumbnailResource(data.first.first, recordResource, digitalObjectResource, link)
    }

    return data.copy(
        third = Report(
            recordResource.uri,
            ReportStatus.success,
            "${data.third.message}\n${
                ReportMessages.reportSuccess(
                    digitalObjectResource.uri,
                    link,
                    type
                )
            }".trim()
        )
    )
}
/**
 * Probes the SFTP server for a file named after the original identifier.
 *
 * Candidate paths are built as `<sftpBasePath>/<recordSetId>/<type>/<identifier>.<extension>`
 * for each entry of `fileExtensions`, in order; probing stops at the first existing file.
 *
 * @param recordSetId record set the file belongs to
 * @param type folder name below the record set directory
 * @param originalIdentifierValue file name without extension
 * @return the path of the first existing file prefixed with [Constant.sftpPathPrefix],
 *         or `null` if none of the candidates exists
 */
private fun getLinkToResourceOnSFTPServer(
    recordSetId: String,
    type: String,
    originalIdentifierValue: String
): String? {
    val existingPath = fileExtensions
        .map { ext -> "$sftpBasePath/$recordSetId/$type/$originalIdentifierValue.$ext" }
        .firstOrNull { candidate -> sftpClient.exists(candidate) }
    return existingPath?.let { found -> "${Constant.sftpPathPrefix}$found" }
}
/**
 * Finds the record among [resources].
 *
 * @return the first resource with `RDF.type` `RICO.Record`, or `null` if there is none
 */
private fun getRecordResource(resources: List<Resource>): Resource? =
    resources.find { candidate -> candidate.hasProperty(RDF.type, RICO.Record) }
/**
 * Finds the digital object among [resources].
 *
 * @return the first resource with `RICO.type` [Constant.digitalObject], or `null` if there is none
 */
private fun getDigitalObjectResource(resources: List<Resource>): Resource? =
    resources.find { candidate -> candidate.hasProperty(RICO.type, Constant.digitalObject) }
/**
 * Checks whether [locator] fails to parse as a URL.
 *
 * @param locator string to test
 * @return `true` if constructing a [URL] from it throws [MalformedURLException], `false` otherwise
 */
private fun isNoValidUrl(locator: String): Boolean {
    try {
        // Only the parse attempt matters; the resulting URL object is discarded.
        URL(locator)
    } catch (ex: MalformedURLException) {
        return true
    }
    return false
}
/**
 * Attaches [sftpLink] as a plain literal to the `EBUCORE.locator` property of the digital object.
 *
 * @param model model the digital object belongs to
 * @param sftpLink link to the file on the SFTP server
 * @param digitalObjectResource resource that receives the locator
 */
private fun addLocatorToDigitalObjectResource(model: Model, sftpLink: String, digitalObjectResource: Resource) {
    digitalObjectResource.addLiteral(EBUCORE.locator, ResourceFactory.createPlainLiteral(sftpLink))
    // NOTE(review): the literal created here is never used - presumably a leftover; confirm before removing.
    model.createLiteral(digitalObjectResource.toString(), true)
}
/**
 * Writes [model] to a string using the syntax configured in [Constant.rdfParserLang].
 *
 * @return the serialized model with leading and trailing whitespace removed
 */
private fun serializeModel(model: Model): String =
    StringWriter()
        .also { writer -> model.write(writer, Constant.rdfParserLang) }
        .toString()
        .trim()
}
}
\ No newline at end of file
package org.memobase
import org.apache.logging.log4j.LogManager
import org.memobase.exceptions.SftpClientException
import org.memobase.sftp.SftpClient
import java.io.FileNotFoundException
import java.io.FileOutputStream
import java.io.IOException
import java.net.HttpURLConnection
import java.net.MalformedURLException
import java.net.URL
import java.nio.file.Files
import java.nio.file.Paths
import org.apache.logging.log4j.LogManager
import org.memobase.exceptions.SftpClientException
import org.memobase.sftp.SftpClient
/**
* Fetches preview images for videos on Vimeo or Youtube
......@@ -27,11 +26,7 @@ class PreviewImageHandler(private val sftpClient: SftpClient) {
* @return true if URL points to Vimeo
*/
fun isVimeoUrl(url: String): Boolean {
    return try {
        // Compare the host case-insensitively without going through the default locale:
        // a locale-sensitive lower-casing of "VIMEO.COM" can yield a dotless i and never match.
        URL(url).host.equals("vimeo.com", ignoreCase = true)
    } catch (ex: MalformedURLException) {
        // A string that does not parse as a URL cannot point to Vimeo.
        false
    }
}
/**
......@@ -42,17 +37,13 @@ class PreviewImageHandler(private val sftpClient: SftpClient) {
* @return true if URL points to Youtube
*/
fun isYoutubeUrl(url: String): Boolean {
    return try {
        val host = URL(url).host
        // Exact host match only, ignoring case.
        // NOTE(review): hosts such as "www.youtube.com" are not matched here - confirm this is intended.
        listOf("youtube.com", "youtu.be").any { known -> known.equals(host, ignoreCase = true) }
    } catch (ex: MalformedURLException) {
        // A string that does not parse as a URL cannot point to Youtube.
        false
    }
}
}
private fun get(urlAsString: String): String? {
val url = URL(urlAsString)
return try {
val url = URL(urlAsString)
val tempFile = Files.createTempFile("", ".jpg")
val outputStream = FileOutputStream(tempFile.toFile())
with(url.openConnection() as HttpURLConnection) {
......@@ -69,9 +60,6 @@ class PreviewImageHandler(private val sftpClient: SftpClient) {
} catch (ex: FileNotFoundException) {
log.error("Can't find temporary file: ${ex.message}")
null
} catch (ex: MalformedURLException) {
log.warn("No valid URL!")
null
}
}
......@@ -83,18 +71,13 @@ class PreviewImageHandler(private val sftpClient: SftpClient) {
* @return Path to local file
*/
fun getFromYoutube(videoURL: String): String? {
val id = try {
val url = URL(videoURL)
if (url.host.endsWith("youtube.com")) {
URL(videoURL).query.split("&").firstOrNull {
it.startsWith("v=")
}?.substring(2)
} else {
url.path.substring(1)
}
} catch (ex: MalformedURLException) {
log.warn("No valid URL!")
null
val url = URL(videoURL)
val id = if (url.host.endsWith("youtube.com")) {
URL(videoURL).query.split("&").firstOrNull {
it.startsWith("v=")
}?.substring(2)
} else {
url.path.substring(1)
}
return if (id != null) {
get("https://img.youtube.com/vi/$id/hqdefault.jpg")
......@@ -112,17 +95,8 @@ class PreviewImageHandler(private val sftpClient: SftpClient) {
* @return Path to local file
*/
fun getFromVimeo(videoURL: String, width: Int): String? {
    // The video id is taken to be the last path segment of the video URL.
    val id = try {
        URL(videoURL).path.split("/").last()
    } catch (ex: MalformedURLException) {
        log.warn("No valid URL!")
        return null
    }
    return get("https://i.vimeocdn.com/vimeo/${id}_$width.jpg")
}
/**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment