KafkaTopology.kt 15.9 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
24
import org.apache.jena.rdf.model.ResourceFactory
25
import org.apache.jena.riot.RiotException
Jonas Waeber's avatar
Jonas Waeber committed
26
27
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
28
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
29
import org.memobase.rdf.EBUCORE
30
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
31
32
33
34
35
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
36
37
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
38
39
40
41
import java.io.StringReader
import java.io.StringWriter
import java.net.MalformedURLException
import java.net.URL
Jonas Waeber's avatar
Jonas Waeber committed
42
43

class KafkaTopology(private val settings: SettingsLoader) {
44
    private val appSettings = settings.appSettings
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
45
    private val log = LogManager.getLogger("KafkaTopology")
46

Jonas Waeber's avatar
Jonas Waeber committed
47
    private val sftpClient = SftpClient(settings.sftpSettings)
48
    private val previewImageHandler = PreviewImageHandler(sftpClient)
49
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)
50
    private val fileExtensions = appSettings.getProperty(Constant.extensionsPropertyName).split(",")
51
    private val reportingTopic = settings.processReportTopic
Jonas Waeber's avatar
Jonas Waeber committed
52

53
    /**
     * Builds the stream topology of this service.
     *
     * Flow, as wired below:
     * 1. Read the input topic, extract header metadata and parse the value into an RDF model.
     *    Values that cannot be parsed are answered with a failure report.
     * 2. Try to attach a thumbnail locator from the SFTP server to every parsed record.
     * 3. Split the result into three branches:
     *    - `[0]` records whose report is a failure: reported and forwarded unchanged,
     *    - `[1]` records with a digital object lacking a locator (local media file):
     *      a media locator is added, then the record is forwarded and reported,
     *    - `[2]` all remaining records (remote media file): a youtube / vimeo thumbnail is
     *      fetched if applicable, then the record is forwarded and reported.
     *
     * @return the builder holding the complete topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val stream = builder.stream<String, String>(settings.inputTopic)

        val model = stream
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { value -> createModel(value) }
            .branch(
                Predicate { _, value -> value != null },
                Predicate { _, _ -> true }
            )

        model[1]
            .mapValues { key, _ ->
                log.warn("Invalid input data. Check mapper service processing.")
                Report(
                    key,
                    ReportStatus.failure,
                    "Invalid input data. Check mapper service processing."
                ).toJson()
            }
            .to(reportingTopic)

        val hasLocatorBranch = model[0]
            // The first branch predicate above only lets non-null values through.
            .mapValues { value -> extractSubjects(value!!) }
            .mapValues { readOnlyKey, value ->
                addThumbnailSftpLocatorToModel(
                    readOnlyKey,
                    Triple(value.first, value.second, Report("", status = "", message = ""))
                )
            }
            .branch(
                // Use the shared status constant instead of a string literal so this check
                // stays in sync with the reports created elsewhere in this class.
                Predicate { _, value -> value.third.status == ReportStatus.failure },
                Predicate { _, value -> hasDigitalObjectWithoutLocator(value.second) }, // Indicates a local media file
                Predicate { _, _ -> true } // Indicates a remote media file; check for youtube / vimeo thumbnail fetching
            )

        hasLocatorBranch[0]
            .mapValues { _, value ->
                log.warn("Record contains faulty data: ${value.third.message}")
                value.third.toJson()
            }
            .to(reportingTopic)

        // TODO: To be discussed: Should message with these failures eventually be forwarded?
        hasLocatorBranch[0]
            .mapValues { value -> serializeModel(value.first.first) }
            .to(settings.outputTopic)

        val updateDigitalObjects = hasLocatorBranch[1]
            .mapValues { readOnlyKey, value ->
                val enrichedModel = addMediaSftpLocatorToModel(readOnlyKey, value)
                if (enrichedModel.third.status == ReportStatus.failure) {
                    log.warn("A problem enriching the digital object occurred: ${enrichedModel.third.message}")
                }
                enrichedModel
            }

        updateDigitalObjects
            .mapValues { value -> serializeModel(value.first.first) }
            .to(settings.outputTopic)

        updateDigitalObjects
            .mapValues { value -> value.third.toJson() }
            .to(reportingTopic)

        // Keep the result of the thumbnail fetching step so that BOTH sinks see it. Reporting
        // from hasLocatorBranch[2] directly would drop the report amended by the fetching step
        // (including its failures).
        val remoteMediaObjects = hasLocatorBranch[2]
            .mapValues { value -> fetchThumbnailForYoutubeOrVimeoFile(value) }

        remoteMediaObjects
            .mapValues { value -> serializeModel(value.first.first) }
            .to(settings.outputTopic)

        remoteMediaObjects
            .mapValues { value -> value.third.toJson() }
            .to(reportingTopic)

        return builder
    }

132
133
    /**
     * Downloads a preview image for a record whose locator points to youtube or vimeo, uploads
     * it to the SFTP server and attaches it to the model as a thumbnail resource.
     *
     * Nothing is done (the input is returned as is) when the model already contains a thumbnail
     * or when no record / digital object resource is present.
     *
     * @param value the model with its header metadata, the subjects extracted from the model and
     * the report accumulated so far.
     * @return the input with an amended report describing the outcome of the fetching attempt.
     */
    private fun fetchThumbnailForYoutubeOrVimeoFile(value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        // value.second is a snapshot of the subjects taken before any thumbnail was added to the
        // model, so a thumbnail created in the meantime is not part of it. List the subjects of
        // the live model instead to avoid attaching a second thumbnail.
        val currentSubjects = value.first.first.listSubjects().toList()
        if (!noThumbnailAttached(currentSubjects)) {
            return value
        }

        val recordResource = getRecordResource(value.second) ?: return value
        val digitalObjectResource = getDigitalObjectResource(value.second) ?: return value
        val report = value.third
        val locator = digitalObjectResource.getProperty(EBUCORE.locator).string

        val downloadedFile = when {
            isNoValidUrl(locator) -> {
                log.warn("No valid locator url found for ${report.id}")
                return value.copy(
                    third = Report(
                        report.id,
                        ReportStatus.failure,
                        report.message + "; no valid locator url"
                    )
                )
            }
            PreviewImageHandler.isVimeoUrl(locator) -> {
                log.info("Trying to download thumbnail file on vimeo for ${report.id}")
                previewImageHandler.getFromVimeo(locator, Constant.vimeoThumbnailWidth)
                    ?: return value.copy(
                        third = Report(
                            report.id,
                            ReportStatus.failure,
                            report.message + "; couldn't fetch vimeo thumbnail"
                        )
                    )
            }
            PreviewImageHandler.isYoutubeUrl(locator) -> {
                log.info("Trying to download thumbnail file on youtube for ${report.id}")
                previewImageHandler.getFromYoutube(locator)
                    ?: return value.copy(
                        third = Report(
                            report.id,
                            ReportStatus.failure,
                            report.message + "; couldn't fetch youtube thumbnail"
                        )
                    )
            }
            else -> {
                log.debug("Won't fetch thumbnail file for ${report.id} because no youtube/vimeo resource")
                return value.copy(
                    third = Report(
                        report.id,
                        ReportStatus.success,
                        report.message + "; no additional youtube / vimeo thumbnails fetched"
                    )
                )
            }
        }

        // The thumbnail is stored under the last path segment of the record URI.
        val recordId = recordResource.uri.substringAfterLast("/")
        val destPath = "$sftpBasePath/${value.first.second.recordSetId}/${Constant.thumbnailFolderName}/$recordId.jpg"
        val pathOnSftpServer = previewImageHandler.moveFileToSFTP(downloadedFile, destPath)

        if (pathOnSftpServer == null) {
            log.warn("Couldn't move downloaded thumbnail file to $destPath for ${report.id}")
            return value.copy(
                third = Report(
                    report.id,
                    ReportStatus.failure,
                    report.message + "; youtube / vimeo thumbnail couldn't be uploaded to Sftp server"
                )
            )
        }

        log.info("Move downloaded thumbnail file to $destPath for ${report.id}")
        createThumbnailResource(
            value.first.first,
            recordResource,
            digitalObjectResource,
            pathOnSftpServer
        )
        // The status of the previous step is kept; only the message is extended.
        return value.copy(
            third = Report(
                report.id,
                report.status,
                report.message + "; youtube / vimeo thumbnail fetched"
            )
        )
    }

    /**
     * Tells whether none of the given resources is typed (via `RICO.type`) as a thumbnail.
     */
    private fun noThumbnailAttached(resources: List<Resource>): Boolean =
        !resources.any { resource -> resource.hasProperty(RICO.type, Constant.thumbnailRicoType) }

