KafkaTopology.kt 20.5 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
24
import org.apache.jena.rdf.model.ResourceFactory
25
import org.apache.jena.riot.RiotException
Jonas Waeber's avatar
Jonas Waeber committed
26
import org.apache.kafka.streams.StreamsBuilder
27
import org.apache.kafka.streams.kstream.KStream
Jonas Waeber's avatar
Jonas Waeber committed
28
import org.apache.kafka.streams.kstream.Predicate
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
29
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
30
import org.memobase.rdf.EBUCORE
31
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
32
33
34
35
36
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
37
38
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
39
40
41
42
import java.io.StringReader
import java.io.StringWriter
import java.net.MalformedURLException
import java.net.URL
Jonas Waeber's avatar
Jonas Waeber committed
43
44

class KafkaTopology(private val settings: SettingsLoader) {
45
    private val appSettings = settings.appSettings
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
46
    private val log = LogManager.getLogger("KafkaTopology")
47

Jonas Waeber's avatar
Jonas Waeber committed
48
    private val sftpClient = SftpClient(settings.sftpSettings)
49
    private val previewImageHandler = RemoteResourceHandler(sftpClient)
50
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)
51
    private val fileExtensions = appSettings.getProperty(Constant.extensionsPropertyName).split(",")
52
    private val reportingTopic = settings.processReportTopic
Jonas Waeber's avatar
Jonas Waeber committed
53

