KafkaTopology.kt 12.2 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21
22
import java.io.StringReader
import java.io.StringWriter
Jonas Waeber's avatar
Jonas Waeber committed
23
24
25
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
26
import org.apache.jena.rdf.model.ResourceFactory
Jonas Waeber's avatar
Jonas Waeber committed
27
28
29
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.memobase.rdf.EBUCORE
30
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
31
32
33
34
35
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
36
37
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
Jonas Waeber's avatar
Jonas Waeber committed
38
39

class KafkaTopology(private val settings: SettingsLoader) {
40
41
    // Application-specific properties taken from the loaded settings.
    private val appSettings = settings.appSettings

    // SFTP access: used to check whether files exist and to upload fetched preview images.
    private val sftpClient = SftpClient(settings.sftpSettings)
    private val previewImageHandler = PreviewImageHandler(sftpClient)
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)

    // Comma-separated list of file extensions. Entries are trimmed and blank entries are
    // dropped, so that a setting such as "jpg, png," does not produce paths which can never match.
    private val fileExtensions = appSettings.getProperty(Constant.extensionsPropertyName)
        .split(",")
        .map { it.trim() }
        .filter { it.isNotEmpty() }
    private val reportingTopic = settings.processReportTopic
Jonas Waeber's avatar
Jonas Waeber committed
47

48
    /**
     * Assembles the stream topology.
     *
     * Each input message is parsed into an RDF model, its subjects are collected and
     * [enrichSftpLocator] is run for the thumbnail folder. The stream is then split in two:
     * records containing a digital object without locator are passed to [enrichSftpLocator]
     * again for the media folder, all other records are passed to
     * [fetchThumbnailForYoutubeOrVimeoFile]. Both branches write the serialized model to the
     * output topic (records with a failure report are left out) and a JSON report to the
     * reporting topic.
     *
     * @return the builder holding the configured topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val parsedRecords = builder
            .stream<String, String>(settings.inputTopic)
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { message -> createModel(message) }
            .mapValues { parsed -> extractSubjects(parsed) }

        val instantiationBranch = parsedRecords
            .mapValues { readOnlyKey, parsed ->
                val initialReport = Report("", status = "", message = "")
                enrichSftpLocator(
                    readOnlyKey,
                    Triple(parsed.first, parsed.second, initialReport),
                    Constant.thumbnailFolderName
                )
            }
            .branch(
                Predicate { _, enriched -> containsDigitalObjectWithoutLocator(enriched.second) },
                Predicate { _, _ -> true }
            )

        // Branch 0: digital objects which still need a locator for their media file.
        val updateDigitalObjects = instantiationBranch[0]
            .mapValues { readOnlyKey, enriched ->
                enrichSftpLocator(readOnlyKey, enriched, Constant.mediaFolderName)
            }

        updateDigitalObjects
            // records with a failure report are not forwarded to the output topic.
            .filterNot { _, enriched -> enriched.third.status == ReportStatus.failure }
            .mapValues { enriched ->
                StringWriter()
                    .also { writer -> enriched.first.first.write(writer, Constant.rdfParserLang) }
                    .toString()
                    .trim()
            }
            .to(settings.outputTopic)

        // every record of this branch is reported, failed or not.
        updateDigitalObjects
            .mapValues { enriched -> enriched.third.toJson() }
            .to(reportingTopic)

        // Branch 1: all remaining records.
        val remainingRecords = instantiationBranch[1]

        remainingRecords
            // records with a failure report are not forwarded to the output topic.
            .filterNot { _, enriched -> enriched.third.status == ReportStatus.failure }
            .mapValues { enriched -> fetchThumbnailForYoutubeOrVimeoFile(enriched) }
            .mapValues { enriched ->
                StringWriter()
                    .also { writer -> enriched.first.first.write(writer, Constant.rdfParserLang) }
                    .toString()
                    .trim()
            }
            .to(settings.outputTopic)

        remainingRecords
            .mapValues { _, enriched -> enriched.third.toJson() }
            .to(reportingTopic)

        return builder
    }

103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
    /**
     * Fetches a preview image from Vimeo or Youtube for records that have no thumbnail yet and
     * whose digital object locator is recognised by [PreviewImageHandler] as a Vimeo or Youtube url.
     *
     * The fetched image is moved to the thumbnail folder of the record set on the SFTP server and
     * linked in the model through [createThumbnailResource].
     *
     * @param value the model with its header metadata, the subjects of the model and the report so far.
     * @return [value] with an amended report:
     *  - the previous status and an appended note when a thumbnail was fetched and uploaded,
     *  - [ReportStatus.failure] when the upload to the SFTP server did not return a path,
     *  - [ReportStatus.success] when nothing was fetched.
     */
    private fun fetchThumbnailForYoutubeOrVimeoFile(value: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val previousReport = value.third

        fun nothingFetched() = value.copy(
            third = Report(
                previousReport.id,
                ReportStatus.success,
                previousReport.message + "; no additional youtube / vimeo thumbnails fetched"
            )
        )

        if (!noThumbnailAttached(value.second)) return nothingFetched()

        val record = value.second.firstOrNull { it.hasProperty(RDF.type, RICO.Record) }
            ?: return nothingFetched()
        // Digital objects are marked with RICO.type everywhere else in this class
        // (containsDigitalObjectWithoutLocator, enrichSftpLocator), so the same property is used here.
        val digitalObject = value.second.firstOrNull { it.hasProperty(RICO.type, Constant.digitalObject) }
            ?: return nothingFetched()
        // getProperty yields null when the statement is absent; treat that as "nothing to fetch".
        val locator = digitalObject.getProperty(EBUCORE.locator)?.string
            ?: return nothingFetched()

        val previewImage = when {
            PreviewImageHandler.isVimeoUrl(locator) ->
                previewImageHandler.getFromVimeo(locator, Constant.vimeoThumbnailWidth)
            PreviewImageHandler.isYoutubeUrl(locator) ->
                previewImageHandler.getFromYoutube(locator)
            else -> null
        } ?: return nothingFetched()

        // Name the file after the original identifier, as enrichSftpLocator does when it looks for
        // thumbnails. Falls back to the last segment of the digital object uri if there is none.
        // (The previous template "$value.jpg" inserted the string form of the whole triple.)
        val fileName = getOriginalIdentifiers(record).firstOrNull()?.getProperty(RICO.identifier)?.string
            ?: digitalObject.uri.substringAfterLast("/")

        val pathOnSftpServer = previewImageHandler.moveFileToSFTP(
            previewImage,
            "$sftpBasePath/${value.first.second.recordSetId}/${Constant.thumbnailFolderName}/$fileName.jpg"
        )

        if (pathOnSftpServer == null) {
            return value.copy(
                third = Report(
                    previousReport.id,
                    ReportStatus.failure,
                    previousReport.message + "; youtube / vimeo thumbnail couldn't be uploaded to Sftp server"
                )
            )
        }

        createThumbnailResource(value.first.first, record, digitalObject, pathOnSftpServer)
        return value.copy(
            third = Report(
                previousReport.id,
                previousReport.status,
                previousReport.message + "; youtube / vimeo thumbnail fetched"
            )
        )
    }

    /**
     * Checks that none of the given resources carries the thumbnail type.
     *
     * @param resources the resources to inspect.
     * @return true if no resource has [Constant.thumbnailRicoType] as value of RICO.type.
     */
    private fun noThumbnailAttached(resources: List<Resource>): Boolean =
        !resources.any { candidate -> candidate.hasProperty(RICO.type, Constant.thumbnailRicoType) }