219
220
    /**
     * Pairs the input with the list of all subjects currently present in its model.
     */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> {
        val subjects: List<Resource> = input.first.listSubjects().toList()
        return input to subjects
    }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
223
    /**
     * Tells whether at least one of the resources is a digital object that carries no
     * `EBUCORE.locator` property.
     */
    private fun hasDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
        res.any { candidate ->
            val isDigitalObject = candidate.hasProperty(RICO.type, Constant.digitalObject)
            isDigitalObject && !candidate.hasProperty(EBUCORE.locator)
        }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
227
    /**
     * Looks up the value of the record's identifier resource whose `RICO.type` equals
     * [Constant.identifierType].
     *
     * @return the identifier value, or `null` when no such identifier resource exists or it has
     * no `RICO.identifier` property.
     */
    private fun getOriginalIdentifier(record: Resource): String? {
        val identifierResources = record
            .listProperties(RICO.identifiedBy)
            .toList()
            .map { statement -> statement.`object`.asResource() }
        val originalIdentifier = identifierResources.firstOrNull { candidate ->
            candidate.hasProperty(RDF.type, RICO.Identifier) &&
                candidate.hasProperty(RICO.type, Constant.identifierType)
        }
        return originalIdentifier?.getProperty(RICO.identifier)?.string
    }

