/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21
22
import java.io.StringReader
import java.io.StringWriter
Jonas Waeber's avatar
Jonas Waeber committed
23
24
25
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
26
import org.apache.jena.rdf.model.ResourceFactory
27
import org.apache.jena.riot.RiotException
Jonas Waeber's avatar
Jonas Waeber committed
28
29
30
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.memobase.rdf.EBUCORE
31
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
32
33
34
35
36
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
37
38
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
Jonas Waeber's avatar
Jonas Waeber committed
39
40

class KafkaTopology(private val settings: SettingsLoader) {
41
42
    // Application properties taken from the loaded settings.
    private val appSettings = settings.appSettings

    // SFTP client shared by the preview image handler and the existence checks in this class.
    private val sftpClient = SftpClient(settings.sftpSettings)
    private val previewImageHandler = PreviewImageHandler(sftpClient)
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)

    // The property value is a comma-separated list. Entries are trimmed and blank
    // entries dropped so that a value such as "jpg, png" does not yield " png",
    // which would produce a file path that can never match.
    private val fileExtensions = appSettings.getProperty(Constant.extensionsPropertyName)
        .split(",")
        .map { it.trim() }
        .filter { it.isNotEmpty() }
    private val reportingTopic = settings.processReportTopic
Jonas Waeber's avatar
Jonas Waeber committed
48

49
    /**
     * Builds the stream topology.
     *
     * Steps, in order:
     * 1. Read the input topic, extract the header metadata and parse each value into a model.
     *    Values that cannot be parsed are reported as failures on the reporting topic.
     * 2. Extract the subjects of each model and try to attach a thumbnail locator via
     *    [enrichSftpLocator].
     * 3. Records with a digital object that has no locator are enriched with a media locator;
     *    all other records go through [fetchThumbnailForYoutubeOrVimeoFile].
     * 4. Records whose report status is not a failure are serialized to the output topic;
     *    reports are written to the reporting topic as JSON.
     *
     * @return the builder holding the configured topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val stream = builder.stream<String, String>(settings.inputTopic)

        val model = stream
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { value -> createModel(value) }
            .branch(
                Predicate { _, value -> value != null },
                Predicate { _, _ -> true }
            )

        // Unparsable input: the report is serialized with toJson() like every other
        // report written to the reporting topic, instead of sending the raw Report object.
        model[1]
            .mapValues { key, _ ->
                Report(
                    key,
                    ReportStatus.failure,
                    "Can't create model. Is there anything wrong with the data?"
                ).toJson()
            }
            .to(reportingTopic)

        val instantiationBranch = model[0]
            // The first branch predicate only lets non-null values through, so `!!` cannot fail here.
            .mapValues { value -> extractSubjects(value!!) }
            .mapValues { readOnlyKey, value ->
                enrichSftpLocator(
                    readOnlyKey,
                    Triple(value.first, value.second, Report("", status = "", message = "")),
                    Constant.thumbnailFolderName
                )
            }
            .branch(
                Predicate { _, value -> containsDigitalObjectWithoutLocator(value.second) },
                Predicate { _, _ -> true }
            )

        val updateDigitalObjects = instantiationBranch[0]
            .mapValues { readOnlyKey, value -> enrichSftpLocator(readOnlyKey, value, Constant.mediaFolderName) }

        updateDigitalObjects
            .filterNot { _, value -> value.third.status == ReportStatus.failure } // failed records are deleted.
            .mapValues { value -> serializeModel(value.first.first) }
            .to(settings.outputTopic)

        // report
        updateDigitalObjects
            .mapValues { value -> value.third.toJson() }
            .to(reportingTopic)

        instantiationBranch[1]
            .filterNot { _, value -> value.third.status == ReportStatus.failure } // failed records are deleted.
            .mapValues { value -> fetchThumbnailForYoutubeOrVimeoFile(value) }
            .mapValues { value -> serializeModel(value.first.first) }
            .to(settings.outputTopic)

        // NOTE(review): this reports the Report as it was before
        // fetchThumbnailForYoutubeOrVimeoFile ran; the amended report returned by that
        // function is not sent anywhere. Confirm whether that is intended.
        instantiationBranch[1]
            .mapValues { _, value -> value.third.toJson() }
            .to(reportingTopic)

        return builder
    }

    /** Writes [model] in the configured RDF language and returns the trimmed result. */
    private fun serializeModel(model: Model): String {
        val out = StringWriter()
        model.write(out, Constant.rdfParserLang)
        return out.toString().trim()
    }

