KafkaTopology.kt 17.1 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
21
22
23
24
import java.io.StringReader
import java.io.StringWriter
import java.net.MalformedURLException
import java.net.URL
Jonas Waeber's avatar
Jonas Waeber committed
25
26
27
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
28
import org.apache.jena.rdf.model.ResourceFactory
29
import org.apache.jena.riot.RiotException
Jonas Waeber's avatar
Jonas Waeber committed
30
31
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
32
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
33
import org.memobase.rdf.EBUCORE
34
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
35
36
37
38
39
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
40
41
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
Jonas Waeber's avatar
Jonas Waeber committed
42
43

class KafkaTopology(private val settings: SettingsLoader) {
44
    // Application-specific properties provided by the settings loader.
    private val appSettings = settings.appSettings
    private val log = LogManager.getLogger("KafkaTopology")

    // The sFTP client is used directly for existence checks and is also handed to the preview image handler.
    private val sftpClient = SftpClient(settings.sftpSettings)
    private val previewImageHandler = PreviewImageHandler(sftpClient)
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)

    // Candidate file extensions, configured as a comma-separated list. Entries are trimmed and blank
    // entries dropped so that "jpg, png" or a trailing comma cannot produce paths that never exist.
    private val fileExtensions = appSettings.getProperty(Constant.extensionsPropertyName)
        .split(",")
        .map { it.trim() }
        .filter { it.isNotEmpty() }
    private val reportingTopic = settings.processReportTopic
Jonas Waeber's avatar
Jonas Waeber committed
52

