KafkaTopology.kt 9.47 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
24
import org.apache.jena.rdf.model.ResourceFactory
Jonas Waeber's avatar
Jonas Waeber committed
25
26
27
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.memobase.rdf.EBUCORE
28
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
29
30
31
32
33
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
34
35
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
36
37
import java.io.StringReader
import java.io.StringWriter
Jonas Waeber's avatar
Jonas Waeber committed
38
39

class KafkaTopology(private val settings: SettingsLoader) {
40
41
    // Application-specific part of the loaded service configuration.
    private val appSettings = settings.appSettings

    // Client used to check whether a file exists on the SFTP server.
    private val sftpClient = SftpClient(settings.sftpSettings)

    // Directory prefix that every looked-up file path starts with.
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)

    // File extensions that are tried, in the configured order, when looking up a file.
    // The setting is a comma-separated list; entries are trimmed and blank entries dropped so that
    // values such as "mp4, mp3" or a trailing comma do not produce paths that can never match.
    private val fileExtensions = appSettings.getProperty(Constant.extensionsPropertyName)
        .split(",")
        .map { it.trim() }
        .filter { it.isNotEmpty() }

    // Topic that receives the processing report of every message.
    private val reportingTopic = settings.processReportTopic
Jonas Waeber's avatar
Jonas Waeber committed
46

47
    /**
     * Assembles the Kafka Streams topology of this service.
     *
     * Each message of the input topic is parsed into an RDF model and first passed through the
     * thumbnail lookup. The stream is then split in two: messages containing a digital object
     * without a locator are additionally passed through the media file lookup, all other messages
     * are forwarded as they are. In both branches the model is written to the output topic unless
     * its report carries the failure status, and the report is always written to the reporting topic.
     *
     * @return the builder that holds the assembled topology.
     */
    fun prepare(): StreamsBuilder {
        // Writes a model back to text in the configured RDF language.
        fun serialize(model: Model): String {
            val writer = StringWriter()
            model.write(writer, Constant.rdfParserLang)
            return writer.toString().trim()
        }

        val builder = StreamsBuilder()

        val branches = builder
            .stream<String, String>(settings.inputTopic)
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { message -> createModel(message) }
            .mapValues { parsed -> extractSubjects(parsed) }
            .mapValues { readOnlyKey, parsed ->
                // The thumbnail lookup is the first step, so it starts from an empty report.
                val emptyReport = Report("", status = "", message = "")
                enrichSftpLocator(
                    readOnlyKey,
                    Triple(parsed.first, parsed.second, emptyReport),
                    Constant.thumbnailFolderName
                )
            }
            .branch(
                Predicate { _, enriched -> containsDigitalObjectWithoutLocator(enriched.second) },
                Predicate { _, _ -> true }
            )

        // Branch 0: a digital object still lacks its locator, so the media file is looked up as well.
        val withMediaLookup = branches[0]
            .mapValues { readOnlyKey, enriched ->
                enrichSftpLocator(readOnlyKey, enriched, Constant.mediaFolderName)
            }

        // Records whose report has the failure status are not forwarded to the output topic.
        withMediaLookup
            .filterNot { _, enriched -> enriched.third.status == ReportStatus.failure }
            .mapValues { enriched -> serialize(enriched.first.first) }
            .to(settings.outputTopic)

        // The report is sent for every record of this branch, failed or not.
        withMediaLookup
            .mapValues { enriched -> enriched.third.toJson() }
            .to(reportingTopic)

        // Branch 1: everything else is forwarded with the result of the thumbnail lookup only.
        val remaining = branches[1]

        remaining
            .filterNot { _, enriched -> enriched.third.status == ReportStatus.failure }
            .mapValues { enriched -> serialize(enriched.first.first) }
            .to(settings.outputTopic)

        remaining
            .mapValues { _, enriched -> enriched.third.toJson() }
            .to(reportingTopic)

        return builder
    }

102
103
    /**
     * Pairs the parsed message with all subject resources of its model.
     *
     * @param input the RDF model together with the metadata taken from the message headers.
     * @return the unchanged [input] as first component and the model's subjects as second component.
     */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> {
        val (model, _) = input
        val subjects: List<Resource> = model.listSubjects().toList()
        return input to subjects
    }

106
107
108
109
    /**
     * Checks whether at least one of the given resources is typed as digital object
     * (via `RICO.type`) and has no `EBUCORE.locator` property.
     *
     * @param res the resources to inspect.
     * @return `true` as soon as one such resource is found, `false` otherwise (also for an empty list).
     */
    private fun containsDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
        res.asSequence()
            .filter { resource -> resource.hasProperty(RICO.type, Constant.digitalObject) }
            .any { digitalObject -> !digitalObject.hasProperty(EBUCORE.locator) }