155
156
    /**
     * Pairs the parsed input with the list of all subjects of its model.
     *
     * @param input the model together with its header metadata.
     * @return the unchanged input and the subjects listed by the model.
     */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> {
        val subjects = input.first.listSubjects().toList()
        return Pair(input, subjects)
    }

159
160
161
162
    /**
     * Checks whether at least one digital object among the resources has no locator.
     *
     * @param res the resources to inspect.
     * @return true if a resource typed (RICO.type) as [Constant.digitalObject] lacks an
     * EBUCORE.locator property.
     */
    private fun containsDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
        res
            .filter { candidate -> candidate.hasProperty(RICO.type, Constant.digitalObject) }
            .any { digitalObject -> !digitalObject.hasProperty(EBUCORE.locator) }

163
164
165
166
167
168
169
170
    /**
     * Collects the identifier resources of a record which are of the configured identifier type.
     *
     * @param record the record whose RICO.identifiedBy statements are inspected.
     * @return the objects of those statements that are a RICO.Identifier and have
     * [Constant.identifierType] as value of RICO.type.
     */
    private fun getOriginalIdentifiers(record: Resource): List<Resource> =
        record.listProperties(RICO.identifiedBy)
            .toList()
            .map { it.`object`.asResource() }
            .filter { candidate ->
                val isIdentifier = candidate.hasProperty(RDF.type, RICO.Identifier)
                isIdentifier && candidate.hasProperty(RICO.type, Constant.identifierType)
            }

