KafkaTopology.kt 7.67 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
24
import org.apache.jena.rdf.model.ResourceFactory
Jonas Waeber's avatar
Jonas Waeber committed
25
26
27
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.memobase.rdf.EBUCORE
28
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
29
30
31
32
33
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
34
35
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
36
37
import java.io.StringReader
import java.io.StringWriter
Jonas Waeber's avatar
Jonas Waeber committed
38
39

/**
 * Builds the Kafka Streams topology that links files on the sftp server to the
 * digital objects described by incoming RDF records.
 *
 * Each incoming message is parsed into a Jena [Model]; its subjects are extracted and
 * a thumbnail instantiation is added when a matching thumbnail file exists on the sftp
 * server. Records that still contain a digital object without a locator are then
 * enriched with a media locator. All records are written to the output topic and a
 * report is written to the process report topic.
 *
 * @param settings loaded service settings (topics, application properties, sftp settings).
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val appSettings = settings.appSettings

    private val sftpClient = SftpClient(settings.sftpSettings)
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)
    private val reportingTopic = settings.processReportTopic

    // File extensions probed on the sftp server, in this order; the first existing file wins.
    private val fileExtensions = listOf("jpg", "jpeg", "png", "mp3", "mp4")

    /**
     * Assembles the stream topology.
     *
     * NOTE(review): the [Report] produced by [enrichSftpLocator] is carried through the stream
     * but not written to the reporting topic; the reports sent below are built from fixed
     * messages. Confirm whether that is intended.
     *
     * @return the [StreamsBuilder] holding the complete topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val stream = builder.stream<String, String>(settings.inputTopic)

        val instantiationBranch = stream
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { value -> createModel(value) }
            .mapValues { value -> extractSubjects(value) }
            // Use the real record key (not a placeholder) so that reports refer to the record.
            .mapValues { readOnlyKey, value ->
                enrichSftpLocator(
                    readOnlyKey,
                    Triple(value.first, value.second, Report("", "", "")),
                    "thumbnails"
                )
            }
            .branch(
                Predicate { _, value -> containsDigitalObjectWithoutLocator(value.second) },
                Predicate { _, _ -> true }
            )

        val updateDigitalObjects = instantiationBranch[0]
            .mapValues { readOnlyKey, value -> enrichSftpLocator(readOnlyKey, value) }
            .mapValues { value -> serializeModel(value.first.first) }

        updateDigitalObjects
            .to(settings.outputTopic)

        updateDigitalObjects
            .mapValues { readOnlyKey, _ ->
                Report(
                    readOnlyKey,
                    ReportStatus.success,
                    "Create sftp file path to instantiation."
                ).toJson()
            }
            .to(reportingTopic)

        val unchangedWrittenResources = instantiationBranch[1]
            .mapValues { value -> serializeModel(value.first.first) }

        unchangedWrittenResources
            .to(settings.outputTopic)

        unchangedWrittenResources
            .mapValues { key, _ -> Report(key, ReportStatus.failure, "No linkable instantiation found.").toJson() }
            .to(reportingTopic)

        return builder
    }

    /** Serialises [model] in the configured RDF language and trims surrounding whitespace. */
    private fun serializeModel(model: Model): String {
        val out = StringWriter()
        model.write(out, Constant.rdfParserLang)
        return out.toString().trim()
    }

    /** Pairs the parsed input with the list of all subjects of its model. */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> =
        Pair(input, input.first.listSubjects().toList())

    /** True when [res] is typed as a digital object and has no locator yet. */
    private fun isDigitalObjectWithoutLocator(res: Resource): Boolean =
        res.hasProperty(RICO.type, Constant.digitalObject) && !res.hasProperty(EBUCORE.locator)

    /** True when at least one resource in [res] is a digital object without a locator. */
    private fun containsDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
        res.any { isDigitalObjectWithoutLocator(it) }

    /** Returns the resources that are identifiers of the configured original-identifier type. */
    private fun getOriginalIdentifiers(res: List<Resource>): List<Resource> =
        res.filter {
            it.hasProperty(RDF.type, RICO.Identifier) && it.hasProperty(RICO.type, Constant.identifierType)
        }

    /** Parses the message body into a Jena [Model], keeping the header metadata alongside it. */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata> {
        val model = ModelFactory.createDefaultModel()
        model.read(StringReader(data.first), "", Constant.rdfParserLang)
        return Pair(model, data.second)
    }

    /**
     * Looks for a file belonging to the record on the sftp server and, if one exists, links it
     * in the model.
     *
     * The path probed is `<sftpBasePath>/<recordSetId>/<type>/<originalIdentifier>.<extension>`
     * for each supported extension. For type `"media"` the locator is added to the digital
     * object itself; for type `"thumbnails"` a derived instantiation carrying the locator is
     * created and connected to the digital object and the record. Any other type only yields a
     * report.
     *
     * Missing prerequisites (no digital object without locator, no original identifier, no
     * record, no file) result in a failure [Report] instead of an exception, so that a single
     * incomplete record cannot stop the stream.
     *
     * @param key key of the Kafka message, used in reports issued before the record is known.
     * @param data parsed model with header metadata, the model's subjects and the report so far.
     * @param type sub-directory on the sftp server and kind of link to create.
     * @return the same model and subjects together with the updated report.
     */
    private fun enrichSftpLocator(
        key: String,
        data: Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report>,
        type: String = "media"
    ): Triple<Pair<Model, HeaderMetadata>, List<Resource>, Report> {
        val (content, subjects, previousReport) = data
        val (model, header) = content

        fun withReport(report: Report) = Triple(content, subjects, report)

        val digitalObject = subjects.firstOrNull { isDigitalObjectWithoutLocator(it) }
            ?: return withReport(Report(key, ReportStatus.failure, "No linkable instantiation found."))

        // Only the first original identifier is used; additional ones are ignored.
        val identifierValue = getOriginalIdentifiers(subjects)
            .firstOrNull()
            ?.getProperty(RICO.identifier)
            ?.string
            ?: return withReport(Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key)))

        val record = subjects.firstOrNull { it.hasProperty(RDF.type, RICO.Record) }
            ?: return withReport(Report(key, ReportStatus.failure, "No record resource found."))

        // Lazy sequence: the sftp server is only queried until the first existing file is found.
        val filePath = fileExtensions.asSequence()
            .map { extension -> "$sftpBasePath/${header.recordSetId}/$type/$identifierValue.$extension" }
            .firstOrNull { sftpClient.exists(it) }
            ?: return withReport(
                Report(
                    record.uri,
                    ReportStatus.failure,
                    previousReport.message + "\n" +
                        "No $type file found on sftp server for ${digitalObject.uri}."
                )
            )

        val link = "${Constant.sftpPathPrefix}$filePath"
        val locator = ResourceFactory.createPlainLiteral(link)
        when (type) {
            "media" -> {
                digitalObject.addLiteral(EBUCORE.locator, locator)
            }
            "thumbnails" -> {
                // NOTE(review): assumes the digital object is a URI resource (not a blank node).
                val thumbnail = model.createResource(
                    "https://memobase.ch/digital/${digitalObject.uri.substringAfterLast("/")}/derived"
                )
                thumbnail.addProperty(RDF.type, RICO.Instantiation)
                thumbnail.addProperty(RICO.type, "thumbnail")
                thumbnail.addProperty(EBUCORE.locator, locator)
                digitalObject.addProperty(RICO.hasDerivedInstantiation, thumbnail)
                thumbnail.addProperty(RICO.isDerivedFromInstantiation, digitalObject)
                record.addProperty(RICO.hasInstantiation, thumbnail)
                thumbnail.addProperty(RICO.instantiates, record)
            }
        }

        return withReport(
            Report(
                record.uri,
                ReportStatus.success,
                previousReport.message + "\n" + ReportMessages.reportSuccess(digitalObject.uri, link)
            )
        )
    }
}