53
    /**
     * Builds the stream topology of the media linker.
     *
     * Messages from the input topic are parsed into RDF models and routed as follows:
     * - input that cannot be parsed: a failure report is written to the reporting topic;
     * - models lacking a digital object, a record or an original identifier: the report is written to
     *   the reporting topic and the unchanged model to the output topic;
     * - all other models: a local thumbnail is looked up first; afterwards either the local media
     *   locator is added (digital object without locator) or a Vimeo / YouTube thumbnail is fetched
     *   (digital object with locator). The model goes to the output topic, the report to the
     *   reporting topic.
     *
     * @return the builder holding the assembled topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val stream = builder.stream<String, String>(settings.inputTopic)

        // Branch 0: input parsed into a model. Branch 1: createModel returned null.
        val model = stream
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { value -> createModel(value) }
            .branch(
                Predicate { _, value -> value != null },
                Predicate { _, _ -> true }
            )

        model[1]
            .mapValues { key, _ ->
                log.warn("Invalid input data. Check mapper service processing.")
                Report(
                    key,
                    ReportStatus.failure,
                    generalFailureMessage = "Invalid input data. Check mapper service processing."
                ).toJson()
            }
            .to(reportingTopic)

        // Branch 0: records with a failure report. Branch 1: records that passed all checks.
        val requiredFieldsAvailable = model[0]
            // Non-null is guaranteed by the predicate of branch 0 above.
            .mapValues { value -> extractSubjects(value!!) }
            .mapValues { key, value ->
                if (getDigitalObjectResource(value.second) == null) {
                    createRecord(
                        value,
                        key,
                        ReportStatus.failure,
                        generalMessage = "No digital object resource present in model."
                    )
                } else {
                    createRecord(value, key, ReportStatus.success)
                }
            }
            .mapValues { key, value ->
                val recordResource = getRecordResource(value.second)
                if (recordResource == null) {
                    updateRecord(value, ReportStatus.failure, generalMessage = "No record resource present in model.")
                } else if (getOriginalIdentifier(recordResource) == null) {
                    updateRecord(value, ReportStatus.failure, generalMessage = ReportMessages.noOriginalIdentifier(key))
                } else {
                    value
                }
            }
            .branch(
                Predicate { _, value -> value.third.status == "FAILURE" },
                Predicate { _, _ -> true }
            )

        requiredFieldsAvailable[0]
            .mapValues { _, value ->
                // The checks above store their findings in the general failure message.
                log.warn("Record contains faulty data: ${value.third.generalFailureMessage}")
                value.third.toJson()
            }
            .to(reportingTopic)

        // TODO: To be discussed: Should messages with these failures eventually be forwarded?
        requiredFieldsAvailable[0]
            .mapValues { value -> serializeModel(value.first.first) }
            .to(settings.outputTopic)

        val hasLocatorBranch = requiredFieldsAvailable[1]
            .mapValues { readOnlyKey, value ->
                addThumbnailSftpLocatorToModel(
                    readOnlyKey,
                    value
                )
            }
            .branch(
                Predicate { _, value -> hasDigitalObjectWithoutLocator(value.second) }, // Indicates a local media file
                Predicate { _, _ -> true } // Indicates a remote media file; check for youtube / vimeo thumbnail fetching
            )

        val updateDigitalObjects = hasLocatorBranch[0]
            .mapValues { readOnlyKey, value ->
                val enrichedModel = addMediaSftpLocatorToModel(readOnlyKey, value)
                if (enrichedModel.third.status == "FAILURE") {
                    log.warn("A problem enriching the digital object occurred: ${enrichedModel.third.digitalObjectMessage}")
                }
                enrichedModel
            }

        updateDigitalObjects
            .mapValues { value -> serializeModel(value.first.first) }
            .to(settings.outputTopic)

        updateDigitalObjects
            .mapValues { value -> value.third.toJson() }
            .to(reportingTopic)

        // Both sinks hang off the same stream so that the report written to the reporting topic
        // is the one updated by the thumbnail fetch, not the one from before it.
        val remoteMediaWithThumbnails = hasLocatorBranch[1]
            .mapValues { value -> fetchThumbnailForYoutubeOrVimeoFile(value) }

        remoteMediaWithThumbnails
            .mapValues { value -> serializeModel(value.first.first) }
            .to(settings.outputTopic)

        remoteMediaWithThumbnails
            .mapValues { _, value -> value.third.toJson() }
            .to(reportingTopic)

        return builder
    }

159
160
    /**
     * Tries to fetch a preview image from Vimeo or YouTube for the digital object's locator, moves it
     * to the sFTP server via [previewImageHandler] and links it into the model as a thumbnail.
     *
     * The input is returned unchanged when a thumbnail is already attached or when the record or the
     * digital object resource is missing.
     *
     * @param value the model with its header metadata, the model's subject resources and the report so far.
     * @return the input with an updated report; the model itself is modified in place when a thumbnail
     * resource was created.
     */
    private fun fetchThumbnailForYoutubeOrVimeoFile(value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        if (!noThumbnailAttached(value.second)) return value
        val recordResource = getRecordResource(value.second) ?: return value
        val digitalObjectResource = getDigitalObjectResource(value.second) ?: return value

        // getProperty returns null when the resource has no locator; treat that like an invalid url
        // instead of failing with a NullPointerException.
        val locator = digitalObjectResource.getProperty(EBUCORE.locator)?.string
        if (locator == null || isNoValidUrl(locator)) {
            log.warn("No valid locator url found for ${value.third.id}")
            return updateRecord(value, ReportStatus.failure, thumbnailMessage = "no valid locator url")
        }

        val downloadedThumbnail = when {
            PreviewImageHandler.isVimeoUrl(locator) -> {
                log.info("Trying to download thumbnail file on vimeo for ${value.third.id}")
                this.previewImageHandler.getFromVimeo(locator, Constant.vimeoThumbnailWidth)
                    ?: return updateRecord(
                        value,
                        ReportStatus.failure,
                        thumbnailMessage = "couldn't fetch vimeo thumbnail"
                    )
            }
            PreviewImageHandler.isYoutubeUrl(locator) -> {
                log.info("Trying to download thumbnail file on youtube for ${value.third.id}")
                // NOTE(review): "thumbail" is misspelled; left unchanged because the message is part of the emitted report.
                this.previewImageHandler.getFromYoutube(locator)
                    ?: return updateRecord(
                        value,
                        ReportStatus.failure,
                        thumbnailMessage = "couldn't fetch youtube thumbail"
                    )
            }
            else -> {
                log.debug("Won't fetch thumbnail file for ${value.third.id} because no youtube/vimeo resource")
                return updateRecord(
                    value,
                    ReportStatus.success,
                    thumbnailMessage = "no additional thumbnails fetched"
                )
            }
        }

        // The file name on the server is the last path segment of the record uri.
        val recordId = recordResource.uri.substringAfterLast("/")
        val destPath = "$sftpBasePath/${value.first.second.recordSetId}/${Constant.thumbnailFolderName}/$recordId.jpg"
        val pathOnSftpServer = previewImageHandler.moveFileToSFTP(downloadedThumbnail, destPath)
        if (pathOnSftpServer == null) {
            log.warn("Couldn't move downloaded thumbnail file to $destPath for ${value.third.id}")
            return updateRecord(
                value,
                ReportStatus.failure,
                thumbnailMessage = "upload of youtube / vimeo thumbnail to sFTP server failed"
            )
        }

        log.info("Move downloaded thumbnail file to $destPath for ${value.third.id}")
        createThumbnailResource(
            value.first.first,
            recordResource,
            digitalObjectResource,
            pathOnSftpServer
        )
        // The status collected so far is kept; only the thumbnail message is added.
        return updateRecord(
            value,
            value.third.status,
            thumbnailMessage = "youtube / vimeo thumbnail fetched"
        )
    }

    /**
     * Checks whether the given resources contain no thumbnail.
     *
     * @return true when none of [resources] has [RICO.type] set to [Constant.thumbnailRicoType].
     */
    private fun noThumbnailAttached(resources: List<Resource>): Boolean =
        resources.none { candidate -> candidate.hasProperty(RICO.type, Constant.thumbnailRicoType) }

