KafkaTopology.kt 22.7 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21 22 23 24 25 26 27
import ch.memobase.rdf.EBUCORE
import ch.memobase.rdf.RDF
import ch.memobase.rdf.RICO
import ch.memobase.settings.HeaderExtractionTransformSupplier
import ch.memobase.settings.HeaderMetadata
import ch.memobase.settings.SettingsLoader
import ch.memobase.sftp.SftpClient
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
28 29 30 31
import java.io.StringReader
import java.io.StringWriter
import java.net.MalformedURLException
import java.net.URL
Jonas Waeber's avatar
Jonas Waeber committed
32 33 34
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
35
import org.apache.jena.rdf.model.ResourceFactory
36
import org.apache.jena.riot.RiotException
Jonas Waeber's avatar
Jonas Waeber committed
37
import org.apache.kafka.streams.StreamsBuilder
38
import org.apache.kafka.streams.kstream.KStream
Jonas Waeber's avatar
Jonas Waeber committed
39
import org.apache.kafka.streams.kstream.Predicate
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
40
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
41 42 43 44
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus

class KafkaTopology(private val settings: SettingsLoader) {
45
    private val appSettings = settings.appSettings
    private val log = LogManager.getLogger("KafkaTopology")

    private val sftpClient = SftpClient(settings.sftpSettings)
    private val previewImageHandler = RemoteResourceHandler(sftpClient)
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)

    // Comma-separated list of file extensions that are probed on the sFTP server.
    // Entries are trimmed and blank entries dropped so that values such as
    // "jpg, png" or a trailing comma do not produce paths that can never match.
    private val fileExtensions = appSettings.getProperty(Constant.extensionsPropertyName)
        .split(",")
        .map { it.trim() }
        .filter { it.isNotEmpty() }
    private val reportingTopic = settings.processReportTopic
Jonas Waeber's avatar
Jonas Waeber committed
53