54
    /**
     * Builds the stream topology of the media linker.
     *
     * The input topic is parsed into RDF models, validated, enriched with locators for
     * media files and thumbnails, and finally written to the output topic. A report for
     * every handled message is written to the reporting topic.
     *
     * @return the [StreamsBuilder] holding the complete topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val stream = builder.stream<String, String>(settings.inputTopic)

        // model[0]: input that could be parsed; model[1]: input for which createModel returned null.
        val model = stream
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { value -> createModel(value) }
            .branch(
                Predicate { _, value -> value != null },
                Predicate { _, _ -> true }
            )

        model[1]
            .mapValues { key, _ ->
                log.warn("Invalid input data. Check mapper service processing.")
                Report(
                    key,
                    ReportStatus.fatal,
                    generalFailureMessage = "Invalid input data. Check mapper service processing."
                ).toJson()
            }
            .to(reportingTopic)

        // [0]: fatal (no record resource / no original identifier),
        // [1]: ignore (no digital object resource),
        // [2]: everything needed for linking is present.
        val requiredFieldsAvailable = model[0]
            .mapValues { value -> extractSubjects(value!!) }
            .mapValues { key, value ->
                if (getDigitalObjectResource(value.second) == null) {
                    createRecord(
                        value,
                        key,
                        ReportStatus.ignore,
                        generalFailureMessage = "No digital object resource present in model."
                    )
                } else {
                    createRecord(value, key, ReportStatus.success)
                }
            }
            .mapValues { key, value ->
                val recordResource = getRecordResource(value.second)
                when {
                    recordResource == null -> {
                        updateRecord(
                            value,
                            ReportStatus.fatal,
                            generalMessage = "No record resource present in model."
                        )
                    }
                    getOriginalIdentifier(recordResource) == null -> {
                        updateRecord(
                            value,
                            ReportStatus.fatal,
                            generalMessage = ReportMessages.noOriginalIdentifier(key)
                        )
                    }
                    else -> {
                        value
                    }
                }
            }
            .branch(
                Predicate { _, value -> value.third.status == ReportStatus.fatal },
                Predicate { _, value -> value.third.status == ReportStatus.ignore },
                Predicate { _, _ -> true }
            )

        // The reason for a fatal / ignore status is stored in the general failure message
        // (see the two mapValues steps above), so that is the field to log here.
        requiredFieldsAvailable[0]
            .mapValues { _, value ->
                log.warn("Record contains faulty data: ${value.third.generalFailureMessage}. Abort processing of message")
                value.third.toJson()
            }
            .to(reportingTopic)

        requiredFieldsAvailable[1]
            .mapValues { _, value ->
                log.warn("Record contains faulty data: ${value.third.generalFailureMessage}")
                value.third.toJson()
            }
            .to(reportingTopic)

        // Records without a digital object are reported and still forwarded unchanged.
        sendDownstream(requiredFieldsAvailable[1])

        val hasLocatorBranch = requiredFieldsAvailable[2]
            .mapValues { readOnlyKey, value ->
                addThumbnailSftpLocatorToModel(
                    readOnlyKey,
                    value
                )
            }
            .branch(
                Predicate { _, value -> hasDigitalObjectWithoutLocator(value.second) }, // Indicates a local media file
                Predicate { _, _ -> true } // Indicates a possible remote media file
            )

        val updateDigitalObjects = hasLocatorBranch[0]
            .mapValues { readOnlyKey, value ->
                val enrichedModel = addMediaSftpLocatorToModel(readOnlyKey, value)
                if (enrichedModel.third.status == ReportStatus.warning) {
                    log.warn("A problem enriching the digital object occurred: ${enrichedModel.third.digitalObjectMessage}")
                }
                enrichedModel
            }

        val remoteThumbnails = hasLocatorBranch[1]
            .mapValues { value ->
                updateRecord(
                    value,
                    value.third.status,
                    digitalObjectMessage = "no local media files"
                )
            }
            .branch(
                // Compare against the shared status constant like every other status check in this topology.
                Predicate { _, value -> value.third.status == ReportStatus.success }, // Local thumbnail linked; don't try to fetch others
                Predicate { _, _ -> true } // Indicates a remote media file; check for youtube / vimeo thumbnail fetching
            )

        sendDownstream(remoteThumbnails[0])
        report(remoteThumbnails[0])

        val remotelyEnrichedThumbnails =
            remoteThumbnails[1]
                .mapValues { value -> fetchThumbnailForYoutubeOrVimeoFile(value) }

        sendDownstream(updateDigitalObjects)
        report(updateDigitalObjects)

        sendDownstream(remotelyEnrichedThumbnails)
        report(remotelyEnrichedThumbnails)

        return builder
    }

187
188
    /**
     * Tries to fetch a thumbnail from vimeo or youtube for a record that has no thumbnail yet.
     *
     * The record is returned unchanged when a thumbnail resource is already attached or when
     * the record or digital object resource cannot be found. Otherwise the locator of the
     * digital object decides which remote handler is asked for a thumbnail; every failure
     * is recorded in the report of the returned triple.
     *
     * @param value model with header metadata, the subjects of the model and the current report.
     * @return the (possibly enriched) triple with an updated report.
     */
    private fun fetchThumbnailForYoutubeOrVimeoFile(value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        if (!noThumbnailAttached(value.second)) {
            return value
        }
        val recordResource = getRecordResource(value.second)
        val digitalObjectResource = getDigitalObjectResource(value.second)
        if (recordResource == null || digitalObjectResource == null) {
            return value
        }

        val locator = digitalObjectResource.getProperty(EBUCORE.locator).string
        // Every branch either yields the handler result or leaves the function with an updated report.
        val handlerResult = when {
            isNoValidUrl(locator) -> {
                log.warn("No valid locator url found for ${value.third.id}")
                return updateRecord(value, ReportStatus.warning, digitalObjectMessage = "no valid locator url")
            }
            RemoteResourceHandler.isVimeoUrl(locator) -> {
                log.info("Trying to download thumbnail file on vimeo for ${value.third.id}")
                this.previewImageHandler.getFromVimeo(locator) ?: run {
                    log.warn("Download for ${value.third.id} failed!")
                    return updateRecord(
                        value,
                        ReportStatus.warning,
                        thumbnailMessage = "couldn't fetch vimeo thumbnail"
                    )
                }
            }
            RemoteResourceHandler.isYoutubeUrl(locator) -> {
                log.info("Trying to download thumbnail file on youtube for ${value.third.id}")
                this.previewImageHandler.getFromYoutube(locator) ?: run {
                    log.warn("Download for ${value.third.id} failed!")
                    return updateRecord(
                        value,
                        ReportStatus.warning,
                        thumbnailMessage = "couldn't fetch youtube thumbnail"
                    )
                }
            }
            else -> {
                log.debug("Won't fetch thumbnail file for ${value.third.id} because no youtube/vimeo resource")
                return updateRecord(
                    value,
                    ReportStatus.ignore,
                    thumbnailMessage = "no thumbnails available"
                )
            }
        }

        // The first component carries the dimensions, the second the path of the downloaded file (if any).
        val enrichedValue = addDimensionsToDigitalObject(value, handlerResult.first)
        val downloadedFile = handlerResult.second
        if (downloadedFile == null) {
            log.warn("No thumbnail url available for ${value.third.id}")
            return updateRecord(
                enrichedValue,
                ReportStatus.warning,
                thumbnailMessage = "Download of youtube / vimeo thumbnail failed. Check if resource is still available."
            )
        }
        return addLocalThumbnail(enrichedValue, recordResource, digitalObjectResource, downloadedFile)
    }