120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
    /**
     * Tries to fetch a preview image from Vimeo or YouTube for a record that has no thumbnail yet.
     *
     * The image is only fetched when none of the resources is typed as a thumbnail, a record and
     * a digital object are present, and the locator of the digital object is recognised as a
     * Vimeo or YouTube URL. A fetched image is moved to the SFTP server and attached to the
     * model through [createThumbnailResource].
     *
     * @param value the model with its header metadata, the subjects of the model and the current report.
     * @return a copy of [value] whose report message is extended with the outcome. The status is
     *   kept when a thumbnail was attached, set to failure when the upload returned no path,
     *   and set to success when nothing was fetched.
     */
    private fun fetchThumbnailForYoutubeOrVimeoFile(value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        fun nothingFetched() = value.copy(
            third = Report(
                value.third.id,
                ReportStatus.success,
                value.third.message + "; no additional youtube / vimeo thumbnails fetched"
            )
        )

        if (!noThumbnailAttached(value.second)) return nothingFetched()

        val record = value.second.firstOrNull { it.hasProperty(RDF.type, RICO.Record) }
            ?: return nothingFetched()
        // Digital objects are identified through RICO.type, as in
        // containsDigitalObjectWithoutLocator and enrichSftpLocator.
        val digitalObject = value.second.firstOrNull { it.hasProperty(RICO.type, Constant.digitalObject) }
            ?: return nothingFetched()

        // A digital object without a locator has nothing to fetch a preview from.
        val locator = digitalObject.getProperty(EBUCORE.locator)?.string
        val previewImage = if (locator == null) {
            null
        } else {
            when {
                PreviewImageHandler.isVimeoUrl(locator) ->
                    previewImageHandler.getFromVimeo(locator, Constant.vimeoThumbnailWidth)
                PreviewImageHandler.isYoutubeUrl(locator) ->
                    previewImageHandler.getFromYoutube(locator)
                else -> null
            }
        }
        if (previewImage == null) return nothingFetched()

        // The file is named after the original identifier of the record, the same name
        // enrichSftpLocator uses when it looks for files in the thumbnail folder. The local
        // name of the digital object is used when no original identifier is available.
        // NOTE(review): the previous code interpolated the whole input triple into the
        // file name; confirm the naming chosen here matches the expected layout.
        val fileName = getOriginalIdentifiers(record).firstOrNull()?.getProperty(RICO.identifier)?.string
            ?: digitalObject.uri.substringAfterLast("/")
        val pathOnSftpServer = previewImageHandler.moveFileToSFTP(
            previewImage,
            "$sftpBasePath/${value.first.second.recordSetId}/${Constant.thumbnailFolderName}/$fileName.jpg"
        )

        return if (pathOnSftpServer != null) {
            createThumbnailResource(value.first.first, record, digitalObject, pathOnSftpServer)
            value.copy(
                third = Report(
                    value.third.id,
                    value.third.status,
                    value.third.message + "; youtube / vimeo thumbnail fetched"
                )
            )
        } else {
            value.copy(
                third = Report(
                    value.third.id,
                    ReportStatus.failure,
                    value.third.message + "; youtube / vimeo thumbnail couldn't be uploaded to Sftp server"
                )
            )
        }
    }

    /** Returns true when no resource in [resources] carries the thumbnail RiC-O type. */
    private fun noThumbnailAttached(resources: List<Resource>): Boolean =
        !resources.any { resource -> resource.hasProperty(RICO.type, Constant.thumbnailRicoType) }

172
173
    /**
     * Pairs the parsed input with the list of all subjects found in its model.
     *
     * @param input the model together with its header metadata.
     * @return the unchanged [input] and the subjects of its model.
     */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> {
        val (model, _) = input
        val subjects = model.listSubjects().toList()
        return Pair(input, subjects)
    }

176
177
178
179
    /** Returns true when at least one digital object in [res] has no locator property. */
    private fun containsDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
        res
            .filter { resource -> resource.hasProperty(RICO.type, Constant.digitalObject) }
            .any { digitalObject -> !digitalObject.hasProperty(EBUCORE.locator) }

180
181
182
183
184
185
186
187
    /**
     * Collects the identifier resources of [record] that are typed as identifiers and
     * carry the configured identifier type.
     *
     * @param record the resource whose `identifiedBy` statements are inspected.
     * @return the matching identifier resources; empty when there is none.
     */
    private fun getOriginalIdentifiers(record: Resource): List<Resource> {
        val identifiedBy = record.listProperties(RICO.identifiedBy).toList()
        val candidates = identifiedBy.map { it.`object`.asResource() }
        return candidates.filter { candidate ->
            val isIdentifier = candidate.hasProperty(RDF.type, RICO.Identifier)
            isIdentifier && candidate.hasProperty(RICO.type, Constant.identifierType)
        }
    }

