KafkaTopology.kt 6.65 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.rdf.EBUCORE
29
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
30
31
32
33
34
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
35
36
37
import java.io.StringReader
import java.io.StringWriter
import kotlin.system.exitProcess
Jonas Waeber's avatar
Jonas Waeber committed
38
39
40
41

/**
 * Builds the Kafka Streams topology of the media linker.
 *
 * Each incoming record value is parsed as RDF (in [Constant.rdfParserLang]). Records containing a digital
 * object (a subject with `rico:type` [Constant.digitalObject]) that has no `ebucore:locator` are enriched with a
 * locator pointing to a matching media file on the sftp server. All records, enriched or not, are written to the
 * output topic, and exactly one report per record is written to the reporting topic.
 *
 * The media file list is read from the sftp server once, while this object is constructed; if listing fails,
 * the process exits with status 1.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("MediaLinker")

    private val appSettings = settings.appSettings

    // NOTE(review): not referenced anywhere in this class; kept because its construction may be relied upon
    // elsewhere (or have side effects) — confirm before removing.
    private val fileFetcher = LocatorFetcher(
        settings.sftpSettings,
        appSettings.getProperty(Constant.sftpBasePathPropertyName),
        appSettings.getProperty(Constant.recordSetIdPropertyName),
        requireNotNull(appSettings.getProperty(Constant.hasThumbnailsPropertyName)) {
            "Missing required setting '${Constant.hasThumbnailsPropertyName}'."
        }.toBoolean()
    )

    private val sftpClient = SftpClient(settings.sftpSettings)

    private val sftpBasePath = appSettings.getProperty(Constant.sftpBasePathPropertyName)

    /** Paths of the files in the record set's media folder on the sftp server, listed once at start-up. */
    private val files = createMediaFileList(appSettings.getProperty(Constant.recordSetIdPropertyName))

    /**
     * Lists the files of the media folder of record set [setting] on the sftp server.
     *
     * Terminates the process with exit code 1 when the folder cannot be listed, since no locator can be
     * created without this list.
     */
    private fun createMediaFileList(setting: String): List<String> {
        val mediaFolder = "$sftpBasePath/$setting/${Constant.mediaFolderName}"
        return try {
            sftpClient.listFiles(mediaFolder).also { log.info("Files found on sftp server: $it.") }
        } catch (ex: Exception) {
            // Hand the exception to the logger so the stack trace ends up in the log, not only on stderr.
            log.error(ex.localizedMessage + ": $mediaFolder", ex)
            exitProcess(1)
        }
    }

    private val reportingTopic = "${settings.processReportTopic}-${Constant.topicReportingSuffix}"

    /**
     * Builds the stream topology.
     *
     * Every input record is routed to exactly one of two branches (the second branch accepts everything the
     * first does not). Both branches write the serialized model to the output topic and one report per record
     * to the reporting topic.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val instantiationBranch = builder.stream<String, String>(settings.inputTopic)
            .mapValues { value -> createModel(value) }
            .mapValues { value -> extractSubjects(value) }
            .branch(
                Predicate { _, value -> containsDigitalObjectWithoutLocator(value.second) },
                Predicate { _, _ -> true }
            )

        // Records with a digital object lacking a locator: try to link them to a media file.
        val enrichedDigitalObjects = instantiationBranch[0]
            .mapValues { readOnlyKey, value -> enrichSftpLocator(readOnlyKey, value) }

        enrichedDigitalObjects
            .mapValues { value -> serialize(value.first) }
            .to(settings.outputTopic)

        // Emit the report describing the actual linking outcome (success, missing identifier, or no matching
        // file) instead of an unconditional success.
        enrichedDigitalObjects
            .mapValues { value -> value.second.toJson() }
            .to(reportingTopic)

        val unchangedWrittenResources = instantiationBranch[1]
            .mapValues { value -> serialize(value.first) }

        unchangedWrittenResources
            .to(settings.outputTopic)

        unchangedWrittenResources
            .mapValues { key, _ -> Report(key, ReportStatus.failure, "No linkable instantiation found.").toJson() }
            .to(reportingTopic)

        return builder
    }

    /** Serializes [model] in [Constant.rdfParserLang] and trims surrounding whitespace. */
    private fun serialize(model: Model): String {
        val out = StringWriter()
        model.write(out, Constant.rdfParserLang)
        return out.toString().trim()
    }

    /** Pairs [model] with the list of all its subjects. */
    private fun extractSubjects(model: Model): Pair<Model, List<Resource>> =
        Pair(model, model.listSubjects().toList())

    /** True if [resource] is typed as a digital object and has no `ebucore:locator` yet. */
    private fun isDigitalObjectWithoutLocator(resource: Resource): Boolean =
        resource.hasProperty(RICO.type, Constant.digitalObject) && !resource.hasProperty(EBUCORE.locator)

    /** True if any of [res] is a digital object without a locator. */
    private fun containsDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
        res.any { isDigitalObjectWithoutLocator(it) }

    /** Returns the `rico:Identifier` resources among [res] whose `rico:type` is [Constant.identifierType]. */
    private fun getOriginalIdentifiers(res: List<Resource>): List<Resource> =
        res.filter { it.hasProperty(RDF.type, RICO.Identifier) && it.hasProperty(RICO.type, Constant.identifierType) }

    /** Parses [data] (serialized in [Constant.rdfParserLang]) into a new in-memory model. */
    private fun createModel(data: String): Model =
        ModelFactory.createDefaultModel().apply {
            read(StringReader(data), "", Constant.rdfParserLang)
        }

    /**
     * Adds an `ebucore:locator` to the first digital object in [data] that has none.
     *
     * The locator is [Constant.sftpPathPrefix] followed by the first listed media file whose path contains the
     * value (`rico:identifier`) of the record's original identifier.
     *
     * @param key the record key; used as the id of the returned report, like every other report of this topology.
     * @param data the parsed model and its subjects; must contain a digital object without a locator.
     * @return the model (modified only on success) and a report describing the outcome.
     */
    private fun enrichSftpLocator(key: String, data: Pair<Model, List<Resource>>): Pair<Model, Report> {
        val (model, subjects) = data
        // Present by construction: only records passing containsDigitalObjectWithoutLocator reach this point.
        val digitalObject = subjects.first { isDigitalObjectWithoutLocator(it) }

        // Assumes a single original identifier; if several exist, the first one is used. A missing or blank
        // identifier value is treated like a missing identifier (a blank value would match every file).
        val identifierValue = getOriginalIdentifiers(subjects).firstOrNull()
            ?.getProperty(RICO.identifier)
            ?.string
            ?.takeIf { it.isNotBlank() }
            ?: return Pair(model, Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key)))

        val file = files.firstOrNull { it.contains(identifierValue) }
            ?: return Pair(model, Report(key, ReportStatus.failure, ReportMessages.reportFailure(digitalObject.uri)))

        val link = "${Constant.sftpPathPrefix}$file"
        model.add(StatementImpl(digitalObject, EBUCORE.locator, model.createLiteral(link)))
        return Pair(model, Report(key, ReportStatus.success, ReportMessages.reportSuccess(digitalObject.uri, link)))
    }
}