KafkaTopology.kt 6.5 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
 * Media Linker
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import com.beust.klaxon.Klaxon
Jonas Waeber's avatar
Jonas Waeber committed
22
import java.io.StringReader
Jonas Waeber's avatar
Jonas Waeber committed
23
24
25
26
27
import java.io.StringWriter
import kotlin.system.exitProcess
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
Jonas Waeber's avatar
Jonas Waeber committed
28
import org.apache.jena.rdf.model.Statement
Jonas Waeber's avatar
Jonas Waeber committed
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.rdf.EBUCORE
import org.memobase.rdf.RICO
import org.memobase.reports.ReportMessages
import org.memobase.reports.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient

/**
 * Builds the Kafka Streams topology of the media linker.
 *
 * Every incoming record value is parsed as an N-Triples RDF document. If the document contains a
 * digital object (`RICO.type` "digitalObject") without an `EBUCORE.locator`, an `sftp:` locator is
 * added for every entry of the SFTP media listing that contains one of the object's "original"
 * identifiers. Every record is written back to `settings.outputTopic` as N-Triples, and one JSON
 * [Report] per record is written to `<outputTopic>-reporting`.
 *
 * NOTE: the media listing is fetched once, while this class is constructed; the process exits with
 * status 1 if that listing fails.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("MediaLinker")

    private val sftpClient = SftpClient(settings.sftpSettings)
    private val sftpBasePath = settings.appSettings.getProperty("sftp.basePath")

    // Initialized eagerly at construction; depends on log, sftpClient and sftpBasePath declared above.
    private val files = createMediaFileList(settings.appSettings.getProperty("recordSetId"))

    private val reportingTopic = settings.outputTopic + "-reporting"

    /**
     * Lists the files below `<sftp.basePath>/<setting>/media` on the SFTP server.
     *
     * Terminates the process with exit code 1 if the listing throws.
     */
    private fun createMediaFileList(setting: String): List<String> {
        val mediaPath = "$sftpBasePath/$setting/media"
        return try {
            sftpClient.listFiles(mediaPath).also { log.info("Files found on sftp server: $it.") }
        } catch (ex: Exception) {
            // Log the throwable itself so the stack trace goes through log4j instead of raw stderr.
            log.error(ex.localizedMessage + ": $mediaPath", ex)
            exitProcess(1)
        }
    }

    /**
     * Assembles the topology.
     *
     * Branch 0 holds records that contain a digital object without a locator; branch 1 holds all others.
     */
    fun build(): Topology {
        val builder = StreamsBuilder()

        val instantiationBranch = builder.stream<String, String>(settings.inputTopic)
                .mapValues { value -> createModel(value) }
                .mapValues { value -> extractSubjects(value) }
                .branch(
                        Predicate { _, value -> isDigitalObjectWithoutLocator(value.second) },
                        Predicate { _, _ -> true }
                )

        // Enrich once and derive both outputs from the same result, so the report reflects
        // whether a locator was actually added (instead of always claiming success).
        val enrichedDigitalObjects = instantiationBranch[0]
                .mapValues { readOnlyKey, value -> enrichSftpLocator(readOnlyKey, value) }

        enrichedDigitalObjects
                .mapValues { value -> writeNTriples(value.first) }
                .to(settings.outputTopic)

        enrichedDigitalObjects
                .mapValues { value -> Klaxon().toJsonString(value.second) }
                .to(reportingTopic)

        val unchangedResources = instantiationBranch[1]
                .mapValues { value -> writeNTriples(value.first) }

        unchangedResources
                .to(settings.outputTopic)

        unchangedResources
                .mapValues { key, _ -> Klaxon().toJsonString(Report(key, ReportStatus.success, "No changes made.")) }
                .to(reportingTopic)

        return builder.build()
    }

    /** Serializes [model] as N-Triples, trimming surrounding whitespace. */
    private fun writeNTriples(model: Model): String {
        val out = StringWriter()
        model.write(out, "NTRIPLES")
        return out.toString().trim()
    }

    /** Pairs [model] with the resources that are subjects of its statements. */
    private fun extractSubjects(model: Model): Pair<Model, List<Resource>> =
            Pair(model, model.listSubjects().toList())

    /** True if any resource in [res] is a digital object that has no locator yet. */
    private fun isDigitalObjectWithoutLocator(res: List<Resource>): Boolean =
            res.any { isUnlinkedDigitalObject(it) }

    /** True if [resource] has `RICO.type` "digitalObject" but no `EBUCORE.locator` property. */
    private fun isUnlinkedDigitalObject(resource: Resource): Boolean =
            resource.hasProperty(RICO.type, "digitalObject") && !resource.hasProperty(EBUCORE.locator)

    /** Parses [data] as an N-Triples document into a new default model. */
    private fun createModel(data: String): Model {
        val model = ModelFactory.createDefaultModel()
        model.read(StringReader(data), "", "NTRIPLES")
        return model
    }

    /**
     * Adds an `EBUCORE.locator` literal `sftp:<file>` to the first digital object without a locator.
     * One literal is added for every listed media file that contains one of the object's "original"
     * identifiers.
     *
     * @param key the record key; used as the report id, like the other reports of this topology.
     * @param data the parsed model and its subjects; must contain a digital object without a locator.
     * @return the (mutated) model together with a success report naming the last added link, or a
     *   failure report if no media file matched.
     */
    private fun enrichSftpLocator(key: String, data: Pair<Model, List<Resource>>): Pair<Model, Report> {
        val (model, subjects) = data
        val digitalObject = subjects.first { isUnlinkedDigitalObject(it) }
        // Only blank nodes have an AnonId; Jena throws when `id` is read on a URI resource.
        val digitalObjectId: String = if (digitalObject.isAnon) digitalObject.id.labelString else digitalObject.uri

        // NOTE(review): substring matching means identifier "123" also matches "1234.mp4"; confirm naming scheme.
        val links = originalIdentifiers(digitalObject).flatMap { identifier ->
            files.filter { file -> file.contains(identifier) }.map { file -> "sftp:$file" }
        }

        if (links.isEmpty()) {
            return Pair(model, Report(key, ReportStatus.failure, ReportMessages.reportFailure(digitalObjectId)))
        }

        val locators: List<Statement> = links.map { link ->
            StatementImpl(digitalObject, EBUCORE.locator, model.createLiteral(link))
        }
        model.add(locators)
        return Pair(
                model,
                Report(key, ReportStatus.success, ReportMessages.reportSuccess(digitalObjectId, links.last()))
        )
    }

    /**
     * Collects identifier strings of [digitalObject].
     *
     * Looks at every `RICO.identifiedBy` resource of [digitalObject] whose `RICO.type` is "original"
     * and returns its `RICO.identifier` string. Missing or non-literal identifiers are skipped instead
     * of throwing. Blank identifiers are skipped because they would match every file.
     */
    private fun originalIdentifiers(digitalObject: Resource): List<String> =
            digitalObject.listProperties(RICO.identifiedBy).toList()
                    .mapNotNull { stmt -> stmt?.`object` }
                    .filter { node -> node.isResource }
                    .map { node -> node.asResource() }
                    .filter { identifier -> identifier.hasLiteral(RICO.type, "original") }
                    .mapNotNull { identifier -> identifier.getProperty(RICO.identifier)?.`object` }
                    .filter { node -> node.isLiteral }
                    .map { node -> node.asLiteral().string }
                    .filter { value -> value.isNotBlank() }
}