173
    /**
     * Parses the serialized RDF of a message into a fresh model.
     *
     * @param data the serialized RDF (read as [Constant.rdfParserLang]) and the header metadata.
     * @return the parsed model together with the unchanged header metadata.
     */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata> {
        val (serialized, headers) = data
        val parsed = ModelFactory.createDefaultModel()
        StringReader(serialized).let { reader -> parsed.read(reader, "", Constant.rdfParserLang) }
        return Pair(parsed, headers)
    }

179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
    /**
     * Adds a thumbnail instantiation to the model and links it with the digital object and the record.
     *
     * The uri of the thumbnail is built from the last path segment of the digital object uri.
     *
     * @param data the model the thumbnail resource is created in.
     * @param record the record the thumbnail instantiates.
     * @param digitalObject the instantiation the thumbnail is derived from.
     * @param locator the value stored as EBUCORE.locator of the thumbnail.
     */
    private fun createThumbnailResource(
        data: Model,
        record: Resource,
        digitalObject: Resource,
        locator: String
    ) {
        val digitalObjectId = digitalObject.uri.substringAfterLast("/")
        data.createResource("https://memobase.ch/digital/$digitalObjectId/derived").apply {
            val locatorLiteral = ResourceFactory.createPlainLiteral(locator)
            addProperty(RDF.type, RICO.Instantiation)
            addProperty(RICO.type, Constant.thumbnailRicoType)
            addProperty(EBUCORE.locator, locatorLiteral)
            // link thumbnail and digital object in both directions
            digitalObject.addProperty(RICO.hasDerivedInstantiation, this)
            addProperty(RICO.isDerivedFromInstantiation, digitalObject)
            // link thumbnail and record in both directions
            record.addProperty(RICO.hasInstantiation, this)
            addProperty(RICO.instantiates, record)
        }
    }

Matthias's avatar
Matthias committed
198
199
200
    /**
     * Searches the SFTP server for a file belonging to the record and links it in the model.
     *
     * The candidate paths are `<sftpBasePath>/<recordSetId>/<type>/<original identifier>.<extension>`
     * for every configured file extension; the first path that exists is used.
     * For [Constant.mediaFolderName] the link is added as EBUCORE.locator of the digital object,
     * for [Constant.thumbnailFolderName] a thumbnail resource is created via [createThumbnailResource].
     *
     * @param key the key of the message, used as report id as long as no record uri is known.
     * @param data the model with its header metadata, the subjects of the model and the report so far.
     * @param type the name of the folder to search in.
     * @return [data] with a new report. The report is a failure when the model has no record,
     * no digital object, no original identifier value, or when no file exists on the server.
     */
    private fun enrichSftpLocator(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        type: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val record = data.second.firstOrNull { it.hasProperty(RDF.type, RICO.Record) }
            ?: return data.copy(third = Report(key, ReportStatus.failure, "No record present in model."))

        val digitalObject = data.second.firstOrNull { it.hasProperty(RICO.type, Constant.digitalObject) }
            ?: return data.copy(third = Report(key, ReportStatus.failure, "No digital object present in model."))

        // A missing identifier resource and an identifier without RICO.identifier statement are
        // both reported as "no original identifier" instead of raising an exception.
        val value = getOriginalIdentifiers(record).firstOrNull()?.getProperty(RICO.identifier)?.string
            ?: return data.copy(
                third = Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key))
            )

        // The server is only queried until the first existing path is found.
        val filePath = fileExtensions
            .map { extension -> "$sftpBasePath/${data.first.second.recordSetId}/$type/$value.$extension" }
            .firstOrNull { candidate -> sftpClient.exists(candidate) }
            ?: return data.copy(
                third = Report(
                    record.uri,
                    ReportStatus.failure,
                    "${data.third.message}\n${ReportMessages.reportFailure(digitalObject.uri, type)}".trim()
                )
            )

        val link = "${Constant.sftpPathPrefix}$filePath"
        if (type == Constant.mediaFolderName) {
            digitalObject.addLiteral(EBUCORE.locator, ResourceFactory.createPlainLiteral(link))
        } else if (type == Constant.thumbnailFolderName) {
            createThumbnailResource(data.first.first, record, digitalObject, link)
        }

        val successMessage = ReportMessages.reportSuccess(digitalObject.uri, link, type)
        return data.copy(
            third = Report(
                record.uri,
                ReportStatus.success,
                "${data.third.message}\n$successMessage".trim()
            )
        )
    }
}