54
    /**
     * Assembles the stream topology and returns the [StreamsBuilder] holding it.
     *
     * Messages are read from the input topic, parsed into a model and then routed
     * through a series of branches. Every branch writes a JSON report to the
     * reporting topic; all branches except the two fatal ones also write the
     * serialized model to the output topic.
     *
     * @return the builder containing the complete topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val stream = builder.stream<String, String>(settings.inputTopic)

        // [0] parseable input, [1] input for which createModel returned null.
        val model = stream
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { value -> createModel(value) }
            .branch(
                Predicate { _, value -> value != null },
                Predicate { _, _ -> true }
            )

        model[1]
            .mapValues { key, _ ->
                log.warn("Invalid input data. Check mapper service processing.")
                Report(
                    key,
                    ReportStatus.fatal,
                    generalFailureMessage = "Invalid input data. Check mapper service processing."
                ).toJson()
            }
            .to(reportingTopic)

        // [0] fatal, [1] ignore (no digital object), [2] everything else.
        val requiredFieldsAvailable = model[0]
            .mapValues { value -> extractSubjects(value!!) }
            .mapValues { key, value ->
                if (getDigitalObjectResource(value.second) == null) {
                    createRecord(
                        value,
                        key,
                        ReportStatus.ignore,
                        digitalObjectMessage = "No digital object resource present in model."
                    )
                } else {
                    createRecord(value, key, ReportStatus.success)
                }
            }
            .mapValues { key, value ->
                val recordResource = getRecordResource(value.second)
                when {
                    recordResource == null -> {
                        updateRecord(
                            value,
                            ReportStatus.fatal,
                            generalMessage = "No record resource present in model."
                        )
                    }
                    getOriginalIdentifier(recordResource) == null -> {
                        updateRecord(
                            value,
                            ReportStatus.fatal,
                            generalMessage = ReportMessages.noOriginalIdentifier(key)
                        )
                    }
                    else -> {
                        value
                    }
                }
            }
            .branch(
                Predicate { _, value -> value.third.status == ReportStatus.fatal },
                Predicate { _, value -> value.third.status == ReportStatus.ignore },
                Predicate { _, _ -> true }
            )

        requiredFieldsAvailable[0]
            .mapValues { _, value ->
                // The fatal reasons above are stored as the general failure message,
                // so that is the field that explains why processing is aborted.
                log.warn("Record contains faulty data: ${value.third.generalFailureMessage}. Abort processing of message")
                value.third.toJson()
            }
            .to(reportingTopic)

        requiredFieldsAvailable[1]
            .mapValues { _, value ->
                log.warn("Record has no digital object")
                value.third.toJson()
            }
            .to(reportingTopic)

        // Records without a digital object are still forwarded unchanged.
        sendDownstream(requiredFieldsAvailable[1])

        val hasLocatorBranch = requiredFieldsAvailable[2]
            .mapValues { readOnlyKey, value ->
                addThumbnailSftpLocatorToModel(
                    readOnlyKey,
                    value
                )
            }
            .branch(
                Predicate { _, value -> hasAccessibleDigitalObjectWithoutLocator(value.second) }, // Indicates a local media file
                Predicate { _, value -> hasNoAccessProperty(value.second) },
                Predicate { _, value -> isFaroResource(value.second) },
                Predicate { _, _ -> true } // Indicates a possible remote media file
            )

        val updateDigitalObjects = hasLocatorBranch[0]
            .mapValues { readOnlyKey, value ->
                val enrichedModel = addMediaSftpLocatorToModel(readOnlyKey, value)
                if (enrichedModel.third.status == ReportStatus.warning) {
                    log.warn("A problem enriching the digital object occurred: ${enrichedModel.third.digitalObjectMessage}")
                }
                enrichedModel
            }

        val resourceWithoutAccessProp = hasLocatorBranch[1]
            .mapValues { value ->
                log.info("A digital object without access flag encountered!")
                updateRecord(
                    value,
                    value.third.status,
                    digitalObjectMessage = "digital object without locator and access property"
                )
            }

        val faroResource = hasLocatorBranch[2]
            .mapValues { value ->
                updateRecord(
                    value,
                    value.third.status,
                    digitalObjectMessage = "faro resources don't have accessible media files"
                )
            }

        val remoteThumbnails = hasLocatorBranch[3]
            .mapValues { value ->
                updateRecord(
                    value,
                    value.third.status,
                    digitalObjectMessage = "no local media files"
                )
            }
            .branch(
                // Compare against the shared constant instead of a hard-coded literal,
                // consistent with every other status check in this topology.
                Predicate { _, value -> value.third.status == ReportStatus.success }, // Local thumbnail linked; don't try to fetch others
                Predicate { _, _ -> true } // Indicates a remote media file; check for youtube / vimeo thumbnail fetching
            )

        sendDownstream(resourceWithoutAccessProp)
        report(resourceWithoutAccessProp)

        sendDownstream(faroResource)
        report(faroResource)

        sendDownstream(remoteThumbnails[0])
        report(remoteThumbnails[0])

        val remotelyEnrichedThumbnails =
            remoteThumbnails[1]
                .mapValues { value -> fetchThumbnailForYoutubeOrVimeoFile(value) }

        sendDownstream(updateDigitalObjects)
        report(updateDigitalObjects)

        sendDownstream(remotelyEnrichedThumbnails)
        report(remotelyEnrichedThumbnails)

        return builder
    }

213 214
    /**
     * Tries to fetch a thumbnail for a digital object whose locator points to
     * vimeo or youtube, and attaches it to the model.
     *
     * The value is returned unchanged when a thumbnail is already attached or when
     * no record / digital object resource is present. Otherwise the report is
     * updated with the outcome: warning for a missing or invalid locator, a failed
     * oembed download or a missing thumbnail file; ignore for locators that are
     * neither vimeo nor youtube; or the result of [addLocalThumbnail].
     *
     * @param value the model, its subjects and the current report.
     * @return the value carrying the updated report.
     */
    private fun fetchThumbnailForYoutubeOrVimeoFile(value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        if (!noThumbnailAttached(value.second)) {
            return value
        }
        val recordResource = getRecordResource(value.second) ?: return value
        val digitalObjectResource = getDigitalObjectResource(value.second) ?: return value

        // getProperty yields null when the digital object carries no locator statement;
        // treat that like an unusable locator instead of failing with an NPE.
        val locator = digitalObjectResource.getProperty(EBUCORE.locator)?.string
        if (locator == null || isNoValidUrl(locator)) {
            log.warn("No valid locator url found for ${value.third.id}")
            return updateRecord(value, ReportStatus.warning, digitalObjectMessage = "no valid locator url")
        }

        val oembedResult = when {
            RemoteResourceHandler.isVimeoUrl(locator) -> {
                log.info("Trying to download thumbnail file on vimeo for ${value.third.id}")
                this.previewImageHandler.getFromVimeo(locator) ?: run {
                    log.warn("Download for ${value.third.id} failed!")
                    return updateRecord(
                        value,
                        ReportStatus.warning,
                        digitalObjectMessage = "couldn't download oembed metadata from vimeo"
                    )
                }
            }
            RemoteResourceHandler.isYoutubeUrl(locator) -> {
                log.info("Trying to download thumbnail file on youtube for ${value.third.id}")
                this.previewImageHandler.getFromYoutube(locator) ?: run {
                    log.warn("Download for ${value.third.id} failed!")
                    return updateRecord(
                        value,
                        ReportStatus.warning,
                        digitalObjectMessage = "couldn't download oembed metadata from youtube"
                    )
                }
            }
            else -> {
                log.debug("Won't fetch thumbnail file for ${value.third.id} because no youtube/vimeo resource")
                return updateRecord(
                    value,
                    ReportStatus.ignore,
                    thumbnailMessage = "no thumbnails available"
                )
            }
        }

        // first: oembed metadata, second: path of the downloaded thumbnail file (may be null).
        val enrichedValue = addDimensionsToDigitalObject(value, oembedResult.first)
        val filePath = oembedResult.second
        return if (filePath != null) {
            addLocalThumbnail(enrichedValue, recordResource, digitalObjectResource, filePath)
        } else {
            log.warn("No thumbnail url available for ${value.third.id}")
            updateRecord(
                enrichedValue,
                ReportStatus.warning,
                thumbnailMessage = "Download of youtube / vimeo thumbnail failed or no thumbnail available."
            )
        }
    }