234
235
    /**
     * Pairs the parsed input with the list of all subject resources of its model.
     *
     * @param input the model together with its header metadata.
     * @return the unchanged input and the subjects listed from the model.
     */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> {
        val subjects = input.first.listSubjects().toList()
        return Pair(input, subjects)
    }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
238
    /**
     * Checks whether at least one digital object among the resources has no [EBUCORE.locator] yet.
     *
     * @return true when such a digital object exists, false otherwise.
     */
    private fun hasDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
        res.any { resource ->
            val isDigitalObject = resource.hasProperty(RICO.type, Constant.digitalObject)
            isDigitalObject && !resource.hasProperty(EBUCORE.locator)
        }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
242
    /**
     * Reads the original identifier of a record.
     *
     * Follows all [RICO.identifiedBy] statements of the record and picks the first target resource
     * that is a [RICO.Identifier] with [RICO.type] equal to [Constant.identifierType].
     *
     * @return the [RICO.identifier] value of that resource, or null if no such resource or value exists.
     */
    private fun getOriginalIdentifier(record: Resource): String? {
        val identifierResources = record.listProperties(RICO.identifiedBy)
            .toList()
            .map { it.`object`.asResource() }
        val originalIdentifier = identifierResources.firstOrNull { candidate ->
            candidate.hasProperty(RDF.type, RICO.Identifier) &&
                candidate.hasProperty(RICO.type, Constant.identifierType)
        }
        return originalIdentifier?.getProperty(RICO.identifier)?.string
    }

252
    /**
     * Parses the message value into an RDF model using [Constant.rdfParserLang].
     *
     * @param data the serialized RDF together with the header metadata of the message.
     * @return the parsed model paired with the header metadata, or null if parsing fails with a
     * [RiotException]. The parser error is logged so the cause is not lost.
     */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata>? {
        val model = ModelFactory.createDefaultModel()
        return try {
            model.read(StringReader(data.first), "", Constant.rdfParserLang)
            Pair(model, data.second)
        } catch (ex: RiotException) {
            log.warn("Could not parse input data as RDF: ${ex.message}")
            null
        }
    }

