KafkaTopology.kt 8.18 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
24
import org.apache.jena.rdf.model.ResourceFactory
Jonas Waeber's avatar
Jonas Waeber committed
25
26
27
28
29
import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.rdf.EBUCORE
30
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
31
32
33
34
35
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
36
37
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
38
39
40
import java.io.StringReader
import java.io.StringWriter
import kotlin.system.exitProcess
Jonas Waeber's avatar
Jonas Waeber committed
41
42
43
44

/**
 * Kafka Streams topology of the media linker.
 *
 * Every input record is an RDF document (parsed with [Constant.rdfParserLang]). Records whose subjects
 * contain a digital object (`rico:type` = [Constant.digitalObject]) without an `ebucore:locator` are
 * enriched with sftp locators for a thumbnail and for the media file. All records, enriched or not, are
 * written to `settings.outputTopic`, and one report per record is written to the process report topic.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("MediaLinker")

    private val appSettings = settings.appSettings

    // NOTE(review): not referenced anywhere in this class; kept so construction side effects (including the
    // `!!` on the thumbnail flag, which fails when the property is missing) stay unchanged. The thumbnail
    // lookup in enrichSftpLocator does not consult this flag — confirm that is intended.
    private val fileFetcher = LocatorFetcher(
        settings.sftpSettings,
        appSettings.getProperty(Constant.sftpBasePathPropertyName),
        appSettings.getProperty(Constant.recordSetIdPropertyName),
        appSettings.getProperty(Constant.hasThumbnailsPropertyName)!!.toBoolean()
    )

    private val sftpClient = SftpClient(settings.sftpSettings)
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)
    private val reportingTopic = "${settings.processReportTopic}"

    /**
     * Assembles the topology.
     *
     * Records are split into two branches:
     * - branch 0: a digital object without locator exists → thumbnail and media lookup on the sftp server,
     *   the (possibly enriched) model goes to the output topic and the lookup report to the report topic;
     * - branch 1: everything else → the model is forwarded unchanged and a failure report is emitted.
     *
     * @return the builder containing the topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val instantiationBranch = builder.stream<String, String>(settings.inputTopic)
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { value -> createModel(value) }
            .mapValues { value -> extractSubjects(value) }
            .branch(
                Predicate { _, value -> containsDigitalObjectWithoutLocator(value.second) },
                Predicate { _, _ -> true }
            )

        // The sftp lookups only run here, after the predicate guaranteed that a digital object without a
        // locator exists. Running them before the branch made every other record throw.
        val enrichedDigitalObjects = instantiationBranch[0]
            .mapValues { readOnlyKey, value ->
                val initial = Triple(value.first, value.second, Report("", "", ""))
                val withThumbnail = enrichSftpLocator(readOnlyKey, initial, THUMBNAILS)
                enrichSftpLocator(readOnlyKey, withThumbnail, MEDIA)
            }

        enrichedDigitalObjects
            .mapValues { value -> serialize(value.first.first) }
            .to(settings.outputTopic)

        // Emit the report produced by the lookup so a missing media file is reported as a failure.
        enrichedDigitalObjects
            .mapValues { value -> value.third.toJson() }
            .to(reportingTopic)

        val unchangedWrittenResources = instantiationBranch[1]
            .mapValues { value -> serialize(value.first.first) }

        unchangedWrittenResources
            .to(settings.outputTopic)

        unchangedWrittenResources
            .mapValues { key, _ -> Report(key, ReportStatus.failure, "No linkable instantiation found.").toJson() }
            .to(reportingTopic)

        return builder
    }

    /** Pairs the parsed model (and its header metadata) with the list of all subjects in the model. */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> {
        return Pair(input, input.first.listSubjects().toList())
    }

    /** True when any resource is a digital object that has no `ebucore:locator` yet. */
    private fun containsDigitalObjectWithoutLocator(res: List<Resource>): Boolean {
        return res.any { it.hasProperty(RICO.type, Constant.digitalObject) && !it.hasProperty(EBUCORE.locator) }
    }

    /** Returns the `rico:Identifier` resources whose `rico:type` is [Constant.identifierType]. */
    private fun getOriginalIdentifiers(res: List<Resource>): List<Resource> {
        return res.filter {
            it.hasProperty(RDF.type, RICO.Identifier) && it.hasProperty(RICO.type, Constant.identifierType)
        }
    }

    /** Parses the RDF text in `data.first` into a fresh Jena model and keeps the header metadata alongside. */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata> {
        val model = ModelFactory.createDefaultModel()
        model.read(StringReader(data.first), "", Constant.rdfParserLang)
        return Pair(model, data.second)
    }

    /** Serialises [model] with [Constant.rdfParserLang] and trims surrounding whitespace. */
    private fun serialize(model: Model): String {
        val out = StringWriter()
        model.write(out, Constant.rdfParserLang)
        return out.toString().trim()
    }

    /**
     * Looks up the sftp file of [type] for the first digital object without locator in `data.second`.
     * When the file is found, the object is linked to it inside the model.
     *
     * Candidate paths are `<sftpBasePath>/<recordSetId>/<type>/<original identifier>.<ext>` for each
     * extension in [FILE_EXTENSIONS], tried in order; the first existing path wins.
     * - [MEDIA]: the sftp link is added as `ebucore:locator` of the digital object.
     * - [THUMBNAILS]: a derived thumbnail instantiation carrying the link is created (see [linkThumbnail]).
     *
     * The model in `data.first` is mutated in place. The returned triple carries the same model and resource
     * list plus a report whose message is the previous report's message extended with this lookup's outcome.
     * No exception is thrown for missing data; the report status is failure instead.
     *
     * @param key Kafka record key, used as report id.
     * @param data model with header metadata, the model's subjects, and the report accumulated so far.
     * @param type sftp sub-directory to search, [MEDIA] or [THUMBNAILS].
     * @return the input model/resources with an updated report.
     */
    private fun enrichSftpLocator(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        type: String = MEDIA
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val model = data.first.first
        val resources = data.second

        fun finish(succeeded: Boolean, message: String) = Triple(
            data.first,
            resources,
            Report(
                key,
                if (succeeded) ReportStatus.success else ReportStatus.failure,
                joinMessages(data.third.message, message)
            )
        )

        val digitalObject = resources.firstOrNull {
            it.hasProperty(RICO.type, Constant.digitalObject) && !it.hasProperty(EBUCORE.locator)
        } ?: return finish(false, "No digital object without locator found.")

        // Assumes a single original identifier; if several are present the first one is used.
        val identifier = getOriginalIdentifiers(resources).firstOrNull()
            ?.getProperty(RICO.identifier)
            ?.string
            ?: return finish(false, ReportMessages.noOriginalIdentifier(key))

        val directory = "$sftpBasePath/${data.first.second.recordSetId}/$type"
        val filePath = FILE_EXTENSIONS
            .map { extension -> "$directory/$identifier.$extension" }
            .firstOrNull { path -> sftpClient.exists(path) }
            ?: return finish(false, "No $type file found for digital object ${digitalObject.uri}.")

        val link = "${Constant.sftpPathPrefix}$filePath"
        when (type) {
            MEDIA -> digitalObject.addLiteral(EBUCORE.locator, ResourceFactory.createPlainLiteral(link))
            THUMBNAILS -> linkThumbnail(model, digitalObject, resources, link)
        }
        return finish(true, ReportMessages.reportSuccess(digitalObject.uri, link))
    }

    /**
     * Creates the derived thumbnail instantiation `https://memobase.ch/digital/<id>/derived` with the
     * locator [link]. It is linked both ways to [digitalObject] and, when a `rico:Record` exists among
     * [resources], to that record.
     */
    private fun linkThumbnail(model: Model, digitalObject: Resource, resources: List<Resource>, link: String) {
        val thumbnail = model.createResource(
            "https://memobase.ch/digital/${digitalObject.uri.substringAfterLast("/")}/derived"
        )
        thumbnail.addProperty(RDF.type, RICO.Instantiation)
        thumbnail.addProperty(RICO.type, "thumbnail")
        thumbnail.addProperty(EBUCORE.locator, ResourceFactory.createPlainLiteral(link))
        digitalObject.addProperty(RICO.hasDerivedInstantiation, thumbnail)
        thumbnail.addProperty(RICO.isDerivedFromInstantiation, digitalObject)
        resources.firstOrNull { it.hasProperty(RDF.type, RICO.Record) }?.let { record ->
            record.addProperty(RICO.hasInstantiation, thumbnail)
            thumbnail.addProperty(RICO.instantiates, record)
        }
    }

    /** Appends [next] to [previous] on a new line; no separator when [previous] is null or blank. */
    private fun joinMessages(previous: String?, next: String): String =
        if (previous.isNullOrBlank()) next else "$previous\n$next"

    private companion object {
        /** sftp sub-directory holding the media files. */
        const val MEDIA = "media"

        /** sftp sub-directory holding the thumbnail files. */
        const val THUMBNAILS = "thumbnails"

        /** File extensions probed on the sftp server, in priority order. */
        val FILE_EXTENSIONS = listOf("jpg", "jpeg", "png", "mp3", "mp4")
    }
}