KafkaTopology.kt 21.8 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
21
22
23
24
import java.io.StringReader
import java.io.StringWriter
import java.net.MalformedURLException
import java.net.URL
Jonas Waeber's avatar
Jonas Waeber committed
25
26
27
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
28
import org.apache.jena.rdf.model.ResourceFactory
29
import org.apache.jena.riot.RiotException
Jonas Waeber's avatar
Jonas Waeber committed
30
import org.apache.kafka.streams.StreamsBuilder
31
import org.apache.kafka.streams.kstream.KStream
Jonas Waeber's avatar
Jonas Waeber committed
32
import org.apache.kafka.streams.kstream.Predicate
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
33
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
34
import org.memobase.rdf.EBUCORE
35
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
36
37
38
39
40
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
41
42
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
Jonas Waeber's avatar
Jonas Waeber committed
43
44

class KafkaTopology(private val settings: SettingsLoader) {
45
    // Application-level properties taken from the injected settings.
    private val appSettings = settings.appSettings
    private val log = LogManager.getLogger("KafkaTopology")

    // The SFTP client is shared by the direct existence checks and by the remote resource handler.
    private val sftpClient = SftpClient(settings.sftpSettings)
    private val previewImageHandler = RemoteResourceHandler(sftpClient)
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)

    // Comma-separated list of file extensions. Entries are trimmed and blank entries are dropped so
    // that a value such as "jpg, png" does not produce candidate paths ending in ". png".
    private val fileExtensions = appSettings.getProperty(Constant.extensionsPropertyName)
        .split(",")
        .map { it.trim() }
        .filter { it.isNotEmpty() }
    private val reportingTopic = settings.processReportTopic
Jonas Waeber's avatar
Jonas Waeber committed
53