262
263
264
265
266
    /**
     * Adds a thumbnail instantiation to the model and links it with the record and the digital object.
     *
     * The thumbnail uri is the digital object's uri with "/derived" appended.
     *
     * @param data the model the thumbnail resource is created in.
     * @param record the record the thumbnail instantiates.
     * @param digitalObject the instantiation the thumbnail is derived from.
     * @param locator the location of the thumbnail file, stored as a plain literal.
     * @return the uri of the thumbnail resource.
     */
    private fun createThumbnailResource(
        data: Model,
        record: Resource,
        digitalObject: Resource,
        locator: String
    ): String {
        val thumbnailUri = "${digitalObject.uri}/derived"
        val thumbnailResource = data.createResource(thumbnailUri)
        val locatorLiteral = ResourceFactory.createPlainLiteral(locator)

        // Describe the thumbnail itself.
        thumbnailResource.apply {
            addProperty(RDF.type, RICO.Instantiation)
            addProperty(RICO.type, Constant.thumbnailRicoType)
            addProperty(EBUCORE.locator, locatorLiteral)
        }

        // Link thumbnail and digital object in both directions.
        digitalObject.addProperty(RICO.hasDerivedInstantiation, thumbnailResource)
        thumbnailResource.addProperty(RICO.isDerivedFromInstantiation, digitalObject)

        // Link thumbnail and record in both directions.
        record.addProperty(RICO.hasInstantiation, thumbnailResource)
        thumbnailResource.addProperty(RICO.instantiates, record)

        return thumbnailUri
    }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
    /**
     * Looks up a thumbnail file in the [Constant.thumbnailFolderName] folder and adds it to the model.
     *
     * Delegates to [addSftpLocatorToModel].
     */
    private fun addThumbnailSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> =
        addSftpLocatorToModel(key, data, Constant.thumbnailFolderName)

    /**
     * Looks up a media file in the [Constant.mediaFolderName] folder and adds its locator to the model.
     *
     * Delegates to [addSftpLocatorToModel].
     */
    private fun addMediaSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> =
        addSftpLocatorToModel(key, data, Constant.mediaFolderName)

    /**
     * Looks up a file for the record on the sFTP server and, if found, links it into the model.
     *
     * For [Constant.mediaFolderName] the link is added as locator of the digital object and the outcome
     * is recorded in the report's digital object message; a missing file is a failure. For any other
     * type (the thumbnail folder) a thumbnail resource is created and the outcome is recorded in the
     * report's thumbnail message; a missing thumbnail is not a failure.
     *
     * @param key the message key, used in report messages.
     * @param data the model with header metadata, the model's subject resources and the report so far.
     * @param type the folder on the sFTP server to look in.
     * @return the input with an updated report; the model is modified in place when a file was found.
     * @throws IllegalStateException if the record, the digital object or the original identifier is
     * missing. The topology in `prepare` only routes records here after checking all three.
     */
    private fun addSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        type: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val recordResource = checkNotNull(getRecordResource(data.second)) {
            "No record resource present in model for message $key."
        }
        val digitalObjectResource = checkNotNull(getDigitalObjectResource(data.second)) {
            "No digital object resource present in model for message $key."
        }
        val originalIdentifierValue = checkNotNull(getOriginalIdentifier(recordResource)) {
            "No original identifier present in model for message $key."
        }
        val link = getLinkToResourceOnSFTPServer(data.first.second.recordSetId, type, originalIdentifierValue)
            ?: return if (type == Constant.thumbnailFolderName) {
                updateRecord(data, ReportStatus.success, thumbnailMessage = "no local thumbnails available")
            } else {
                updateRecord(data, ReportStatus.failure, digitalObjectMessage = ReportMessages.reportFailure(key, type))
            }
        return if (type == Constant.mediaFolderName) {
            addLocatorToDigitalObjectResource(data.first.first, link, digitalObjectResource)
            updateRecord(
                data,
                ReportStatus.success,
                digitalObjectMessage = ReportMessages.reportSuccess(digitalObjectResource.uri, link, type)
            )
        } else {
            val uri = createThumbnailResource(data.first.first, recordResource, digitalObjectResource, link)
            // Stored as thumbnail message, matching the "no local thumbnails available" case above,
            // so the later media step cannot overwrite it via the digital object message.
            updateRecord(
                data,
                ReportStatus.success,
                thumbnailMessage = ReportMessages.reportSuccess(uri, link, type)
            )
        }
    }

    /**
     * Searches the sFTP server for a file named after the original identifier.
     *
     * The configured file extensions are tried in order and the search stops at the first existing file.
     *
     * @param recordSetId the record set folder below the sFTP base path.
     * @param type the sub-folder inside the record set folder.
     * @param originalIdentifierValue the file name without extension.
     * @return the path of the first existing file prefixed with [Constant.sftpPathPrefix], or null if
     * no candidate exists.
     */
    private fun getLinkToResourceOnSFTPServer(
        recordSetId: String,
        type: String,
        originalIdentifierValue: String
    ): String? {
        val existingPath = fileExtensions
            .asSequence()
            .map { extension -> "$sftpBasePath/$recordSetId/$type/$originalIdentifierValue.$extension" }
            .firstOrNull { candidate -> sftpClient.exists(candidate) }
        return existingPath?.let { path -> "${Constant.sftpPathPrefix}$path" }
    }

    /**
     * Finds the first resource typed as [RICO.Record].
     *
     * @return the record resource, or null if there is none.
     */
    private fun getRecordResource(resources: List<Resource>): Resource? =
        resources.firstOrNull { candidate -> candidate.hasProperty(RDF.type, RICO.Record) }

    /**
     * Finds the first resource whose [RICO.type] is [Constant.digitalObject].
     *
     * @return the digital object resource, or null if there is none.
     */
    private fun getDigitalObjectResource(resources: List<Resource>): Resource? =
        resources.firstOrNull { candidate -> candidate.hasProperty(RICO.type, Constant.digitalObject) }

    /**
     * Checks whether the locator cannot be parsed as a URL.
     *
     * @return true when constructing a [URL] from [locator] throws [MalformedURLException], false otherwise.
     */
    private fun isNoValidUrl(locator: String): Boolean {
        try {
            URL(locator)
        } catch (ex: MalformedURLException) {
            return true
        }
        return false
    }

    /**
     * Adds the sFTP link as a plain literal [EBUCORE.locator] to the digital object.
     *
     * @param model unused; kept so existing call sites stay valid. The statement is added through
     * [digitalObjectResource].
     * @param sftpLink the link to store as locator.
     * @param digitalObjectResource the resource receiving the locator.
     */
    @Suppress("UNUSED_PARAMETER")
    private fun addLocatorToDigitalObjectResource(model: Model, sftpLink: String, digitalObjectResource: Resource) {
        val literal = ResourceFactory.createPlainLiteral(sftpLink)
        // Same call as used for the thumbnail locator in createThumbnailResource.
        digitalObjectResource.addProperty(EBUCORE.locator, literal)
    }

    /**
     * Serializes the model using [Constant.rdfParserLang].
     *
     * @return the serialized model with leading and trailing whitespace removed.
     */
    private fun serializeModel(model: Model): String =
        StringWriter()
            .also { writer -> model.write(writer, Constant.rdfParserLang) }
            .toString()
            .trim()
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401

    /**
     * Returns a copy of the record whose report carries the given status and messages.
     *
     * The status is always replaced. Each message is only replaced when a non-empty value is passed;
     * otherwise the message already stored in the report is kept.
     *
     * @param value the record to amend; it is not modified.
     * @param status the new report status.
     * @param generalMessage replacement for the general failure message, or "" to keep the current one.
     * @param digitalObjectMessage replacement for the digital object message, or "" to keep the current one.
     * @param thumbnailMessage replacement for the thumbnail message, or "" to keep the current one.
     * @return the record with the amended report as third component.
     */
    private fun updateRecord(
        value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        status: String,
        generalMessage: String = "",
        digitalObjectMessage: String = "",
        thumbnailMessage: String = ""
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val currentReport = value.third
        return value.copy(
            third = currentReport.copy(
                status = status,
                generalFailureMessage = generalMessage.ifEmpty { currentReport.generalFailureMessage },
                digitalObjectMessage = digitalObjectMessage.ifEmpty { currentReport.digitalObjectMessage },
                thumbnailMessage = thumbnailMessage.ifEmpty { currentReport.thumbnailMessage }
            )
        )
    }

    /**
     * Attaches a freshly created report to the model and its subject resources.
     *
     * @param value the model with header metadata and the model's subject resources.
     * @param messageId the id of the report.
     * @param status the initial report status.
     * @param generalMessage the initial general failure message.
     * @param digitalObjectMessage the initial digital object message.
     * @param thumbnailMessage the initial thumbnail message.
     * @return a triple of the two components of [value] and the new report.
     */
    private fun createRecord(
        value: Pair<Pair<Model, HeaderMetadata>, List<Resource>>,
        messageId: String,
        status: String,
        generalMessage: String = "",
        digitalObjectMessage: String = "",
        thumbnailMessage: String = ""
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val (modelWithHeader, subjects) = value
        val initialReport = Report(
            messageId,
            status,
            generalFailureMessage = generalMessage,
            digitalObjectMessage = digitalObjectMessage,
            thumbnailMessage = thumbnailMessage
        )
        val record = Triple(modelWithHeader, subjects, initialReport)
        return updateRecord(record, status = status)
    }
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
402
}