KafkaTopology.kt 12.9 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21
22
import java.io.StringReader
import java.io.StringWriter
Jonas Waeber's avatar
Jonas Waeber committed
23
24
25
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
26
import org.apache.jena.rdf.model.ResourceFactory
27
import org.apache.jena.riot.RiotException
Jonas Waeber's avatar
Jonas Waeber committed
28
29
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
30
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
31
import org.memobase.rdf.EBUCORE
32
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
33
34
35
36
37
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
38
39
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
Jonas Waeber's avatar
Jonas Waeber committed
40
41

/**
 * Builds the Kafka Streams topology that enriches RDF records with SFTP media / thumbnail locators.
 *
 * Each input value is parsed into a Jena [Model]; values that cannot be parsed are reported as failures on the
 * process report topic. For every parsed record the SFTP server is searched for a thumbnail. Records containing a
 * digital object without a locator are additionally searched for the media file itself; all other records may get
 * a YouTube / Vimeo preview image attached. The (possibly modified) model is written to the output topic and one
 * [Report] per record is written to the report topic.
 *
 * @param settings loaded application settings (topics, SFTP connection and application properties).
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val appSettings = settings.appSettings
    private val log = LogManager.getLogger("KafkaTopology")

    private val sftpClient = SftpClient(settings.sftpSettings)
    private val previewImageHandler = PreviewImageHandler(sftpClient)
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)

    // Entries are trimmed so that a configuration such as "mp4, jpg" does not produce paths ending in ". jpg".
    private val fileExtensions = appSettings.getProperty(Constant.extensionsPropertyName)
        .split(",")
        .map { it.trim() }
        .filter { it.isNotEmpty() }
    private val reportingTopic = settings.processReportTopic

    /**
     * Wires up the stream processing topology.
     *
     * @return a [StreamsBuilder] holding the complete topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()
        val stream = builder.stream<String, String>(settings.inputTopic)

        val model = stream
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { value -> createModel(value) }
            .branch(
                Predicate { _, value -> value != null },
                Predicate { _, _ -> true }
            )

        // Unparseable payloads: only a failure report is emitted.
        model[1]
            .mapValues { key, _ ->
                Report(
                    key,
                    ReportStatus.failure,
                    "Invalid input data. Check mapper service processing."
                ).toJson()
            }
            .to(reportingTopic)

        // Branch 0: at least one digital object lacks a locator. Branch 1: everything else.
        val locatorBranches = model[0]
            // The first branch above only admits non-null values, so this assertion cannot fail.
            .mapValues { value -> extractSubjects(value!!) }
            .mapValues { readOnlyKey, value ->
                enrichSftpLocator(
                    readOnlyKey,
                    Triple(value.first, value.second, Report("", status = "", message = "")),
                    Constant.thumbnailFolderName
                )
            }
            .branch(
                Predicate { _, value -> containsDigitalObjectWithoutLocator(value.second) },
                Predicate { _, _ -> true }
            )

        // Digital objects without a locator: look up the media file on the SFTP server.
        val updateDigitalObjects = locatorBranches[0]
            .mapValues { readOnlyKey, value -> enrichSftpLocator(readOnlyKey, value, Constant.mediaFolderName) }

        updateDigitalObjects
            .mapValues { value -> serializeModel(value.first.first) }
            .to(settings.outputTopic)

        updateDigitalObjects
            .mapValues { value -> value.third.toJson() }
            .to(reportingTopic)

        // Remaining records: try to attach a YouTube / Vimeo preview image. The enriched triple is computed once and
        // shared by both sinks so that the report reflects the outcome of the thumbnail fetch.
        val thumbnailEnriched = locatorBranches[1]
            .mapValues { value -> fetchThumbnailForYoutubeOrVimeoFile(value) }

        thumbnailEnriched
            .mapValues { value -> serializeModel(value.first.first) }
            .to(settings.outputTopic)

        thumbnailEnriched
            .mapValues { value -> value.third.toJson() }
            .to(reportingTopic)

        return builder
    }

    /**
     * Serializes [model] using [Constant.rdfParserLang] and trims surrounding whitespace.
     */
    private fun serializeModel(model: Model): String {
        val out = StringWriter()
        model.write(out, Constant.rdfParserLang)
        return out.toString().trim()
    }

    /**
     * Attaches a preview image for a YouTube or Vimeo locator when no thumbnail instantiation exists yet.
     *
     * The image is fetched through [PreviewImageHandler] and uploaded to the SFTP server. On success, a thumbnail
     * resource pointing to the uploaded file is added to the model.
     *
     * @param value model with header metadata, its subjects, and the report accumulated so far.
     * @return [value] with an amended report. The report is:
     *  - marked as failure when the upload did not return a path;
     *  - marked as success when nothing was fetched;
     *  - left with its previous status when a thumbnail was attached.
     */
    private fun fetchThumbnailForYoutubeOrVimeoFile(
        value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val report = value.third

        fun noneFetched() = value.copy(
            third = Report(
                report.id,
                ReportStatus.success,
                report.message + "; no additional youtube / vimeo thumbnails fetched"
            )
        )

        if (!noThumbnailAttached(value.second)) return noneFetched()
        val record = value.second.firstOrNull { it.hasProperty(RDF.type, RICO.Record) } ?: return noneFetched()
        val digitalObject = value.second.firstOrNull { it.hasProperty(RICO.type, Constant.digitalObject) }
            ?: return noneFetched()
        // Guard against a missing locator instead of failing with a NullPointerException.
        val locator = digitalObject.getProperty(EBUCORE.locator)?.string ?: return noneFetched()

        val image = when {
            PreviewImageHandler.isVimeoUrl(locator) ->
                previewImageHandler.getFromVimeo(locator, Constant.vimeoThumbnailWidth)
            PreviewImageHandler.isYoutubeUrl(locator) ->
                previewImageHandler.getFromYoutube(locator)
            else -> null
        } ?: return noneFetched()

        val fileName = record.uri.substringAfterLast("/")
        val pathOnSftpServer = previewImageHandler.moveFileToSFTP(
            image,
            "$sftpBasePath/${value.first.second.recordSetId}/${Constant.thumbnailFolderName}/$fileName.jpg"
        )

        val amendedReport = if (pathOnSftpServer != null) {
            createThumbnailResource(value.first.first, record, digitalObject, pathOnSftpServer)
            Report(
                report.id,
                report.status,
                report.message + "; youtube / vimeo thumbnail fetched"
            )
        } else {
            Report(
                report.id,
                ReportStatus.failure,
                report.message + "; youtube / vimeo thumbnail couldn't be uploaded to Sftp server"
            )
        }
        return value.copy(third = amendedReport)
    }

    /** Returns true if none of [resources] is typed with [Constant.thumbnailRicoType]. */
    private fun noThumbnailAttached(resources: List<Resource>): Boolean {
        return resources.none { it.hasProperty(RICO.type, Constant.thumbnailRicoType) }
    }

    /** Pairs [input] with the list of all subjects of its model. */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> {
        return Pair(input, input.first.listSubjects().toList())
    }

    /** Returns true if any resource is a digital object that has no `EBUCORE.locator` property. */
    private fun containsDigitalObjectWithoutLocator(res: List<Resource>): Boolean {
        return res.any { it.hasProperty(RICO.type, Constant.digitalObject) && !it.hasProperty(EBUCORE.locator) }
    }

    /**
     * Returns the identifier resources linked from [record] via `RICO.identifiedBy` whose `RDF.type` is
     * `RICO.Identifier` and whose `RICO.type` is [Constant.identifierType].
     *
     * Literal objects of `identifiedBy` are skipped rather than causing `asResource()` to throw.
     */
    private fun getOriginalIdentifiers(record: Resource): List<Resource> =
        record.listProperties(RICO.identifiedBy).toList()
            .map { statement -> statement.`object` }
            .filter { node -> node.isResource }
            .map { node -> node.asResource() }
            .filter { resource ->
                resource.hasProperty(RDF.type, RICO.Identifier) &&
                    resource.hasProperty(RICO.type, Constant.identifierType)
            }

    /**
     * Parses the RDF payload in `data.first` using [Constant.rdfParserLang].
     *
     * @return the parsed model paired with the header metadata, or `null` if the payload could not be parsed
     *  (the failure is logged).
     */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata>? {
        val model = ModelFactory.createDefaultModel()
        return try {
            model.read(StringReader(data.first), "", Constant.rdfParserLang)
            Pair(model, data.second)
        } catch (ex: RiotException) {
            log.warn("Could not parse input as ${Constant.rdfParserLang}: ${ex.message}")
            null
        }
    }

    /**
     * Adds a thumbnail instantiation to [data] and links it to [digitalObject] and [record].
     *
     * The thumbnail URI is derived from the last path segment of the digital object's URI. It is typed
     * `RICO.Instantiation` / [Constant.thumbnailRicoType] and carries [locator] as its `EBUCORE.locator`.
     */
    private fun createThumbnailResource(
        data: Model,
        record: Resource,
        digitalObject: Resource,
        locator: String
    ) {
        val thumbnail = data.createResource(
            "https://memobase.ch/digital/${digitalObject.uri.substringAfterLast("/")}/derived"
        )
        val literal = ResourceFactory.createPlainLiteral(locator)
        thumbnail.addProperty(RDF.type, RICO.Instantiation)
        thumbnail.addProperty(RICO.type, Constant.thumbnailRicoType)
        thumbnail.addProperty(EBUCORE.locator, literal)
        digitalObject.addProperty(RICO.hasDerivedInstantiation, thumbnail)
        thumbnail.addProperty(RICO.isDerivedFromInstantiation, digitalObject)
        record.addProperty(RICO.hasInstantiation, thumbnail)
        thumbnail.addProperty(RICO.instantiates, record)
    }

    /**
     * Looks for a file named after the record's original identifier in
     * `<sftpBasePath>/<recordSetId>/<type>/` with one of the configured extensions.
     *
     * If a file is found, it is linked into the model:
     *  - for [Constant.mediaFolderName], as the digital object's locator;
     *  - for [Constant.thumbnailFolderName], as a thumbnail instantiation.
     *
     * @param key Kafka record key, used as report id when the model lacks a record, digital object, or identifier.
     * @param data model with header metadata, its subjects, and the report accumulated so far.
     * @param type SFTP sub-folder to search, either [Constant.mediaFolderName] or [Constant.thumbnailFolderName].
     * @return [data] with a new report. A missing thumbnail file is reported as success; a missing media file is
     *  reported as failure.
     */
    private fun enrichSftpLocator(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        type: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        fun withReport(report: Report) = Triple(data.first, data.second, report)

        val record = data.second.firstOrNull { it.hasProperty(RDF.type, RICO.Record) }
            ?: return withReport(Report(key, ReportStatus.failure, "No record present in model."))
        val digitalObject = data.second.firstOrNull { it.hasProperty(RICO.type, Constant.digitalObject) }
            ?: return withReport(Report(key, ReportStatus.failure, "No digital object present in model."))
        // An identifier without a value is treated like a missing identifier instead of throwing.
        val identifierValue = getOriginalIdentifiers(record).firstOrNull()
            ?.getProperty(RICO.identifier)
            ?.string
            ?: return withReport(Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key)))

        val directory = "$sftpBasePath/${data.first.second.recordSetId}/$type"
        // Lazily probe the extensions in configured order; stop at the first file that exists.
        val filePath = fileExtensions
            .asSequence()
            .map { extension -> "$directory/$identifierValue.$extension" }
            .firstOrNull { path -> sftpClient.exists(path) }

        if (filePath == null) {
            // No file found in the indicated directory.
            return withReport(
                Report(
                    record.uri,
                    if (type == Constant.thumbnailFolderName) ReportStatus.success else ReportStatus.failure,
                    "${data.third.message}\n${ReportMessages.reportFailure(digitalObject.uri, type)}".trim()
                )
            )
        }

        val link = "${Constant.sftpPathPrefix}$filePath"
        when (type) {
            Constant.mediaFolderName ->
                digitalObject.addLiteral(EBUCORE.locator, ResourceFactory.createPlainLiteral(link))
            Constant.thumbnailFolderName ->
                createThumbnailResource(data.first.first, record, digitalObject, link)
        }
        return withReport(
            Report(
                record.uri,
                ReportStatus.success,
                "${data.third.message}\n${ReportMessages.reportSuccess(digitalObject.uri, link, type)}".trim()
            )
        )
    }
}