277 278 279 280 281 282 283 284 285 286
    /**
     * Moves a downloaded thumbnail file to the sFTP server and, on success, links
     * it in the model as a thumbnail resource.
     *
     * The destination file name is the last path segment of the record URI with
     * its first seven characters removed, plus the `.jpg` suffix.
     *
     * @param value the model, its subjects and the current report.
     * @param recordResource the record the thumbnail belongs to.
     * @param digitalObjectResource the digital object the thumbnail is derived from.
     * @param pathToLocalFile path of the downloaded thumbnail file.
     * @return the value with status success or, if the move failed, warning.
     */
    private fun addLocalThumbnail(
        value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        recordResource: Resource,
        digitalObjectResource: Resource,
        pathToLocalFile: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val fileName = recordResource.uri.substringAfterLast("/").substring(7)
        val destPath = "$sftpBasePath/${value.first.second.recordSetId}/${Constant.thumbnailFolderName}/$fileName.jpg"

        val pathOnSftpServer = previewImageHandler.moveFileToSFTP(pathToLocalFile, destPath)
        if (pathOnSftpServer == null) {
            log.warn("Couldn't move downloaded thumbnail file to $destPath for ${value.third.id}")
            return updateRecord(
                value,
                ReportStatus.warning,
                thumbnailMessage = "upload of youtube / vimeo thumbnail to sFTP server failed"
            )
        }

        log.info("Move downloaded thumbnail file to $destPath for ${value.third.id}")
        createThumbnailResource(
            value.first.first,
            recordResource,
            digitalObjectResource,
            "${Constant.sftpPathPrefix}$pathOnSftpServer"
        )
        return updateRecord(
            value,
            ReportStatus.success,
            thumbnailMessage = "youtube / vimeo thumbnail fetched"
        )
    }

    /**
     * Copies width and height from the oembed response onto the digital object
     * resource as plain literals; dimensions that are null are skipped.
     *
     * @param value the model, its subjects and the current report.
     * @param oembedObject oembed metadata providing the optional dimensions.
     * @return the same [value] instance that was passed in.
     */
    private fun addDimensionsToDigitalObject(
        value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        oembedObject: OembedResponse
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val target = getDigitalObjectResource(value.second)!!
        oembedObject.width?.let { w ->
            target.addLiteral(EBUCORE.width, ResourceFactory.createPlainLiteral(w.toString()))
        }
        oembedObject.height?.let { h ->
            target.addLiteral(EBUCORE.height, ResourceFactory.createPlainLiteral(h.toString()))
        }
        // NOTE(review): the created literal is discarded; kept to preserve existing behaviour.
        val model = value.first.first
        model.createLiteral(target.toString(), true)
        return value
    }