54
    /**
     * Assembles the stream topology on a fresh [StreamsBuilder].
     *
     * Steps, in order:
     * 1. Read the input topic, extract header metadata and parse every value with [createModel].
     *    Values that cannot be parsed are reported as fatal on the reporting topic.
     * 2. Collect the subjects of each model and create an initial report: `ignore` when no digital
     *    object is present, `success` otherwise. Records without a record resource or without an
     *    original identifier are then marked fatal.
     * 3. Fatal records are only reported. Ignored records are reported and sent downstream.
     * 4. All other records first get a local thumbnail lookup and are then split into records with a
     *    local media file, faro resources and everything else (possibly remote media).
     * 5. Every resulting stream is sent downstream and reported.
     *
     * @return the builder holding the assembled topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val stream = builder.stream<String, String>(settings.inputTopic)

        // model[0]: value was parsed into a model, model[1]: createModel returned null.
        val model = stream
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { value -> createModel(value) }
            .branch(
                Predicate { _, value -> value != null },
                Predicate { _, _ -> true }
            )

        model[1]
            .mapValues { key, _ ->
                log.warn("Invalid input data. Check mapper service processing.")
                Report(
                    key,
                    ReportStatus.fatal,
                    generalFailureMessage = "Invalid input data. Check mapper service processing."
                ).toJson()
            }
            .to(reportingTopic)

        // [0]: fatal, [1]: ignore (no digital object), [2]: everything else.
        val requiredFieldsAvailable = model[0]
            .mapValues { value -> extractSubjects(value!!) } // non-null is guaranteed by the first branch predicate
            .mapValues { key, value ->
                if (getDigitalObjectResource(value.second) == null) {
                    createRecord(
                        value,
                        key,
                        ReportStatus.ignore,
                        digitalObjectMessage = "No digital object resource present in model."
                    )
                } else {
                    createRecord(value, key, ReportStatus.success)
                }
            }
            .mapValues { key, value ->
                val recordResource = getRecordResource(value.second)
                when {
                    recordResource == null -> {
                        updateRecord(
                            value,
                            ReportStatus.fatal,
                            generalMessage = "No record resource present in model."
                        )
                    }
                    getOriginalIdentifier(recordResource) == null -> {
                        updateRecord(
                            value,
                            ReportStatus.fatal,
                            generalMessage = ReportMessages.noOriginalIdentifier(key)
                        )
                    }
                    else -> {
                        value
                    }
                }
            }
            .branch(
                Predicate { _, value -> value.third.status == ReportStatus.fatal },
                Predicate { _, value -> value.third.status == ReportStatus.ignore },
                Predicate { _, _ -> true }
            )

        requiredFieldsAvailable[0]
            .mapValues { _, value ->
                // Fatal reasons are stored in generalFailureMessage (see the updateRecord calls above),
                // so that is the field that has to be logged here.
                log.warn("Record contains faulty data: ${value.third.generalFailureMessage}. Abort processing of message")
                value.third.toJson()
            }
            .to(reportingTopic)

        requiredFieldsAvailable[1]
            .mapValues { _, value ->
                log.warn("Record has no digital object")
                value.third.toJson()
            }
            .to(reportingTopic)

        // Records without a digital object are still forwarded unchanged.
        sendDownstream(requiredFieldsAvailable[1])

        val hasLocatorBranch = requiredFieldsAvailable[2]
            .mapValues { readOnlyKey, value ->
                addThumbnailSftpLocatorToModel(
                    readOnlyKey,
                    value
                )
            }
            .branch(
                Predicate { _, value -> hasDigitalObjectWithoutLocator(value.second) }, // Indicates a local media file
                Predicate { _, value -> isFaroResource(value.second) },
                Predicate { _, _ -> true } // Indicates a possible remote media file
            )

        val updateDigitalObjects = hasLocatorBranch[0]
            .mapValues { readOnlyKey, value ->
                val enrichedModel = addMediaSftpLocatorToModel(readOnlyKey, value)
                if (enrichedModel.third.status == ReportStatus.warning) {
                    log.warn("A problem enriching the digital object occurred: ${enrichedModel.third.digitalObjectMessage}")
                }
                enrichedModel
            }

        val faroResource = hasLocatorBranch[1]
            .mapValues { value ->
                updateRecord(
                    value,
                    value.third.status,
                    digitalObjectMessage = "faro resources don't have accessible media files"
                )
            }

        val remoteThumbnails = hasLocatorBranch[2]
            .mapValues { value ->
                updateRecord(
                    value,
                    value.third.status,
                    digitalObjectMessage = "no local media files"
                )
            }
            .branch(
                // Compare against the shared constant, like every other status check in this class.
                Predicate { _, value -> value.third.status == ReportStatus.success }, // Local thumbnail linked; don't try to fetch others
                Predicate { _, _ -> true } // Indicates a remote media file; check for youtube / vimeo thumbnail fetching
            )

        sendDownstream(faroResource)
        report(faroResource)

        sendDownstream(remoteThumbnails[0])
        report(remoteThumbnails[0])

        val remotelyEnrichedThumbnails =
            remoteThumbnails[1]
                .mapValues { value -> fetchThumbnailForYoutubeOrVimeoFile(value) }

        sendDownstream(updateDigitalObjects)
        report(updateDigitalObjects)

        sendDownstream(remotelyEnrichedThumbnails)
        report(remotelyEnrichedThumbnails)

        return builder
    }

199
200
    /**
     * Tries to fetch a thumbnail for a record whose digital object points to Vimeo or YouTube.
     *
     * The record is returned unchanged when a thumbnail is already attached or when the record or
     * digital object resource is missing. Otherwise the locator of the digital object is inspected:
     * - missing or malformed locator: status `warning` with a digital object message;
     * - Vimeo / YouTube locator: the oembed data is requested, the dimensions are added to the
     *   digital object and, if a file path is returned, the thumbnail is moved to the SFTP server;
     * - any other locator: status `ignore`.
     *
     * @param value the model with header metadata, its subjects and the current report.
     * @return the record with an updated report (and possibly an enriched model).
     */
    private fun fetchThumbnailForYoutubeOrVimeoFile(value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        if (!noThumbnailAttached(value.second)) {
            return value
        }
        val recordResource = getRecordResource(value.second)
        val digitalObjectResource = getDigitalObjectResource(value.second)
        if (recordResource == null || digitalObjectResource == null) {
            return value
        }

        // Jena returns null from getProperty when no matching statement exists; treat a missing
        // locator like an invalid one instead of failing with a NullPointerException.
        val locator: String? = digitalObjectResource.getProperty(EBUCORE.locator)?.string
        if (locator == null || isNoValidUrl(locator)) {
            log.warn("No valid locator url found for ${value.third.id}")
            return updateRecord(value, ReportStatus.warning, digitalObjectMessage = "no valid locator url")
        }

        val oembedResult = when {
            RemoteResourceHandler.isVimeoUrl(locator) -> {
                log.info("Trying to download thumbnail file on vimeo for ${value.third.id}")
                val thumbnailHandler = this.previewImageHandler.getFromVimeo(locator)
                if (thumbnailHandler == null) {
                    log.warn("Download for ${value.third.id} failed!")
                    return updateRecord(
                        value,
                        ReportStatus.warning,
                        digitalObjectMessage = "couldn't download oembed metadata from vimeo"
                    )
                }
                thumbnailHandler
            }
            RemoteResourceHandler.isYoutubeUrl(locator) -> {
                log.info("Trying to download thumbnail file on youtube for ${value.third.id}")
                val thumbnailHandler = this.previewImageHandler.getFromYoutube(locator)
                if (thumbnailHandler == null) {
                    log.warn("Download for ${value.third.id} failed!")
                    return updateRecord(
                        value,
                        ReportStatus.warning,
                        digitalObjectMessage = "couldn't download oembed metadata from youtube"
                    )
                }
                thumbnailHandler
            }
            else -> {
                log.debug("Won't fetch thumbnail file for ${value.third.id} because no youtube/vimeo resource")
                return updateRecord(
                    value,
                    ReportStatus.ignore,
                    thumbnailMessage = "no thumbnails available"
                )
            }
        }

        // first: oembed response carrying the dimensions, second: path of the downloaded file (may be null).
        val enrichedValue = addDimensionsToDigitalObject(value, oembedResult.first)
        val filePath = oembedResult.second
        return if (filePath != null) {
            addLocalThumbnail(enrichedValue, recordResource, digitalObjectResource, filePath)
        } else {
            log.warn("No thumbnail url available for ${value.third.id}")
            updateRecord(
                enrichedValue,
                ReportStatus.warning,
                thumbnailMessage = "Download of youtube / vimeo thumbnail failed or no thumbnail available."
            )
        }
    }

