In order to mitigate against the brute force attacks against Gitlab accounts, we are moving to all edu-ID Logins. We would like to remind you to link your account with your edu-id. Login will be possible only by edu-ID after December 31, 2021. Here you can find the instructions for linking your account.

If you don't have a SWITCH edu-ID, you can create one with this guide here

kind regards

KafkaTopology.kt 9.41 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
/*
Jonas Waeber's avatar
Jonas Waeber committed
2
 * search-doc-service
Jonas Waeber's avatar
Jonas Waeber committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21
22
23
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
24
25
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
26
import java.io.StringWriter
27
import org.apache.kafka.streams.KeyValue
Jonas Waeber's avatar
Jonas Waeber committed
28
29
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
30
31
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
Jonas Waeber's avatar
Jonas Waeber committed
32
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
33
import org.memobase.helpers.Constants.SettingsProps
34
import org.memobase.helpers.ElasticSearchWrapper
Jonas Waeber's avatar
Jonas Waeber committed
35
import org.memobase.helpers.JsonUtility
36
import org.memobase.helpers.TranslationMappers
37
import org.memobase.helpers.UpdateQueryBuilder
38
import org.memobase.model.DocumentsSearchDoc
39
40
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.RecordSetSearchDoc
41
import org.memobase.model.Schema
42
import org.memobase.model.UpdateQuery
Jonas Waeber's avatar
Jonas Waeber committed
43

44
45
46
47
48
class KafkaTopology(
    private val settings: SettingsLoader,
    translationMappers: TranslationMappers,
    elasticSearchWrapper: ElasticSearchWrapper
) {
Jonas Waeber's avatar
Jonas Waeber committed
49
50
    private val log = LogManager.getLogger("SearchDocService")

51
    private val appSettings = settings.appSettings
52
    private val mediaUrl = appSettings.getProperty(SettingsProps.mediaUrl)
53
    private val updateTopic = appSettings.getProperty(SettingsProps.updateTopic)
Jonas Waeber's avatar
Jonas Waeber committed
54
    private val reportTopic = settings.processReportTopic
Jonas Waeber's avatar
Jonas Waeber committed
55

56
    private val documentSearchDocBuilder = DocumentsSearchDocBuilder(translationMappers, elasticSearchWrapper, mediaUrl)
57
    private val institutionSearchDoc = InstitutionSearchDocBuilder(translationMappers, elasticSearchWrapper)
58

59
60
    private val updateQueryBuilder = UpdateQueryBuilder()

61
    private val recordSetSearchDocBuilder =
62
        RecordSetSearchDocBuilder(elasticSearchWrapper)
63

64
65
    private val jsonWriter = ObjectMapper().registerKotlinModule().writer()

Jonas Waeber's avatar
Jonas Waeber committed
66
67
68
    fun build(): Topology {
        val builder = StreamsBuilder()
        val stream = builder.stream<String, String>(settings.inputTopic)
69
        val branchedStream = stream
Jonas Waeber's avatar
Jonas Waeber committed
70
            .mapValues { value -> JsonUtility.parse(value) }
Jonas Waeber's avatar
Jonas Waeber committed
71
            // TODO: Add reporting for this.
72
            .filter { _, value -> value.isNotEmpty() }
Jonas Waeber's avatar
Jonas Waeber committed
73
            .mapValues { value -> JsonUtility.unpack(value) }
74
            .map { key, value -> KeyValue(key.substringAfterLast("/"), value) }
75
            .branch(
Jonas Waeber's avatar
Jonas Waeber committed
76
77
78
                Predicate { _, value -> value.containsKey(JsonUtility.recordTag) },
                Predicate { _, value -> value.containsKey(JsonUtility.institutionTag) },
                Predicate { _, value -> value.containsKey(JsonUtility.recordSetTag) },
79
80
                Predicate { _, _ -> true }
            )
81
82

        val recordStream = branchedStream[0]
83
84
85
86
            .mapValues { readOnlyKey, value ->
                try {
                    Pair(
                        documentSearchDocBuilder.transform(readOnlyKey, value),
Jonas Waeber's avatar
Jonas Waeber committed
87
88
89
90
91
92
                        Report(
                            readOnlyKey,
                            ReportStatus.success,
                            "Transformed message into search doc.",
                            Service.nameRecords
                        )
93
94
95
96
                    )
                } catch (ex: InvalidInputException) {
                    Pair(
                        DocumentsSearchDoc.DEFAULT,
Jonas Waeber's avatar
Jonas Waeber committed
97
                        Report(readOnlyKey, ReportStatus.warning, ex.localizedMessage, Service.nameRecords)
98
                    )
99
                }
100
            }
101
102
103
104

        outputStreams(recordStream)

        val institutionStream = branchedStream[1]
105
106
107
108
            .mapValues { readOnlyKey, value ->
                try {
                    Pair(
                        institutionSearchDoc.transform(readOnlyKey, value),
Jonas Waeber's avatar
Jonas Waeber committed
109
110
111
112
113
114
                        Report(
                            readOnlyKey,
                            ReportStatus.success,
                            "Transformed message into search doc.",
                            Service.nameInstitutions
                        )
115
116
117
118
119
120
121
                    )
                } catch (ex: InvalidInputException) {
                    Pair(
                        InstitutionSearchDoc.DEFAULT, Report(
                            readOnlyKey,
                            ReportStatus.warning,
                            ex.localizedMessage,
Jonas Waeber's avatar
Jonas Waeber committed
122
                            Service.nameInstitutions
123
                        )
124
                    )
Jonas Waeber's avatar
Jonas Waeber committed
125
                }
126
            }
127
        outputStreams(institutionStream)
128

129
        val recordSetStream = branchedStream[2]
130
131
132
133
            .mapValues { readOnlyKey, value ->
                try {
                    Pair(
                        recordSetSearchDocBuilder.transform(readOnlyKey, value),
Jonas Waeber's avatar
Jonas Waeber committed
134
135
136
137
138
139
                        Report(
                            readOnlyKey,
                            ReportStatus.success,
                            "Transformed message into search doc.",
                            Service.nameRecordSets
                        )
140
141
142
143
144
145
146
                    )
                } catch (ex: InvalidInputException) {
                    Pair(
                        RecordSetSearchDoc.DEFAULT, Report(
                            readOnlyKey,
                            ReportStatus.warning,
                            ex.localizedMessage,
Jonas Waeber's avatar
Jonas Waeber committed
147
                            Service.nameRecordSets
148
                        )
149
                    )
150
                }
151
            }
152
153
        outputStreams(recordSetStream)

154
        branchedStream[3]
155
156
157
158
159
            .mapValues { readOnlyKey, value ->
                Report(
                    readOnlyKey,
                    ReportStatus.fatal,
                    "No record, memobase institution or record set present in input data: $value.",
Jonas Waeber's avatar
Jonas Waeber committed
160
                    Service.nameRecords
161
162
163
                )
            }
            .to(reportTopic)
164
165
166
167
168
        return builder.build()
    }

    private fun outputStreams(stream: KStream<String, Pair<Schema, Report>>) {
        stream
169
170
            .mapValues { value -> value.second.toJson() }
            .to(reportTopic)
Jonas Waeber's avatar
Jonas Waeber committed
171

172
        val schema = stream
173
174
            .filterNot { _, value -> value.second.status == ReportStatus.fatal }
            .mapValues { value -> value.first }
175
176

        schema
177
178
179
180
181
182
            .mapValues { value ->
                val out = StringWriter()
                jsonWriter.writeValue(out, value)
                out.toString()
            }
            .to(settings.outputTopic)
183
184
185
186
187
188
189
190
191
192
193
194
195

        // generate update messages for institution & record set names!
        schema
            .map { key, value ->
                when (value) {
                    is RecordSetSearchDoc -> KeyValue(
                        "$key#update",
                        updateQueryBuilder.updateRecordSetName(value.id, value.name)
                    )
                    else -> KeyValue(key, null)
                }
            }
            .filter { _, value -> value != null }
Jonas Waeber's avatar
Jonas Waeber committed
196
            .mapValues { value -> JsonUtility.queryToJson(value!!) }
197
            .to(updateTopic)
198

199
200
201
        recordSetUpdate(schema, "masterInstitution")
        recordSetUpdate(schema, "originalInstitution")
        recordSetUpdate(schema, "accessInstitution")
202
        recordSetUpdate(schema, "institution")
203
204
205
206
207
    }


    private fun recordSetUpdate(stream: KStream<String, Schema>, targetField: String) {
        stream.map { key, value ->
208
209
210
            when (value) {
                is RecordSetSearchDoc -> KeyValue(
                    "$key#update",
211
                    updateInstitutionContainer(value, targetField)
212
213
214
215
216
                )
                else -> KeyValue(key, null)
            }
        }
            .filter { _, value -> value != null }
Jonas Waeber's avatar
Jonas Waeber committed
217
            .mapValues { value -> JsonUtility.queryToJson(value!!) }
218
            .to(updateTopic)
Jonas Waeber's avatar
Jonas Waeber committed
219
    }
220
221
222

    private fun updateInstitutionContainer(recordSet: RecordSetSearchDoc, targetField: String): UpdateQuery {
        return when (targetField) {
223
224
225
226
227
            "institution" -> updateQueryBuilder.updateInstitutionContainers(
                recordSet.id,
                targetField,
                recordSet.institution
            )
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
            "masterInstitution" -> updateQueryBuilder.updateInstitutionContainers(
                recordSet.id,
                targetField,
                recordSet.masterInstitution
            )
            "originalInstitution" -> updateQueryBuilder.updateInstitutionContainers(
                recordSet.id,
                targetField, recordSet.originalInstitution
            )
            "accessInstitution" -> updateQueryBuilder.updateInstitutionContainers(
                recordSet.id,
                targetField,
                recordSet.accessInstitution
            )
            else -> throw Exception("Unknown institution type. Set the wrong constant somewhere...")
        }
    }
Jonas Waeber's avatar
Jonas Waeber committed
245
}