237
    /**
     * Parses the serialized RDF in `data.first` into a new model.
     *
     * @param data the serialized RDF together with the header metadata of the message.
     * @return the parsed model paired with the unchanged header metadata, or `null` when the
     * parser rejects the input.
     */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata>? {
        val model = ModelFactory.createDefaultModel()
        return try {
            model.read(StringReader(data.first), "", Constant.rdfParserLang)
            Pair(model, data.second)
        } catch (ex: RiotException) {
            // Keep the parser's reason in the log; the caller only learns that parsing failed.
            log.warn("Could not parse input data: ${ex.message}")
            null
        }
    }

247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
    /**
     * Adds a thumbnail instantiation to the model and links it with the record and the digital
     * object in both directions.
     *
     * @param data the model the thumbnail resource is created in.
     * @param record the record the thumbnail instantiates.
     * @param digitalObject the digital object the thumbnail is derived from; the last path
     * segment of its URI is used to build the thumbnail URI.
     * @param locator the value stored as `EBUCORE.locator` of the thumbnail.
     */
    private fun createThumbnailResource(
        data: Model,
        record: Resource,
        digitalObject: Resource,
        locator: String
    ) {
        val digitalObjectId = digitalObject.uri.substringAfterLast("/")
        val derived = data.createResource("https://memobase.ch/digital/$digitalObjectId/derived")

        // Describe the thumbnail itself.
        derived
            .addProperty(RDF.type, RICO.Instantiation)
            .addProperty(RICO.type, Constant.thumbnailRicoType)
            .addProperty(EBUCORE.locator, ResourceFactory.createPlainLiteral(locator))

        // Link thumbnail <-> digital object.
        digitalObject.addProperty(RICO.hasDerivedInstantiation, derived)
        derived.addProperty(RICO.isDerivedFromInstantiation, digitalObject)

        // Link thumbnail <-> record.
        record.addProperty(RICO.hasInstantiation, derived)
        derived.addProperty(RICO.instantiates, record)
    }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
    /**
     * Runs [addSftpLocatorToModel] for the thumbnail folder.
     */
    private fun addThumbnailSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> =
        addSftpLocatorToModel(key, data, Constant.thumbnailFolderName)

    /**
     * Runs [addSftpLocatorToModel] for the media folder.
     */
    private fun addMediaSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> =
        addSftpLocatorToModel(key, data, Constant.mediaFolderName)

    /**
     * Looks for a file belonging to the record on the SFTP server and, if one is found, links it
     * in the model: as locator of the digital object for [Constant.mediaFolderName], as a new
     * thumbnail resource for [Constant.thumbnailFolderName].
     *
     * @param key the message key, used as report id while no record resource is known.
     * @param data the model with its header metadata, the subjects extracted from the model and
     * the report accumulated so far.
     * @param type the folder on the SFTP server to search in.
     * @return the input with a new report. The report is a failure when the record, the digital
     * object or the original identifier is missing, or when no media file is found. A missing
     * thumbnail file is reported as success.
     */
    private fun addSftpLocatorToModel(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        type: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val recordResource = getRecordResource(data.second)
            ?: return data.copy(
                third = Report(key, ReportStatus.failure, "No record present in model.")
            )

        val digitalObjectResource = getDigitalObjectResource(data.second)
            ?: return data.copy(
                third = Report(key, ReportStatus.failure, "No digital object present in model.")
            )

        val originalIdentifierValue = getOriginalIdentifier(recordResource)
            ?: return data.copy(
                third = Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key))
            )

        val link = getLinkToResourceOnSFTPServer(data.first.second.recordSetId, type, originalIdentifierValue)
        if (link == null) {
            // Compare against the shared constant (not a string literal) so this check cannot
            // drift from the value the callers pass in.
            val status = if (type == Constant.thumbnailFolderName) ReportStatus.success else ReportStatus.failure
            return data.copy(
                third = Report(
                    recordResource.uri,
                    status,
                    "${data.third.message}\n${ReportMessages.reportFailure(digitalObjectResource.uri, type)}".trim()
                )
            )
        }

        when (type) {
            Constant.mediaFolderName ->
                addLocatorToDigitalObjectResource(data.first.first, link, digitalObjectResource)
            Constant.thumbnailFolderName ->
                createThumbnailResource(data.first.first, recordResource, digitalObjectResource, link)
        }

        return data.copy(
            third = Report(
                recordResource.uri,
                ReportStatus.success,
                "${data.third.message}\n${ReportMessages.reportSuccess(digitalObjectResource.uri, link, type)}".trim()
            )
        )
    }

    /**
     * Probes the SFTP server for `<base path>/<recordSetId>/<type>/<identifier>.<extension>`,
     * trying the configured file extensions in order.
     *
     * @return the first existing path prefixed with [Constant.sftpPathPrefix], or `null` when
     * none of the candidate files exists.
     */
    private fun getLinkToResourceOnSFTPServer(
        recordSetId: String,
        type: String,
        originalIdentifierValue: String
    ): String? =
        fileExtensions
            .asSequence()
            .map { extension -> "$sftpBasePath/$recordSetId/$type/$originalIdentifierValue.$extension" }
            .firstOrNull { candidate -> sftpClient.exists(candidate) }
            ?.let { existingPath -> "${Constant.sftpPathPrefix}$existingPath" }

    /**
     * Returns the first resource typed (via `RDF.type`) as `RICO.Record`, or `null` if there is none.
     */
    private fun getRecordResource(resources: List<Resource>): Resource? =
        resources.find { candidate -> candidate.hasProperty(RDF.type, RICO.Record) }

    /**
     * Returns the first resource whose `RICO.type` marks it as a digital object, or `null` if
     * there is none.
     */
    private fun getDigitalObjectResource(resources: List<Resource>): Resource? =
        resources.find { candidate -> candidate.hasProperty(RICO.type, Constant.digitalObject) }

    /**
     * Tells whether [locator] can NOT be parsed as a [URL].
     *
     * @return `true` when constructing a [URL] throws [MalformedURLException], `false` otherwise.
     */
    private fun isNoValidUrl(locator: String): Boolean {
        try {
            URL(locator)
        } catch (ex: MalformedURLException) {
            return true
        }
        return false
    }

    /**
     * Adds [sftpLink] as plain literal `EBUCORE.locator` to the digital object.
     *
     * @param model the model the digital object belongs to. It is not needed to add the
     * property and is only kept so that the signature stays unchanged.
     * @param sftpLink the link stored as locator.
     * @param digitalObjectResource the resource the locator is added to.
     */
    private fun addLocatorToDigitalObjectResource(model: Model, sftpLink: String, digitalObjectResource: Resource) {
        // A former `model.createLiteral(...)` call was dropped here: its result was discarded
        // and creating a literal node does not add a statement to the model.
        val literal = ResourceFactory.createPlainLiteral(sftpLink)
        digitalObjectResource.addLiteral(EBUCORE.locator, literal)
    }

    /**
     * Writes the model in the [Constant.rdfParserLang] syntax and returns the trimmed result.
     */
    private fun serializeModel(model: Model): String =
        StringWriter()
            .also { writer -> model.write(writer, Constant.rdfParserLang) }
            .toString()
            .trim()
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
380
}