263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
    /**
     * Moves a downloaded thumbnail file to the SFTP server and links it in the model.
     *
     * The destination is built from the SFTP base path, the record set id, the thumbnail folder
     * name and the last path segment of the record URI, with a `.jpg` suffix.
     *
     * @param value the record to update.
     * @param recordResource the record the thumbnail belongs to.
     * @param digitalObjectResource the digital object the thumbnail is derived from.
     * @param pathToLocalFile path of the downloaded thumbnail file.
     * @return the record with status `success` when the move worked, `warning` otherwise.
     */
    private fun addLocalThumbnail(
        value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        recordResource: Resource,
        digitalObjectResource: Resource,
        pathToLocalFile: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val recordId = recordResource.uri.substringAfterLast("/")
        val destPath =
            "$sftpBasePath/${value.first.second.recordSetId}/${Constant.thumbnailFolderName}/${recordId}.jpg"

        val pathOnSftpServer = previewImageHandler.moveFileToSFTP(pathToLocalFile, destPath)
        if (pathOnSftpServer == null) {
            // Guard clause: nothing is added to the model when the move failed.
            log.warn("Couldn't move downloaded thumbnail file to $destPath for ${value.third.id}")
            return updateRecord(
                value,
                ReportStatus.warning,
                thumbnailMessage = "upload of youtube / vimeo thumbnail to sFTP server failed"
            )
        }

        log.info("Move downloaded thumbnail file to $destPath for ${value.third.id}")
        createThumbnailResource(
            value.first.first,
            recordResource,
            digitalObjectResource,
            "${Constant.sftpPathPrefix}$pathOnSftpServer"
        )
        return updateRecord(
            value,
            ReportStatus.success,
            thumbnailMessage = "youtube / vimeo thumbnail fetched"
        )
    }

    /**
     * Adds the width and height of the oembed response, when present, as plain literals to the
     * digital object resource of the record.
     *
     * @param value the record whose digital object is enriched; a digital object must be present.
     * @param oembedObject the oembed response providing the optional dimensions.
     * @return the same [value] instance (the model is modified in place).
     */
    private fun addDimensionsToDigitalObject(
        value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        oembedObject: OembedResponse
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val target = getDigitalObjectResource(value.second)!!
        oembedObject.width?.let { w ->
            target.addLiteral(EBUCORE.width, ResourceFactory.createPlainLiteral(w.toString()))
        }
        oembedObject.height?.let { h ->
            target.addLiteral(EBUCORE.height, ResourceFactory.createPlainLiteral(h.toString()))
        }
        // NOTE(review): the created literal is discarded; kept to preserve the original behavior.
        value.first.first.createLiteral(target.toString(), true)
        return value
    }