251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
    /**
     * Moves a downloaded thumbnail file to the sFTP server and links it in the model.
     *
     * The destination is built from the sFTP base path, the record set id, the thumbnail
     * folder name and the last path segment of the record URI, with a `.jpg` extension.
     *
     * @param value model with header metadata, the subjects of the model and the current report.
     * @param recordResource the record the thumbnail belongs to.
     * @param digitalObjectResource the digital object the thumbnail is derived from.
     * @param pathToLocalFile path of the downloaded thumbnail file.
     * @return the triple with a success report when the file was moved, a warning report otherwise.
     */
    private fun addLocalThumbnail(
        value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        recordResource: Resource,
        digitalObjectResource: Resource,
        pathToLocalFile: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val recordId = recordResource.uri.substringAfterLast("/")
        val destPath = "$sftpBasePath/${value.first.second.recordSetId}/${Constant.thumbnailFolderName}/$recordId.jpg"

        val pathOnSftpServer = previewImageHandler.moveFileToSFTP(pathToLocalFile, destPath)
        if (pathOnSftpServer == null) {
            log.warn("Couldn't move downloaded thumbnail file to $destPath for ${value.third.id}")
            return updateRecord(
                value,
                ReportStatus.warning,
                thumbnailMessage = "upload of youtube / vimeo thumbnail to sFTP server failed"
            )
        }

        log.info("Move downloaded thumbnail file to $destPath for ${value.third.id}")
        createThumbnailResource(
            value.first.first,
            recordResource,
            digitalObjectResource,
            "${Constant.sftpPathPrefix}$pathOnSftpServer"
        )
        return updateRecord(
            value,
            ReportStatus.success,
            thumbnailMessage = "youtube / vimeo thumbnail fetched"
        )
    }

    /**
     * Adds the width and height of an oembed response to the digital object of the record.
     *
     * Each dimension is only added when it is present in the response and is stored as a
     * plain literal of its string representation.
     *
     * @param value model with header metadata, the subjects of the model and the current report.
     * @param oembedObject the oembed response providing the dimensions.
     * @return the same triple that was passed in; the digital object resource is modified in place.
     */
    private fun addDimensionsToDigitalObject(
        value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        oembedObject: OembedResponse
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        // The only caller in this class checks that a digital object resource exists before calling.
        val digitalObjectResource = getDigitalObjectResource(value.second)!!
        if (oembedObject.width != null) {
            val width = ResourceFactory.createPlainLiteral(oembedObject.width.toString())
            digitalObjectResource.addLiteral(EBUCORE.width, width)
        }
        if (oembedObject.height != null) {
            val height = ResourceFactory.createPlainLiteral(oembedObject.height.toString())
            digitalObjectResource.addLiteral(EBUCORE.height, height)
        }
        // The former model.createLiteral(...) call was dropped: its result was discarded and
        // creating a literal does not add a statement to the model.
        return value
    }

303
304
305
306
    /**
     * Checks that none of the given resources is typed as a thumbnail.
     *
     * @param resources the resources to inspect.
     * @return true when no resource carries the thumbnail RICO type.
     */
    private fun noThumbnailAttached(resources: List<Resource>): Boolean {
        val thumbnailPresent = resources.any { candidate ->
            candidate.hasProperty(RICO.type, Constant.thumbnailRicoType)
        }
        return !thumbnailPresent
    }

