KafkaTopology.kt 7.79 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
24
import org.apache.jena.rdf.model.ResourceFactory
Jonas Waeber's avatar
Jonas Waeber committed
25
26
27
28
29
import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.rdf.EBUCORE
30
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
31
32
33
34
35
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
36
37
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
38
39
40
import java.io.StringReader
import java.io.StringWriter
import kotlin.system.exitProcess
Jonas Waeber's avatar
Jonas Waeber committed
41
42
43
44

/**
 * Builds the Kafka Streams topology of the media linker.
 *
 * Every incoming record is parsed into an RDF [Model]. Records that contain a digital object
 * without an `EBUCORE.locator` are enriched with an sftp locator if a matching media file exists
 * on the sftp server; all other records are forwarded unchanged. For every record a report is
 * written to the reporting topic.
 *
 * @param settings provides the topic names, the application properties and the sftp settings.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("MediaLinker")

    private val appSettings = settings.appSettings

    private val fileFetcher = LocatorFetcher(
        settings.sftpSettings,
        appSettings.getProperty(Constant.sftpBasePathPropertyName),
        appSettings.getProperty(Constant.recordSetIdPropertyName),
        requireNotNull(appSettings.getProperty(Constant.hasThumbnailsPropertyName)) {
            "Missing application setting '${Constant.hasThumbnailsPropertyName}'."
        }.toBoolean()
    )
    private val sftpClient = SftpClient(settings.sftpSettings)
    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)

    private val reportingTopic = "${settings.processReportTopic}"

    /**
     * Lists the files in the media folder below `<sftpBasePath>/<setting>`.
     *
     * Any exception raised while listing is logged and terminates the process with exit code 1.
     *
     * @param setting path segment between the sftp base path and the media folder.
     * @return the file list returned by the sftp client.
     */
    private fun createMediaFileList(setting: String): List<String> {
        val mediaFolder = "$sftpBasePath/$setting/${Constant.mediaFolderName}"
        return try {
            val list = sftpClient.listFiles(mediaFolder)
            log.info("Files found on sftp server: $list.")
            list
        } catch (ex: Exception) {
            // TODO: create a report
            // Pass the exception to the logger so the stack trace ends up in the log output.
            log.error("${ex.localizedMessage}: $mediaFolder", ex)
            exitProcess(1)
        }
    }

    /**
     * Assembles the stream topology on a new [StreamsBuilder].
     *
     * Branch 0 (records containing a digital object without locator) is passed through
     * [enrichSftpLocator]; the resulting model goes to the output topic and the report produced
     * by the enrichment goes to the reporting topic. Branch 1 (everything else) is written to the
     * output topic as parsed, together with a failure report.
     *
     * @return the builder holding the complete topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val instantiationBranch = builder
            .stream<String, String>(settings.inputTopic)
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .mapValues { value -> createModel(value) }
            .mapValues { value -> extractSubjects(value) }
            // .mapValues { value -> createThumbnail(value) }
            .branch(
                Predicate { _, value -> containsDigitalObjectWithoutLocator(value.second) },
                Predicate { _, _ -> true }
            )

        val enrichedDigitalObjects = instantiationBranch[0]
            .mapValues { readOnlyKey, value -> enrichSftpLocator(readOnlyKey, value) }

        enrichedDigitalObjects
            .mapValues { value -> serialize(value.first) }
            .to(settings.outputTopic)

        // Forward the report created by the enrichment step, so that a failed enrichment
        // is reported as a failure and not as an unconditional success.
        enrichedDigitalObjects
            .mapValues { value -> value.second.toJson() }
            .to(reportingTopic)

        val unchangedWrittenResources = instantiationBranch[1]
            .mapValues { value -> serialize(value.first.first) }

        unchangedWrittenResources
            .to(settings.outputTopic)

        unchangedWrittenResources
            .mapValues { key, _ -> Report(key, ReportStatus.failure, "No linkable instantiation found.").toJson() }
            .to(reportingTopic)

        return builder
    }

    /** Writes [model] in the configured RDF language and returns the trimmed text. */
    private fun serialize(model: Model): String {
        val out = StringWriter()
        model.write(out, Constant.rdfParserLang)
        return out.toString().trim()
    }

    /** Pairs the parsed input with the list of all subjects of its model. */
    private fun extractSubjects(input: Pair<Model, HeaderMetadata>): Pair<Pair<Model, HeaderMetadata>, List<Resource>> =
        Pair(input, input.first.listSubjects().toList())

    /** True if [resource] is typed as digital object and has no `EBUCORE.locator` property. */
    private fun isDigitalObjectWithoutLocator(resource: Resource): Boolean =
        resource.hasProperty(RICO.type, Constant.digitalObject) && !resource.hasProperty(EBUCORE.locator)

    /** True if at least one resource in [res] is a digital object without locator. */
    private fun containsDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
        res.any(::isDigitalObjectWithoutLocator)

    /** Returns the resources in [res] that are `RICO.Identifier`s of the configured identifier type. */
    private fun getOriginalIdentifiers(res: List<Resource>): List<Resource> =
        res.filter {
            it.hasProperty(RDF.type, RICO.Identifier) && it.hasProperty(RICO.type, Constant.identifierType)
        }

    /** Parses the serialized RDF in `data.first` into a new model and keeps the header metadata. */
    private fun createModel(data: Pair<String, HeaderMetadata>): Pair<Model, HeaderMetadata> {
        val model = ModelFactory.createDefaultModel()
        model.read(StringReader(data.first), "", Constant.rdfParserLang)
        return Pair(model, data.second)
    }

    /**
     * Adds an sftp locator to the first digital object without locator, if a media file named
     * after the original identifier exists on the sftp server with one of the known extensions.
     *
     * @param key the record key, used in the report when no original identifier is available.
     * @param data the model with its header metadata, and the subjects of the model.
     * @return the (possibly enriched) model together with a success or failure report.
     * @throws NoSuchElementException if [data] contains no digital object without locator;
     * the branch predicate in [prepare] rules this out for records routed here.
     */
    private fun enrichSftpLocator(
        key: String,
        data: Pair<Pair<Model, HeaderMetadata>, List<Resource>>
    ): Pair<Model, Report> {
        val (modelWithHeader, subjects) = data
        val (model, header) = modelWithHeader
        val digitalObject = subjects.first(::isDigitalObjectWithoutLocator)

        // This assumes that there is only one, and always one original identifier present!
        // A missing identifier resource or a missing identifier value both yield a failure report.
        val identifierValue = getOriginalIdentifiers(subjects)
            .firstOrNull()
            ?.getProperty(RICO.identifier)
            ?.string
            ?: return Pair(model, Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key)))

        // Candidates are probed in extension order; the first existing file wins.
        val filePath = MEDIA_FILE_EXTENSIONS
            .map { extension -> "$sftpBasePath/${header.recordSetId}/media/$identifierValue.$extension" }
            .firstOrNull { candidate -> sftpClient.exists(candidate) }
            ?: return Pair(
                model,
                Report(
                    digitalObject.uri,
                    ReportStatus.failure,
                    ReportMessages.reportFailure(digitalObject.uri)
                )
            )

        val link = "${Constant.sftpPathPrefix}$filePath"
        digitalObject.addLiteral(EBUCORE.locator, ResourceFactory.createPlainLiteral(link))
        return Pair(
            model,
            Report(
                digitalObject.uri,
                ReportStatus.success,
                ReportMessages.reportSuccess(digitalObject.uri, link)
            )
        )
    }

    /*
    Disabled draft, kept for reference (not valid Kotlin as written):

    private fun createThumbnail(key: data: Pair<Pair<Model, HeaderMetadata>, List<Resource>>): Pair<Model, Report> {

        // TODO: same here as in enrichSftpLocator but for thumbnails
        val thumbnail = data.first.first.createResource()
        thumbnail.addProperty(RDF.type, RICO.Instantiation)
        thumbnail.addProperty(RICO.type, "thumbnail")
        thumbnail.addProperty(EBUCORE.locator, "locator of file")
        return Pair(data.first.first, new Report("id00", "undefined","just a sample report"))

    }
     */

    private companion object {
        /** File extensions probed, in this order, when looking for a media file. */
        private val MEDIA_FILE_EXTENSIONS = listOf("jpg", "jpeg", "png", "mp3", "mp4")
    }
}