KafkaTopology.kt 6.57 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

Jonas Waeber's avatar
Jonas Waeber committed
21
import java.io.StringReader
Jonas Waeber's avatar
Jonas Waeber committed
22
23
24
25
26
import java.io.StringWriter
import kotlin.system.exitProcess
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
Jonas Waeber's avatar
Jonas Waeber committed
27
import org.apache.jena.rdf.model.Statement
Jonas Waeber's avatar
Jonas Waeber committed
28
29
30
31
32
import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.rdf.EBUCORE
33
import org.memobase.rdf.RDF
Jonas Waeber's avatar
Jonas Waeber committed
34
35
36
37
38
39
40
41
42
43
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient

/**
 * Builds the Kafka Streams topology of the media linker.
 *
 * Incoming RDF records are parsed into Jena models. Records that contain a digital object
 * without an `EBUCORE.locator` get locator statements pointing at the matching files on the
 * sftp server; all other records are passed through unchanged. Every record produces one
 * message on the output topic and one report on the reporting topic.
 *
 * Constructing this class lists the media folder on the sftp server. If that listing fails
 * the process is terminated with exit code 1.
 *
 * @param settings provides the topic names, the application properties and the sftp settings.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("MediaLinker")

    private val sftpClient = SftpClient(settings.sftpSettings)
    private val sftpBasePath = settings.appSettings.getProperty(Constant.sftpBasePathPropertyName)

    // Must stay below log, sftpClient and sftpBasePath: the initializer reads all three.
    private val files = createMediaFileList(settings.appSettings.getProperty(Constant.recordSetIdPropertyName))

    private val reportingTopic = "${settings.outputTopic}-${Constant.topicReportingSuffix}"

    /**
     * Lists the media files of one record set on the sftp server.
     *
     * @param setting the record set id, used as folder name below the sftp base path.
     * @return the files reported by the sftp client for the media folder.
     */
    private fun createMediaFileList(setting: String): List<String> {
        val mediaPath = "$sftpBasePath/$setting/${Constant.mediaFolderName}"
        return try {
            val list = sftpClient.listFiles(mediaPath)
            log.info("Files found on sftp server: $list.")
            list
        } catch (ex: Exception) {
            // Hand the throwable to the logger so the stack trace ends up in the log output
            // instead of on stderr.
            log.error("${ex.localizedMessage}: $mediaPath", ex)
            // Without the file list no record can be linked, so the service stops here.
            exitProcess(1)
        }
    }

    /**
     * Assembles the stream topology.
     *
     * @return the builder holding the complete topology; the caller is expected to build and
     * start the streams application from it.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val stream = builder.stream<String, String>(settings.inputTopic)

        // Branch 0: records with a digital object that still lacks a locator.
        // Branch 1: everything else.
        val instantiationBranch = stream
            .mapValues { value -> createModel(value) }
            .mapValues { value -> extractSubjects(value) }
            .branch(
                Predicate { _, value -> containsDigitalObjectWithoutLocator(value.second) },
                Predicate { _, _ -> true }
            )

        val enrichedDigitalObjects = instantiationBranch[0]
            .mapValues { value -> enrichSftpLocator(value) }

        enrichedDigitalObjects
            .mapValues { value -> writeModel(value.first) }
            .to(settings.outputTopic)

        // Publish the report produced by the enrichment itself, so a record for which no
        // matching file was found is reported as a failure rather than as a success.
        enrichedDigitalObjects
            .mapValues { value -> value.second.toJson() }
            .to(reportingTopic)

        val unchangedWrittenResources = instantiationBranch[1]
            .mapValues { value -> writeModel(value.first) }

        unchangedWrittenResources
            .to(settings.outputTopic)

        unchangedWrittenResources
            .mapValues { key, _ -> Report(key, ReportStatus.failure, "No linkable instantiation found.").toJson() }
            .to(reportingTopic)

        return builder
    }

    /** Serialises [model] in the configured RDF language and trims surrounding whitespace. */
    private fun writeModel(model: Model): String {
        val out = StringWriter()
        model.write(out, Constant.rdfParserLang)
        return out.toString().trim()
    }

    /** Pairs [model] with the list of all subjects it contains. */
    private fun extractSubjects(model: Model): Pair<Model, List<Resource>> =
        Pair(model, model.listSubjects().toList())

    /** True if [resource] is typed as digital object and has no `EBUCORE.locator` yet. */
    private fun isDigitalObjectWithoutLocator(resource: Resource): Boolean =
        resource.hasProperty(RICO.type, Constant.digitalObject) && !resource.hasProperty(EBUCORE.locator)

    /** True if at least one resource in [res] is a digital object without locator. */
    private fun containsDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
        res.any { isDigitalObjectWithoutLocator(it) }

    /**
     * Returns the resources in [res] that are `RICO.Identifier`s of the configured identifier type.
     *
     * NOTE(review): this used `filterNot`, which returned every resource except those
     * identifiers. The function is not referenced elsewhere in this class.
     */
    private fun getOriginalIdentifiers(res: List<Resource>): List<Resource> =
        res.filter { it.hasProperty(RDF.type, RICO.Identifier) && it.hasProperty(RICO.type, Constant.identifierType) }

    /** Parses [data] in the configured RDF language into a fresh default model. */
    private fun createModel(data: String): Model {
        val model = ModelFactory.createDefaultModel()
        model.read(StringReader(data), "", Constant.rdfParserLang)
        return model
    }

    /**
     * Adds an `EBUCORE.locator` statement to the first digital object without locator for every
     * sftp file whose name contains one of the object's identifiers of the configured type.
     *
     * @param data the model together with its subjects; must contain at least one digital object
     * without locator (guaranteed by the branch predicate in [prepare]), otherwise
     * [NoSuchElementException] is thrown.
     * @return the (possibly extended) model and a report: success if at least one locator was
     * added, failure otherwise.
     */
    private fun enrichSftpLocator(data: Pair<Model, List<Resource>>): Pair<Model, Report> {
        val (model, subjects) = data
        val digitalObject = subjects.first { isDigitalObjectWithoutLocator(it) }

        // Resource.getId() is only defined for blank nodes, so use the URI whenever there is
        // one and fall back to the blank node label only for anonymous resources.
        val objectId = digitalObject.uri ?: digitalObject.id.labelString

        // Materialise the statement iterator before the model is modified below.
        val identifiers = digitalObject.listProperties(RICO.identifiedBy).toList()
            .map { it.`object` }
            .filter { it.isResource }
            .map { it.asResource() }
            .filter { it.hasLiteral(RICO.type, Constant.identifierType) }
            .mapNotNull { it.getProperty(RICO.identifier)?.literal?.string }
            // An empty identifier is contained in every file name and would link all files.
            .filter { it.isNotBlank() }

        val links = identifiers
            .flatMap { identifier -> files.filter { file -> file.contains(identifier) } }
            .map { file -> "${Constant.sftpPathPrefix}$file" }

        if (links.isEmpty()) {
            return Pair(
                model,
                Report(
                    objectId,
                    ReportStatus.failure,
                    ReportMessages.reportFailure(objectId)
                )
            )
        }

        val addedStatements: List<Statement> = links.map { link ->
            StatementImpl(digitalObject, EBUCORE.locator, model.createLiteral(link))
        }
        model.add(addedStatements)

        return Pair(
            model,
            Report(
                objectId,
                ReportStatus.success,
                // As before, the message names the last link that was added.
                ReportMessages.reportSuccess(objectId, links.last())
            )
        )
    }
}