307
308
    /**
     * Pairs the parsed input with the list of all subjects of its model.
     *
     * @param input the model together with its header metadata.
     * @return the unchanged input as first component and the model's subjects as second.
     */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> {
        val subjects = input.first.listSubjects().toList()
        return Pair(input, subjects)
    }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
311
    /**
     * Checks whether any digital object among the resources lacks a locator.
     *
     * @param res the resources to inspect.
     * @return true when at least one digital object resource has no locator property.
     */
    private fun hasDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
        res.any { candidate ->
            val isDigitalObject = candidate.hasProperty(RICO.type, Constant.digitalObject)
            isDigitalObject && !candidate.hasProperty(EBUCORE.locator)
        }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
315
    /**
     * Looks up the original identifier of a record.
     *
     * All resources the record is identified by are collected; the first one that is a
     * RICO identifier of the configured identifier type provides the value.
     *
     * @param record the record resource.
     * @return the identifier value, or null when no matching identifier or value exists.
     */
    private fun getOriginalIdentifier(record: Resource): String? {
        val identifierResources = record
            .listProperties(RICO.identifiedBy)
            .toList()
            .map { statement -> statement.`object`.asResource() }
        val originalIdentifier = identifierResources.firstOrNull { candidate ->
            candidate.hasProperty(RDF.type, RICO.Identifier) &&
                candidate.hasProperty(RICO.type, Constant.identifierType)
        }
        return originalIdentifier?.getProperty(RICO.identifier)?.string
    }

325
    /**
     * Parses the serialized RDF of a message into a model.
     *
     * @param data the serialized RDF together with the header metadata of the message.
     * @return the parsed model paired with the header metadata, or null when the input
     * could not be parsed.
     */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata>? {
        val model = ModelFactory.createDefaultModel()
        try {
            model.read(StringReader(data.first), "", Constant.rdfParserLang)
        } catch (ex: RiotException) {
            // Keep the parser's reason: the caller only sees null and reports a generic message.
            log.warn("Failed to parse input data: ${ex.message}")
            return null
        }
        return Pair(model, data.second)
    }