315
316
317
318
    /** Returns true when none of the [resources] carries the thumbnail RiC-O type. */
    private fun noThumbnailAttached(resources: List<Resource>): Boolean =
        !resources.any { candidate -> candidate.hasProperty(RICO.type, Constant.thumbnailRicoType) }

319
320
    /**
     * Pairs the incoming model and header metadata with the list of all subjects of the model.
     *
     * @param input the parsed model together with its header metadata.
     * @return the unchanged [input] and the subjects listed by the model.
     */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> {
        val (rdfModel, _) = input
        val subjects: List<Resource> = rdfModel.listSubjects().toList()
        return input to subjects
    }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
323
    /**
     * Returns true when at least one resource is a digital object that has no locator, has a
     * `regulatedBy` property, and the resource list is not a faro resource (see [isFaroResource]).
     */
    private fun hasDigitalObjectWithoutLocator(res: List<Resource>): Boolean {
        for (candidate in res) {
            // The checks run in the same order as a short-circuiting conjunction would.
            if (!candidate.hasProperty(RICO.type, Constant.digitalObject)) continue
            if (candidate.hasProperty(EBUCORE.locator)) continue
            if (!candidate.hasProperty(RICO.regulatedBy)) continue
            if (!isFaroResource(res)) return true
        }
        return false
    }

    /**
     * Inspects the first digital object that has a `regulatedBy` property. For the first regulating
     * resource that has a type with literal value "access" and also has a name, the result is
     * whether that name equals "faro". Returns false when no such resource exists.
     */
    private fun isFaroResource(res: List<Resource>): Boolean {
        val digitalObject = res.firstOrNull { candidate ->
            candidate.hasProperty(RICO.type, Constant.digitalObject) && candidate.hasProperty(RICO.regulatedBy)
        } ?: return false

        val rules = digitalObject.listProperties(RICO.regulatedBy)
        while (rules.hasNext()) {
            val rule = rules.next().`object`.asResource()
            val isNamedAccessRule = rule.hasProperty(RICO.type) &&
                rule.getProperty(RICO.type).literal.string == "access" &&
                rule.hasProperty(RICO.name)
            if (isNamedAccessRule) {
                // The first named access rule decides; later rules are not inspected.
                return rule.getProperty(RICO.name).literal.string == "faro"
            }
        }
        return false
    }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
345
    /**
     * Looks up the original identifier of a record.
     *
     * All objects of the record's `identifiedBy` statements are converted to resources; the first
     * one that is a RiC-O identifier of the configured identifier type provides the value.
     *
     * @return the identifier string, or null when no matching identifier resource or value exists.
     */
    private fun getOriginalIdentifier(record: Resource): String? {
        val identifierResources = record.listProperties(RICO.identifiedBy)
            .toList()
            .map { statement -> statement.`object`.asResource() }
        val originalIdentifier = identifierResources.firstOrNull { candidate ->
            candidate.hasProperty(RDF.type, RICO.Identifier) &&
                candidate.hasProperty(RICO.type, Constant.identifierType)
        }
        return originalIdentifier?.getProperty(RICO.identifier)?.string
    }

355
    /**
     * Parses the serialized RDF payload into a fresh default model.
     *
     * @param data the serialized RDF (first) and the header metadata of the message (second).
     * @return the parsed model paired with the unchanged header metadata, or null when the payload
     * cannot be parsed. The parser error is logged so the cause is not lost.
     */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata>? {
        val model = ModelFactory.createDefaultModel()
        return try {
            model.read(StringReader(data.first), "", Constant.rdfParserLang)
            Pair(model, data.second)
        } catch (ex: RiotException) {
            // Previously swallowed silently; keep the null contract but record the parser diagnostic.
            log.warn("Could not parse input data as RDF: ${ex.message}")
            null
        }
    }