330 331 332 333
    /** Returns true when none of the [resources] is typed as a thumbnail. */
    private fun noThumbnailAttached(resources: List<Resource>): Boolean =
        !resources.any { candidate -> candidate.hasProperty(RICO.type, Constant.thumbnailRicoType) }

334 335
    /**
     * Pairs the input with the list of all subjects found in its model.
     *
     * @param input the parsed model together with its header metadata.
     * @return the unchanged input and the model's subjects.
     */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> {
        val subjects: List<Resource> = input.first.listSubjects().toList()
        return input to subjects
    }

338
    /**
     * Returns true when some resource is a digital object without a locator but
     * with a regulatedBy property, and [isFaroResource] is false for the list.
     */
    private fun hasAccessibleDigitalObjectWithoutLocator(res: List<Resource>): Boolean {
        for (candidate in res) {
            if (!candidate.hasProperty(RICO.type, Constant.digitalObject)) continue
            if (candidate.hasProperty(EBUCORE.locator)) continue
            if (!candidate.hasProperty(RICO.regulatedBy)) continue
            // The faro check is only evaluated once a candidate passed all other conditions.
            if (!isFaroResource(res)) return true
        }
        return false
    }

    /**
     * Looks at the first digital object that has a regulatedBy property and finds
     * its first rule that has type "access" and a name.
     *
     * @return true when that rule's name is "faro"; false when there is no such
     *         digital object or no such rule.
     */
    private fun isFaroResource(res: List<Resource>): Boolean {
        val digitalObject = res.firstOrNull { candidate ->
            candidate.hasProperty(RICO.type, Constant.digitalObject) && candidate.hasProperty(RICO.regulatedBy)
        } ?: return false

        // Evaluated lazily so rules after the first match are never inspected.
        val accessRule = digitalObject.listProperties(RICO.regulatedBy)
            .asSequence()
            .map { statement -> statement.`object`.asResource() }
            .firstOrNull { rule ->
                rule.hasProperty(RICO.type) && rule.getProperty(RICO.type).literal.string == "access" &&
                    rule.hasProperty(RICO.name)
            }

        return accessRule != null && accessRule.getProperty(RICO.name).literal.string == "faro"
    }

360 361 362 363 364 365 366 367
    /**
     * Returns true when some resource is a digital object that has neither a
     * locator nor a regulatedBy property.
     */
    private fun hasNoAccessProperty(res: List<Resource>): Boolean =
        res.any { candidate ->
            candidate.hasProperty(RICO.type, Constant.digitalObject) &&
                !(candidate.hasProperty(EBUCORE.locator) || candidate.hasProperty(RICO.regulatedBy))
        }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
368
    /**
     * Finds the value of the record's identifier of type [Constant.identifierType].
     *
     * @param record the record resource whose identifiedBy statements are inspected.
     * @return the identifier string of the first matching identifier resource, or
     *         null when there is none or it has no identifier property.
     */
    private fun getOriginalIdentifier(record: Resource): String? {
        val identifierResources = record.listProperties(RICO.identifiedBy)
            .toList()
            .map { statement -> statement.`object`.asResource() }
        val match = identifierResources.firstOrNull { candidate ->
            candidate.hasProperty(RDF.type, RICO.Identifier) &&
                candidate.hasProperty(RICO.type, Constant.identifierType)
        }
        return match?.getProperty(RICO.identifier)?.string
    }

378
    /**
     * Parses the message payload into a model using [Constant.rdfParserLang].
     *
     * @param data the serialized payload together with its header metadata.
     * @return the parsed model paired with the header metadata, or null when the
     *         parser rejects the payload with a [RiotException].
     */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata>? {
        val model = ModelFactory.createDefaultModel()
        return try {
            model.read(StringReader(data.first), "", Constant.rdfParserLang)
            Pair(model, data.second)
        } catch (ex: RiotException) {
            // Keep the parser's diagnosis; the caller only sees null and reports a generic failure.
            log.warn("Failed to parse input data: ${ex.message}")
            null
        }
    }