110
111
112
113
114
115
116
117
    /**
     * Collects the identifier resources of a record that match the configured identifier type.
     *
     * Follows every `RICO.identifiedBy` statement of the record and keeps the objects that are
     * typed as `RICO.Identifier` and carry `Constant.identifierType` as their `RICO.type`.
     *
     * @param record the record resource whose identifiers are inspected.
     * @return the matching identifier resources; empty when there are none.
     */
    private fun getOriginalIdentifiers(record: Resource): List<Resource> {
        return record.listProperties(RICO.identifiedBy).toList()
            .map { statement -> statement.`object` }
            // asResource() throws for literal objects, so anything that is not a resource is skipped
            // instead of aborting the processing of the whole message.
            .filter { node -> node.isResource }
            .map { node -> node.asResource() }
            .filter { identifier ->
                identifier.hasProperty(RDF.type, RICO.Identifier) &&
                    identifier.hasProperty(RICO.type, Constant.identifierType)
            }
    }

120
    /**
     * Parses the message payload into a new RDF model.
     *
     * The text is read in the language given by `Constant.rdfParserLang`, with an empty string
     * passed as the second argument of `Model.read`.
     *
     * @param data the serialized RDF together with the metadata taken from the message headers.
     * @return the parsed model paired with the unchanged header metadata.
     */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata> {
        val (payload, header) = data
        val parsed = ModelFactory.createDefaultModel().apply {
            read(StringReader(payload), "", Constant.rdfParserLang)
        }
        return parsed to header
    }

Matthias's avatar
Matthias committed
126
127
128
    /**
     * Looks up the file that belongs to the record's digital object on the SFTP server and,
     * when found, links it into the model.
     *
     * The candidate path is built as `<sftpBasePath>/<recordSetId>/<type>/<identifier>.<extension>`,
     * where the identifier is the `RICO.identifier` value of the record's first original identifier.
     * The configured extensions are tried in order and the first existing file wins:
     * - for `Constant.mediaFolderName` the link is added to the digital object as `EBUCORE.locator`;
     * - for `Constant.thumbnailFolderName` a derived thumbnail instantiation carrying the link is
     *   created and connected to both the digital object and the record;
     * - for any other [type] nothing is added to the model, but a success report is still returned.
     *
     * The model and the subject list are always returned as they were passed in (the model is
     * modified in place); only the report is replaced.
     *
     * @param key the message key, used as report id when no record URI can be used.
     * @param data the parsed model with header metadata, its subjects and the report of the previous step.
     * @param type the folder to look in, which also selects how the link is added to the model.
     * @return [data] with a new report. The report has the failure status when the model has no
     *   record, no digital object or no usable original identifier, or when no file exists for
     *   any of the configured extensions.
     */
    private fun enrichSftpLocator(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        type: String
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val (content, subjects, previousReport) = data
        val (model, header) = content

        // Every outcome differs only in the report that is attached.
        fun withReport(report: Report) = Triple(content, subjects, report)

        val record = subjects.firstOrNull { it.hasProperty(RDF.type, RICO.Record) }
            ?: return withReport(Report(key, ReportStatus.failure, "No record present in model."))

        val digitalObject = subjects.firstOrNull { it.hasProperty(RICO.type, Constant.digitalObject) }
            ?: return withReport(Report(key, ReportStatus.failure, "No digital object present in model."))

        // A missing identifier resource and an identifier resource without a rico:identifier value
        // are both reported as a failure instead of raising an exception.
        val value = getOriginalIdentifiers(record).firstOrNull()?.getProperty(RICO.identifier)?.string
            ?: return withReport(Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key)))

        for (extension in fileExtensions) {
            val filePath = "$sftpBasePath/${header.recordSetId}/$type/$value.$extension"
            if (!sftpClient.exists(filePath)) continue

            val link = "${Constant.sftpPathPrefix}$filePath"
            val locator = ResourceFactory.createPlainLiteral(link)
            when (type) {
                Constant.mediaFolderName -> digitalObject.addLiteral(EBUCORE.locator, locator)
                Constant.thumbnailFolderName -> {
                    // The thumbnail URI reuses the last path segment of the digital object's URI.
                    val thumbnail = model.createResource(
                        "https://memobase.ch/digital/${digitalObject.uri.substringAfterLast("/")}/derived"
                    )
                    thumbnail.addProperty(RDF.type, RICO.Instantiation)
                    thumbnail.addProperty(RICO.type, Constant.thumbnailRicoType)
                    thumbnail.addProperty(EBUCORE.locator, locator)
                    // Link the thumbnail in both directions to the digital object and to the record.
                    digitalObject.addProperty(RICO.hasDerivedInstantiation, thumbnail)
                    thumbnail.addProperty(RICO.isDerivedFromInstantiation, digitalObject)
                    record.addProperty(RICO.hasInstantiation, thumbnail)
                    thumbnail.addProperty(RICO.instantiates, record)
                }
            }

            // The message of the previous step is kept in front of the new one.
            return withReport(
                Report(
                    record.uri,
                    ReportStatus.success,
                    "${previousReport.message}\n${ReportMessages.reportSuccess(digitalObject.uri, link, type)}".trim()
                )
            )
        }

        // None of the configured extensions matched an existing file.
        return withReport(
            Report(
                record.uri,
                ReportStatus.failure,
                "${previousReport.message}\n${ReportMessages.reportFailure(digitalObject.uri, type)}".trim()
            )
        )
    }
}