KafkaTopology.kt 6.35 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

Jonas Waeber's avatar
Jonas Waeber committed
21
import java.io.StringReader
Jonas Waeber's avatar
Jonas Waeber committed
22
23
24
25
26
import java.io.StringWriter
import kotlin.system.exitProcess
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
Jonas Waeber's avatar
Jonas Waeber committed
27
import org.apache.jena.rdf.model.Statement
Jonas Waeber's avatar
Jonas Waeber committed
28
29
30
31
32
import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.rdf.EBUCORE
33
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
34
35
36
37
38
39
40
41
42
43
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient

/**
 * Builds the Kafka Streams topology of the media linker.
 *
 * Every incoming record is parsed as RDF (serialization [Constant.rdfParserLang]). Records that contain a digital
 * object ([RICO.type] = [Constant.digitalObject]) without an [EBUCORE.locator] are enriched with an sftp locator
 * pointing at the matching media file; all other records are forwarded unchanged. Each record is written to
 * `settings.outputTopic`, and one report per record is written to `<outputTopic>-<Constant.topicReportingSuffix>`.
 *
 * Note: the list of media files is fetched from the sftp server once, while this instance is constructed, and the
 * process exits with status 1 if that listing fails.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("MediaLinker")

    private val sftpClient = SftpClient(settings.sftpSettings)
    private val sftpBasePath = settings.appSettings.getProperty(Constant.sftpBasePathPropertyName)

    // Snapshot of the media folder taken at construction time; files uploaded afterwards are not seen.
    // Must stay declared after log / sftpClient / sftpBasePath, which createMediaFileList reads.
    private val files = createMediaFileList(settings.appSettings.getProperty(Constant.recordSetIdPropertyName))

    private val reportingTopic = "${settings.outputTopic}-${Constant.topicReportingSuffix}"

    /**
     * Lists the media files of record set [setting], i.e. the entries of
     * `<sftpBasePath>/<setting>/<Constant.mediaFolderName>` on the sftp server.
     *
     * Terminates the process with exit code 1 if the listing fails.
     */
    private fun createMediaFileList(setting: String): List<String> {
        val path = "$sftpBasePath/$setting/${Constant.mediaFolderName}"
        return try {
            sftpClient.listFiles(path).also { log.info("Files found on sftp server: $it.") }
        } catch (ex: Exception) {
            // Hand the throwable to the logger so the stack trace ends up in the log output
            // instead of being printed separately to stderr.
            log.error("${ex.localizedMessage}: $path", ex)
            exitProcess(1)
        }
    }

    /**
     * Assembles the stream processing graph and returns the [StreamsBuilder] holding it.
     *
     * Records with at least one digital object lacking a locator go through [enrichSftpLocator], and the report
     * produced by that step is the one published for the record. All other records are forwarded unchanged
     * together with a failure report.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val instantiationBranch = builder.stream<String, String>(settings.inputTopic)
            .mapValues { value -> createModel(value) }
            .mapValues { value -> extractSubjects(value) }
            .branch(
                Predicate { _, value -> containsDigitalObjectWithoutLocator(value.second) },
                Predicate { _, _ -> true }
            )

        // Enrich once; both the record output and the report consume the same result, so the published
        // report reflects whether a locator was actually added instead of always claiming success.
        val enrichedDigitalObjects = instantiationBranch[0]
            .mapValues { readOnlyKey, value -> enrichSftpLocator(readOnlyKey, value) }

        enrichedDigitalObjects
            .mapValues { value -> serializeModel(value.first) }
            .to(settings.outputTopic)

        enrichedDigitalObjects
            .mapValues { value -> value.second.toJson() }
            .to(reportingTopic)

        val unchangedWrittenResources = instantiationBranch[1]
            .mapValues { value -> serializeModel(value.first) }

        unchangedWrittenResources
            .to(settings.outputTopic)

        unchangedWrittenResources
            .mapValues { key, _ -> Report(key, ReportStatus.failure, "No linkable instantiation found.").toJson() }
            .to(reportingTopic)

        return builder
    }

    /** Serializes [model] using [Constant.rdfParserLang] and trims surrounding whitespace. */
    private fun serializeModel(model: Model): String {
        val out = StringWriter()
        model.write(out, Constant.rdfParserLang)
        return out.toString().trim()
    }

    /** Pairs [model] with the list of resources that are subjects of statements in it. */
    private fun extractSubjects(model: Model): Pair<Model, List<Resource>> {
        return Pair(model, model.listSubjects().toList())
    }

    /** True if [res] has [RICO.type] [Constant.digitalObject] and no [EBUCORE.locator] yet. */
    private fun isDigitalObjectWithoutLocator(res: Resource): Boolean =
        res.hasProperty(RICO.type, Constant.digitalObject) && !res.hasProperty(EBUCORE.locator)

    /** True if any resource in [res] is a digital object that still lacks a locator. */
    private fun containsDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
        res.any { isDigitalObjectWithoutLocator(it) }

    /** Returns the resources in [res] that are [RICO.Identifier]s with [RICO.type] [Constant.identifierType]. */
    private fun getOriginalIdentifiers(res: List<Resource>): List<Resource> {
        return res.filter {
            it.hasProperty(RDF.type, RICO.Identifier) && it.hasProperty(RICO.type, Constant.identifierType)
        }
    }

    /**
     * Parses [data] into a new in-memory model using [Constant.rdfParserLang].
     *
     * Parse errors are not caught here and propagate to the caller.
     */
    private fun createModel(data: String): Model {
        val model = ModelFactory.createDefaultModel()
        model.read(StringReader(data), "", Constant.rdfParserLang)
        return model
    }

    /**
     * Adds an sftp [EBUCORE.locator] to the first digital object in [data] that has none.
     *
     * The media file is the first entry of [files] whose name contains the [RICO.identifier] value of the
     * record's original identifier. The model in [data] is modified in place.
     *
     * @param key the record key; used as the id of the returned report.
     * @param data the parsed model and its subjects. Must contain at least one digital object without a locator
     *   (guaranteed by the branch predicate in [prepare]); otherwise `first` throws [NoSuchElementException].
     * @return the (possibly enriched) model and a report describing the outcome.
     */
    private fun enrichSftpLocator(key: String, data: Pair<Model, List<Resource>>): Pair<Model, Report> {
        val (model, subjects) = data
        val digitalObject = subjects.first { isDigitalObjectWithoutLocator(it) }

        // Assumes there is exactly one original identifier; any additional ones are ignored.
        val identifier = getOriginalIdentifiers(subjects)
            .firstOrNull()
            ?.getProperty(RICO.identifier)
            ?.string
        // A missing identifier value used to throw an NPE; a blank one matched every file because
        // `contains("")` is always true. Both are reported as "no original identifier" instead.
        if (identifier.isNullOrBlank()) {
            return Pair(model, Report(key, ReportStatus.failure, ReportMessages.noOriginalIdentifier(key)))
        }

        val file = files.firstOrNull { it.contains(identifier) }
            ?: return Pair(
                model,
                Report(key, ReportStatus.failure, ReportMessages.reportFailure(digitalObject.uri))
            )

        val link = "${Constant.sftpPathPrefix}$file"
        model.add(digitalObject, EBUCORE.locator, model.createLiteral(link))
        return Pair(
            model,
            Report(key, ReportStatus.success, ReportMessages.reportSuccess(digitalObject.uri, link))
        )
    }
}