365
366
367
368
369
    /**
     * Creates a thumbnail instantiation with URI `<digital object URI>/derived` in the model and
     * links it in both directions with the digital object and the record.
     *
     * @param data the model the thumbnail resource is created in.
     * @param record the record instantiated by the thumbnail.
     * @param digitalObject the digital object the thumbnail is derived from.
     * @param locator the locator value stored on the thumbnail as a plain literal.
     * @return the URI of the thumbnail resource.
     */
    private fun createThumbnailResource(
        data: Model,
        record: Resource,
        digitalObject: Resource,
        locator: String
    ): String {
        val thumbnailUri = "${digitalObject.uri}/derived"
        // addProperty returns the resource itself, so the calls can be chained.
        val thumbnail = data.createResource(thumbnailUri)
            .addProperty(RDF.type, RICO.Instantiation)
            .addProperty(RICO.type, Constant.thumbnailRicoType)
            .addProperty(EBUCORE.locator, ResourceFactory.createPlainLiteral(locator))

        digitalObject.addProperty(RICO.hasDerivedInstantiation, thumbnail)
        thumbnail.addProperty(RICO.isDerivedFromInstantiation, digitalObject)
        record.addProperty(RICO.hasInstantiation, thumbnail)
        thumbnail.addProperty(RICO.instantiates, record)
        return thumbnailUri
    }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
    /** Delegates to [addSftpLocatorToModel] using the thumbnail folder name as the type. */
    private fun addThumbnailSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> =
        addSftpLocatorToModel(key, data, Constant.thumbnailFolderName)

    /** Delegates to [addSftpLocatorToModel] using the media folder name as the type. */
    private fun addMediaSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> =
        addSftpLocatorToModel(key, data, Constant.mediaFolderName)

    /**
     * Looks for a file of the given [type] on the SFTP server and links it in the model.
     *
     * The record resource, the digital object resource and the original identifier must be present;
     * they are asserted with `!!`.
     *
     * - No file found: status `ignore` for the thumbnail type, `warning` for any other type.
     * - File found, media type: the locator is added to the digital object.
     * - File found, any other type: a thumbnail resource is created.
     *
     * @param key the message key, used in the failure message.
     * @param data the record to update.
     * @param type the folder name that selects between media and thumbnail handling.
     * @return the record with an updated report.
     */
    private fun addSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        type: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val record = getRecordResource(data.second)!!
        val digitalObject = getDigitalObjectResource(data.second)!!
        val originalId = getOriginalIdentifier(record)!!
        val link = getLinkToResourceOnSFTPServer(data.first.second.recordSetId, type, originalId)

        if (link == null) {
            return when (type) {
                Constant.thumbnailFolderName ->
                    updateRecord(data, ReportStatus.ignore, thumbnailMessage = "no local thumbnails available")
                else ->
                    updateRecord(data, ReportStatus.warning, digitalObjectMessage = ReportMessages.reportFailure(key, type))
            }
        }

        if (type != Constant.mediaFolderName) {
            val uri = createThumbnailResource(data.first.first, record, digitalObject, link)
            return updateRecord(
                data,
                ReportStatus.success,
                thumbnailMessage = ReportMessages.reportSuccess(uri, link, type)
            )
        }

        addLocatorToDigitalObjectResource(data.first.first, link, digitalObject)
        return updateRecord(
            data,
            ReportStatus.success,
            digitalObjectMessage = ReportMessages.reportSuccess(digitalObject.uri, link, type)
        )
    }

    /**
     * Probes the SFTP server for `<base path>/<recordSetId>/<type>/<originalIdentifierValue>.<ext>`,
     * trying the configured extensions in order.
     *
     * @return the first existing path prefixed with the SFTP path prefix, or null when none exists.
     */
    private fun getLinkToResourceOnSFTPServer(
        recordSetId: String,
        type: String,
        originalIdentifierValue: String
    ): String? {
        val pathWithoutExtension = "$sftpBasePath/$recordSetId/$type/$originalIdentifierValue"
        // Lazy sequence: probing stops at the first path that exists.
        val existingPath = fileExtensions.asSequence()
            .map { extension -> "$pathWithoutExtension.$extension" }
            .firstOrNull { candidate -> sftpClient.exists(candidate) }

        if (existingPath != null) {
            return "${Constant.sftpPathPrefix}$existingPath"
        }
        log.info("No media file found. Tried $sftpBasePath/$recordSetId/$type/$originalIdentifierValue.{${fileExtensions.joinToString()}}")
        return null
    }

    /** Returns the first resource typed as a RiC-O record, or null when there is none. */
    private fun getRecordResource(resources: List<Resource>): Resource? =
        resources.find { candidate -> candidate.hasProperty(RDF.type, RICO.Record) }

    /** Returns the first resource with the digital object RiC-O type, or null when there is none. */
    private fun getDigitalObjectResource(resources: List<Resource>): Resource? =
        resources.find { candidate -> candidate.hasProperty(RICO.type, Constant.digitalObject) }

    /**
     * Checks whether [locator] fails to parse as a URL.
     *
     * @return true when constructing a [URL] throws [MalformedURLException], false otherwise.
     */
    private fun isNoValidUrl(locator: String): Boolean {
        try {
            URL(locator)
        } catch (ex: MalformedURLException) {
            return true
        }
        return false
    }

    /**
     * Adds [sftpLink] as a plain literal locator to the digital object resource.
     *
     * @param model the model the digital object belongs to.
     * @param sftpLink the link stored as locator value.
     * @param digitalObjectResource the resource that receives the locator.
     */
    private fun addLocatorToDigitalObjectResource(model: Model, sftpLink: String, digitalObjectResource: Resource) {
        digitalObjectResource.addLiteral(EBUCORE.locator, ResourceFactory.createPlainLiteral(sftpLink))
        // NOTE(review): the created literal is discarded; kept to preserve the original behavior.
        model.createLiteral(digitalObjectResource.toString(), true)
    }

    /** Writes the model in the configured RDF language and returns the trimmed text. */
    private fun serializeModel(model: Model): String =
        StringWriter()
            .also { buffer -> model.write(buffer, Constant.rdfParserLang) }
            .toString()
            .trim()
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492

    /**
     * Returns a copy of the record whose report has the given [status].
     *
     * Each message replaces the corresponding message of the existing report only when it is not
     * empty; an empty argument keeps the current message.
     *
     * @param value the record to update.
     * @param status the new report status.
     * @param generalMessage replacement for the general failure message, if not empty.
     * @param digitalObjectMessage replacement for the digital object message, if not empty.
     * @param thumbnailMessage replacement for the thumbnail message, if not empty.
     */
    private fun updateRecord(
        value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        status: String,
        generalMessage: String = "",
        digitalObjectMessage: String = "",
        thumbnailMessage: String = ""
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val current = value.third
        val amended = current.copy(
            status = status,
            generalFailureMessage = generalMessage.ifEmpty { current.generalFailureMessage },
            digitalObjectMessage = digitalObjectMessage.ifEmpty { current.digitalObjectMessage },
            thumbnailMessage = thumbnailMessage.ifEmpty { current.thumbnailMessage }
        )
        return Triple(value.first, value.second, amended)
    }

    /**
     * Attaches a new report to a model / subjects pair.
     *
     * @param value the model with header metadata and its subjects.
     * @param messageId the id of the report.
     * @param status the initial report status.
     * @param generalFailureMessage initial general failure message.
     * @param digitalObjectMessage initial digital object message.
     * @param thumbnailMessage initial thumbnail message.
     * @return the triple of model data, subjects and the new report.
     */
    private fun createRecord(
        value: Pair<Pair<Model, HeaderMetadata>, List<Resource>>,
        messageId: String,
        status: String,
        generalFailureMessage: String = "",
        digitalObjectMessage: String = "",
        thumbnailMessage: String = ""
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val (modelWithHeader, subjects) = value
        val initialReport = Report(
            messageId,
            status,
            generalFailureMessage = generalFailureMessage,
            digitalObjectMessage = digitalObjectMessage,
            thumbnailMessage = thumbnailMessage
        )
        val record = Triple(modelWithHeader, subjects, initialReport)
        return updateRecord(record, status = status)
    }
506
507
508
509
510
511
512
513
514
515
516
517

    /** Sends the JSON form of each record's report to the reporting topic. */
    private fun report(kstream: KStream<String, Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>>) {
        val reports = kstream.mapValues { _, record -> record.third.toJson() }
        reports.to(reportingTopic)
    }

    /** Serializes each record's model and sends it to the output topic. */
    private fun sendDownstream(kstream: KStream<String, Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>>) {
        val serialized = kstream.mapValues { record -> serializeModel(record.first.first) }
        serialized.to(settings.outputTopic)
    }
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
518
}