335
336
337
338
339
    /**
     * Creates a thumbnail instantiation in the model and links it to its record and digital object.
     *
     * The thumbnail URI is the URI of the digital object followed by `/derived`.
     *
     * @param data the model the thumbnail resource is created in.
     * @param record the record the thumbnail instantiates.
     * @param digitalObject the digital object the thumbnail is derived from.
     * @param locator the locator value stored on the thumbnail.
     * @return the URI of the thumbnail resource.
     */
    private fun createThumbnailResource(
        data: Model,
        record: Resource,
        digitalObject: Resource,
        locator: String
    ): String {
        val thumbnailUri = "${digitalObject.uri}/derived"
        val locatorLiteral = ResourceFactory.createPlainLiteral(locator)

        // Describe the thumbnail itself ...
        val thumbnail = data.createResource(thumbnailUri).apply {
            addProperty(RDF.type, RICO.Instantiation)
            addProperty(RICO.type, Constant.thumbnailRicoType)
            addProperty(EBUCORE.locator, locatorLiteral)
        }

        // ... and link it in both directions with the digital object and the record.
        digitalObject.addProperty(RICO.hasDerivedInstantiation, thumbnail)
        thumbnail.addProperty(RICO.isDerivedFromInstantiation, digitalObject)
        record.addProperty(RICO.hasInstantiation, thumbnail)
        thumbnail.addProperty(RICO.instantiates, record)

        return thumbnailUri
    }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
    /**
     * Links a thumbnail file from the thumbnail folder on the sFTP server, if one exists.
     *
     * @param key the key of the processed message.
     * @param data model with header metadata, the subjects of the model and the current report.
     * @return the triple with an updated report.
     */
    private fun addThumbnailSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> =
        addSftpLocatorToModel(key, data, Constant.thumbnailFolderName)

    /**
     * Links a media file from the media folder on the sFTP server, if one exists.
     *
     * @param key the key of the processed message.
     * @param data model with header metadata, the subjects of the model and the current report.
     * @return the triple with an updated report.
     */
    private fun addMediaSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> =
        addSftpLocatorToModel(key, data, Constant.mediaFolderName)

    /**
     * Searches the sFTP server for a file of the given type and links it in the model.
     *
     * For the media folder the locator is added to the digital object; for any other type
     * a thumbnail resource is created. When no file is found the report is set to ignore
     * for thumbnails and to warning otherwise.
     *
     * @param key the key of the processed message, used in the failure report.
     * @param data model with header metadata, the subjects of the model and the current report.
     * @param type the folder name on the sFTP server (media or thumbnail folder).
     * @return the triple with an updated report.
     */
    private fun addSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        type: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        // prepare() routes records lacking any of these to the fatal / ignore branches
        // before this function is reached.
        val recordResource = getRecordResource(data.second)!!
        val digitalObjectResource = getDigitalObjectResource(data.second)!!
        val originalIdentifierValue = getOriginalIdentifier(recordResource)!!

        val link = getLinkToResourceOnSFTPServer(data.first.second.recordSetId, type, originalIdentifierValue)
        if (link == null) {
            return when (type) {
                Constant.thumbnailFolderName ->
                    updateRecord(data, ReportStatus.ignore, thumbnailMessage = "no local thumbnails available")
                else ->
                    updateRecord(data, ReportStatus.warning, digitalObjectMessage = ReportMessages.reportFailure(key, type))
            }
        }

        if (type == Constant.mediaFolderName) {
            addLocatorToDigitalObjectResource(data.first.first, link, digitalObjectResource)
            return updateRecord(
                data,
                ReportStatus.success,
                digitalObjectMessage = ReportMessages.reportSuccess(digitalObjectResource.uri, link, type)
            )
        }

        val thumbnailUri = createThumbnailResource(data.first.first, recordResource, digitalObjectResource, link)
        return updateRecord(
            data,
            ReportStatus.success,
            thumbnailMessage = ReportMessages.reportSuccess(thumbnailUri, link, type)
        )
    }

    /**
     * Finds the file of a record on the sFTP server by trying every configured extension.
     *
     * The extensions are tried in their configured order and the first existing file wins.
     *
     * @param recordSetId the id of the record set, used as folder name.
     * @param type the sub folder name below the record set folder.
     * @param originalIdentifierValue the file name without extension.
     * @return the path of the first existing file with the sFTP path prefix, or null when none exists.
     */
    private fun getLinkToResourceOnSFTPServer(
        recordSetId: String,
        type: String,
        originalIdentifierValue: String
    ): String? {
        // Lazy evaluation: the server is not asked about later extensions once a file was found.
        val existingPath = fileExtensions
            .asSequence()
            .map { extension -> "$sftpBasePath/$recordSetId/$type/$originalIdentifierValue.$extension" }
            .firstOrNull { candidate -> sftpClient.exists(candidate) }

        if (existingPath != null) {
            return "${Constant.sftpPathPrefix}$existingPath"
        }
        log.info("No media file found. Tried $sftpBasePath/$recordSetId/$type/$originalIdentifierValue.{${fileExtensions.joinToString()}}")
        return null
    }

    /**
     * Returns the first resource typed as RICO record.
     *
     * @param resources the resources to search.
     * @return the record resource, or null when there is none.
     */
    private fun getRecordResource(resources: List<Resource>): Resource? =
        resources.find { candidate -> candidate.hasProperty(RDF.type, RICO.Record) }

    /**
     * Returns the first resource carrying the digital object RICO type.
     *
     * @param resources the resources to search.
     * @return the digital object resource, or null when there is none.
     */
    private fun getDigitalObjectResource(resources: List<Resource>): Resource? =
        resources.find { candidate -> candidate.hasProperty(RICO.type, Constant.digitalObject) }

    /**
     * Checks whether a locator can NOT be parsed as a URL.
     *
     * @param locator the locator value to check.
     * @return true when constructing a [URL] from the locator fails, false otherwise.
     */
    private fun isNoValidUrl(locator: String): Boolean {
        val parsedUrl = try {
            URL(locator)
        } catch (malformed: MalformedURLException) {
            null
        }
        return parsedUrl == null
    }

    /**
     * Adds the sFTP link as plain literal locator to the digital object resource.
     *
     * @param model the model of the record; kept for call compatibility, no longer used.
     * @param sftpLink the link to the media file on the sFTP server.
     * @param digitalObjectResource the digital object that receives the locator.
     */
    @Suppress("UNUSED_PARAMETER")
    private fun addLocatorToDigitalObjectResource(model: Model, sftpLink: String, digitalObjectResource: Resource) {
        val literal = ResourceFactory.createPlainLiteral(sftpLink)
        digitalObjectResource.addLiteral(EBUCORE.locator, literal)
        // The former model.createLiteral(...) call was dropped: its result was discarded and
        // creating a literal does not add a statement to the model.
    }

    /**
     * Serializes a model in the configured RDF language.
     *
     * @param model the model to serialize.
     * @return the serialized model without leading or trailing whitespace.
     */
    private fun serializeModel(model: Model): String =
        StringWriter()
            .also { writer -> model.write(writer, Constant.rdfParserLang) }
            .toString()
            .trim()
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462

    /**
     * Returns a copy of the record whose report carries a new status and updated messages.
     *
     * A message is only replaced when a non-empty value is passed; otherwise the message
     * of the current report is kept.
     *
     * @param value model with header metadata, the subjects of the model and the current report.
     * @param status the new report status.
     * @param generalMessage new general failure message, or empty to keep the current one.
     * @param digitalObjectMessage new digital object message, or empty to keep the current one.
     * @param thumbnailMessage new thumbnail message, or empty to keep the current one.
     * @return the triple with the amended report as third component.
     */
    private fun updateRecord(
        value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        status: String,
        generalMessage: String = "",
        digitalObjectMessage: String = "",
        thumbnailMessage: String = ""
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val currentReport = value.third
        return value.copy(
            third = currentReport.copy(
                status = status,
                generalFailureMessage = generalMessage.ifEmpty { currentReport.generalFailureMessage },
                digitalObjectMessage = digitalObjectMessage.ifEmpty { currentReport.digitalObjectMessage },
                thumbnailMessage = thumbnailMessage.ifEmpty { currentReport.thumbnailMessage }
            )
        )
    }

    /**
     * Attaches a freshly created report to the model and its subjects.
     *
     * @param value the model with header metadata and the subjects of the model.
     * @param messageId the id stored in the report.
     * @param status the initial report status.
     * @param generalFailureMessage initial general failure message.
     * @param digitalObjectMessage initial digital object message.
     * @param thumbnailMessage initial thumbnail message.
     * @return a triple of model with header metadata, subjects and the new report.
     */
    private fun createRecord(
        value: Pair<Pair<Model, HeaderMetadata>, List<Resource>>,
        messageId: String,
        status: String,
        generalFailureMessage: String = "",
        digitalObjectMessage: String = "",
        thumbnailMessage: String = ""
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val (modelWithHeader, subjects) = value
        val initialReport = Report(
            messageId,
            status,
            generalFailureMessage = generalFailureMessage,
            digitalObjectMessage = digitalObjectMessage,
            thumbnailMessage = thumbnailMessage
        )
        val record = Triple(modelWithHeader, subjects, initialReport)
        return updateRecord(record, status = status)
    }
476
477
478
479
480
481
482
483
484
485
486
487

    /**
     * Writes the JSON form of every record's report to the reporting topic.
     *
     * @param kstream the stream of processed records.
     */
    private fun report(kstream: KStream<String, Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>>) {
        val serializedReports = kstream.mapValues { _, record -> record.third.toJson() }
        serializedReports.to(reportingTopic)
    }

    /**
     * Writes the serialized model of every record to the output topic.
     *
     * @param kstream the stream of processed records.
     */
    private fun sendDownstream(kstream: KStream<String, Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>>) {
        val serializedModels = kstream.mapValues { record -> serializeModel(record.first.first) }
        serializedModels.to(settings.outputTopic)
    }
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
488
}