190
    /**
     * Parses the serialized RDF in [data] into a new model.
     *
     * @param data the serialized RDF together with its header metadata.
     * @return the parsed model paired with the header metadata, or null when the parser
     *   raises a [RiotException].
     */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata>? {
        val (serialized, metadata) = data
        val parsed = ModelFactory.createDefaultModel()
        return try {
            parsed.read(StringReader(serialized), "", Constant.rdfParserLang)
            Pair(parsed, metadata)
        } catch (ex: RiotException) {
            null
        }
    }

200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
    /**
     * Adds a thumbnail instantiation to the model and links it with the digital object
     * and the record in both directions.
     *
     * @param data the model the thumbnail resource is created in.
     * @param record the record the thumbnail instantiates.
     * @param digitalObject the instantiation the thumbnail is derived from; the last path
     *   segment of its URI is used to build the thumbnail URI.
     * @param locator the value stored as the locator of the thumbnail.
     */
    private fun createThumbnailResource(
        data: Model,
        record: Resource,
        digitalObject: Resource,
        locator: String
    ) {
        val localName = digitalObject.uri.substringAfterLast("/")
        data.createResource("https://memobase.ch/digital/$localName/derived").apply {
            val locatorLiteral = ResourceFactory.createPlainLiteral(locator)
            addProperty(RDF.type, RICO.Instantiation)
            addProperty(RICO.type, Constant.thumbnailRicoType)
            addProperty(EBUCORE.locator, locatorLiteral)
            digitalObject.addProperty(RICO.hasDerivedInstantiation, this)
            addProperty(RICO.isDerivedFromInstantiation, digitalObject)
            record.addProperty(RICO.hasInstantiation, this)
            addProperty(RICO.instantiates, record)
        }
    }

Matthias's avatar
Matthias committed
219
220
221
    /**
     * Looks for a file belonging to the record on the SFTP server and links it in the model.
     *
     * The file path is built from the SFTP base path, the record set id from the header
     * metadata, [type] as folder name and the original identifier of the record, tried with
     * each configured file extension in order. The first existing path wins. For the media
     * folder the path is added as locator of the digital object; for the thumbnail folder a
     * thumbnail resource is created via [createThumbnailResource].
     *
     * @param key the message key, used as report id when no record can be identified.
     * @param data the model with its header metadata, the subjects of the model and the current report.
     * @param type the folder name to look in.
     * @return [data] with a new report: a failure when the record, the digital object, the
     *   original identifier or the file is missing, otherwise a success.
     */
    private fun enrichSftpLocator(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        type: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val record = data.second.firstOrNull { it.hasProperty(RDF.type, RICO.Record) }
            ?: return data.copy(third = Report(key, ReportStatus.failure, "No record present in model."))

        val digitalObject = data.second.firstOrNull { it.hasProperty(RICO.type, Constant.digitalObject) }
            ?: return data.copy(third = Report(key, ReportStatus.failure, "No digital object present in model."))

        // A missing identifier resource and an identifier resource without a value are both
        // reported as a missing original identifier, instead of relying on an exception.
        val identifierValue = getOriginalIdentifiers(record).firstOrNull()?.getProperty(RICO.identifier)?.string
            ?: return data.copy(
                third = Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key))
            )

        // Extensions are tried in the configured order; the lookup stops at the first hit.
        val existingPath = fileExtensions
            .map { extension -> "$sftpBasePath/${data.first.second.recordSetId}/$type/$identifierValue.$extension" }
            .firstOrNull { filePath -> sftpClient.exists(filePath) }
            ?: return data.copy(
                third = Report(
                    record.uri,
                    ReportStatus.failure,
                    "${data.third.message}\n${ReportMessages.reportFailure(digitalObject.uri, type)}".trim()
                )
            )

        val link = "${Constant.sftpPathPrefix}$existingPath"
        if (type == Constant.mediaFolderName) {
            digitalObject.addLiteral(EBUCORE.locator, ResourceFactory.createPlainLiteral(link))
        } else if (type == Constant.thumbnailFolderName) {
            createThumbnailResource(data.first.first, record, digitalObject, link)
        }

        return data.copy(
            third = Report(
                record.uri,
                ReportStatus.success,
                "${data.third.message}\n${ReportMessages.reportSuccess(digitalObject.uri, link, type)}".trim()
            )
        )
    }
}