388 389 390 391 392
    /**
     * Creates a thumbnail instantiation in the model and links it both ways with
     * the digital object and the record.
     *
     * @param data the model to create the thumbnail resource in.
     * @param record the record the thumbnail instantiates.
     * @param digitalObject the digital object the thumbnail is derived from.
     * @param locator the locator stored on the thumbnail as a plain literal.
     * @return the URI of the thumbnail resource (`<digital object uri>/derived`).
     */
    private fun createThumbnailResource(
        data: Model,
        record: Resource,
        digitalObject: Resource,
        locator: String
    ): String {
        val thumbnailUri = "${digitalObject.uri}/derived"
        val thumbnail = data.createResource(thumbnailUri)
        val locatorLiteral = ResourceFactory.createPlainLiteral(locator)

        // Describe the thumbnail itself.
        thumbnail
            .addProperty(RDF.type, RICO.Instantiation)
            .addProperty(RICO.type, Constant.thumbnailRicoType)
            .addProperty(EBUCORE.locator, locatorLiteral)

        // Link thumbnail <-> digital object.
        digitalObject.addProperty(RICO.hasDerivedInstantiation, thumbnail)
        thumbnail.addProperty(RICO.isDerivedFromInstantiation, digitalObject)

        // Link thumbnail <-> record.
        record.addProperty(RICO.hasInstantiation, thumbnail)
        thumbnail.addProperty(RICO.instantiates, record)

        return thumbnailUri
    }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
    /** Delegates to [addSftpLocatorToModel] with the thumbnail folder as type. */
    private fun addThumbnailSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> =
        addSftpLocatorToModel(key, data, Constant.thumbnailFolderName)

    /** Delegates to [addSftpLocatorToModel] with the media folder as type. */
    private fun addMediaSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> =
        addSftpLocatorToModel(key, data, Constant.mediaFolderName)

    /**
     * Looks up a file for the record on the sFTP server and links it in the model.
     *
     * For the media folder the link is added as locator on the digital object; for
     * any other type a thumbnail resource is created. When no file is found the
     * report is set to ignore (thumbnail folder) or warning (otherwise).
     *
     * @param key the message key, used in the failure report message.
     * @param data the model, its subjects and the current report.
     * @param type the sFTP sub folder to search in.
     * @return the value carrying the updated report.
     */
    private fun addSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        type: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        // NOTE(review): the non-null assertions rely on upstream validation — confirm for every caller.
        val record = getRecordResource(data.second)!!
        val digitalObject = getDigitalObjectResource(data.second)!!
        val identifier = getOriginalIdentifier(record)!!

        val link = getLinkToResourceOnSFTPServer(data.first.second.recordSetId, type, identifier)
        if (link == null) {
            return when (type) {
                Constant.thumbnailFolderName ->
                    updateRecord(data, ReportStatus.ignore, thumbnailMessage = "no local thumbnails available")
                else ->
                    updateRecord(data, ReportStatus.warning, digitalObjectMessage = ReportMessages.reportFailure(key, type))
            }
        }

        return when (type) {
            Constant.mediaFolderName -> {
                addLocatorToDigitalObjectResource(data.first.first, link, digitalObject)
                updateRecord(
                    data,
                    ReportStatus.success,
                    digitalObjectMessage = ReportMessages.reportSuccess(digitalObject.uri, link, type)
                )
            }
            else -> {
                val thumbnailUri = createThumbnailResource(data.first.first, record, digitalObject, link)
                updateRecord(
                    data,
                    ReportStatus.success,
                    thumbnailMessage = ReportMessages.reportSuccess(thumbnailUri, link, type)
                )
            }
        }
    }

    /**
     * Probes the sFTP server for `<base>/<recordSetId>/<type>/<identifier>.<ext>`
     * for each configured extension, in order.
     *
     * @return the first existing path prefixed with [Constant.sftpPathPrefix], or
     *         null when none of the candidates exists.
     */
    private fun getLinkToResourceOnSFTPServer(
        recordSetId: String,
        type: String,
        originalIdentifierValue: String
    ): String? {
        // Lazy sequence: stop probing the server as soon as one candidate exists.
        val existingPath = fileExtensions
            .asSequence()
            .map { extension -> "$sftpBasePath/$recordSetId/$type/$originalIdentifierValue.$extension" }
            .firstOrNull { candidate -> sftpClient.exists(candidate) }

        if (existingPath != null) {
            return "${Constant.sftpPathPrefix}$existingPath"
        }
        log.info("No media file found. Tried $sftpBasePath/$recordSetId/$type/$originalIdentifierValue.{${fileExtensions.joinToString()}}")
        return null
    }

    /** Returns the first resource typed as a RICO record, or null if there is none. */
    private fun getRecordResource(resources: List<Resource>): Resource? =
        resources.find { candidate -> candidate.hasProperty(RDF.type, RICO.Record) }

    /** Returns the first resource whose RICO type is the digital object type, or null. */
    private fun getDigitalObjectResource(resources: List<Resource>): Resource? =
        resources.find { candidate -> candidate.hasProperty(RICO.type, Constant.digitalObject) }

    /**
     * Checks whether [locator] fails to parse as a URL.
     *
     * @return true when constructing a [URL] throws [MalformedURLException].
     */
    private fun isNoValidUrl(locator: String): Boolean {
        try {
            URL(locator)
        } catch (ex: MalformedURLException) {
            return true
        }
        return false
    }

    /**
     * Adds [sftpLink] as a plain-literal locator to the digital object resource.
     */
    private fun addLocatorToDigitalObjectResource(model: Model, sftpLink: String, digitalObjectResource: Resource) {
        digitalObjectResource.addLiteral(EBUCORE.locator, ResourceFactory.createPlainLiteral(sftpLink))
        // NOTE(review): the created literal is discarded; kept to preserve existing behaviour.
        model.createLiteral(digitalObjectResource.toString(), true)
    }

    /** Writes the model in [Constant.rdfParserLang] and returns the trimmed text. */
    private fun serializeModel(model: Model): String =
        StringWriter()
            .also { writer -> model.write(writer, Constant.rdfParserLang) }
            .toString()
            .trim()
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515

    /**
     * Returns a copy of [value] whose report has the given status.
     *
     * Each message argument replaces the corresponding report message only when
     * it is non-empty; an empty argument keeps the report's existing message.
     *
     * @param value the model, its subjects and the current report.
     * @param status the new report status.
     * @param generalMessage replacement for the general failure message, if non-empty.
     * @param digitalObjectMessage replacement for the digital object message, if non-empty.
     * @param thumbnailMessage replacement for the thumbnail message, if non-empty.
     */
    private fun updateRecord(
        value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        status: String,
        generalMessage: String = "",
        digitalObjectMessage: String = "",
        thumbnailMessage: String = ""
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val previous = value.third
        val updated = previous.copy(
            status = status,
            generalFailureMessage = generalMessage.ifEmpty { previous.generalFailureMessage },
            digitalObjectMessage = digitalObjectMessage.ifEmpty { previous.digitalObjectMessage },
            thumbnailMessage = thumbnailMessage.ifEmpty { previous.thumbnailMessage }
        )
        return Triple(value.first, value.second, updated)
    }

    /**
     * Attaches a freshly created report to the model / subjects pair.
     *
     * @param value the model with header metadata and the model's subjects.
     * @param messageId identifier passed to the report as its first argument.
     * @param status initial report status.
     * @param generalFailureMessage initial general failure message.
     * @param digitalObjectMessage initial digital object message.
     * @param thumbnailMessage initial thumbnail message.
     * @return the pair's components together with the new report.
     */
    private fun createRecord(
        value: Pair<Pair<Model, HeaderMetadata>, List<Resource>>,
        messageId: String,
        status: String,
        generalFailureMessage: String = "",
        digitalObjectMessage: String = "",
        thumbnailMessage: String = ""
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val (content, subjects) = value
        val initialReport = Report(
            messageId,
            status,
            generalFailureMessage = generalFailureMessage,
            digitalObjectMessage = digitalObjectMessage,
            thumbnailMessage = thumbnailMessage
        )
        val record = Triple(content, subjects, initialReport)
        return updateRecord(record, status = status)
    }
529 530 531 532 533 534 535 536 537 538 539 540

    /** Sends the JSON form of each record's report to the reporting topic. */
    private fun report(kstream: KStream<String, Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>>) {
        val reports = kstream.mapValues { _, record -> record.third.toJson() }
        reports.to(reportingTopic)
    }

    /** Sends the serialized model of each record to the configured output topic. */
    private fun sendDownstream(kstream: KStream<String, Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>>) {
        val serialized = kstream.mapValues { record -> serializeModel(record.first.first) }
        serialized.to(settings.outputTopic)
    }
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
541
}