To mitigate brute-force attacks against GitLab accounts, we are moving all logins to edu-ID. We would like to remind you to link your account with your edu-ID. After November 30, 2021, login will only be possible via edu-ID. Here you can find the instructions for linking your account.

If you don't have a SWITCH edu-ID, you can create one with this guide here.

kind regards

This Server has been upgraded to GitLab release 14.2.6

KafkaTopology.kt 7.71 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
/*
Jonas Waeber's avatar
Jonas Waeber committed
2
 * search-doc-service
Jonas Waeber's avatar
Jonas Waeber committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21
22
23
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
24
25
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
26
import java.io.StringWriter
27
import org.apache.kafka.streams.KeyValue
Jonas Waeber's avatar
Jonas Waeber committed
28
29
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
30
31
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
Jonas Waeber's avatar
Jonas Waeber committed
32
import org.apache.logging.log4j.LogManager
33
import org.memobase.helpers.ElasticSearchWrapper
Jonas Waeber's avatar
Jonas Waeber committed
34
import org.memobase.helpers.JSON
35
import org.memobase.helpers.KEYS.SettingsProps
36
import org.memobase.helpers.TranslationMappers
37
import org.memobase.helpers.UpdateQueryBuilder
38
import org.memobase.model.DocumentsSearchDoc
39
40
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.RecordSetSearchDoc
41
import org.memobase.model.Schema
Jonas Waeber's avatar
Jonas Waeber committed
42

43
44
45
46
47
/**
 * Assembles the Kafka Streams topology of the search doc service.
 *
 * Messages read from the input topic are parsed, re-keyed and split into four branches
 * (record, institution, record set, anything else). The first three branches are transformed
 * into search documents; every message additionally yields a [Report] on the report topic.
 *
 * @param settings loaded service settings (topics and application properties).
 * @param translationMappers passed on to the document and institution search doc builders.
 * @param elasticSearchWrapper passed on to the institution and record set search doc builders.
 */
class KafkaTopology(
    private val settings: SettingsLoader,
    translationMappers: TranslationMappers,
    elasticSearchWrapper: ElasticSearchWrapper
) {
    private val log = LogManager.getLogger("SearchDocService")

    private val appSettings = settings.appSettings
    private val mediaUrl = appSettings.getProperty(SettingsProps.mediaUrl)
    private val updateTopic = appSettings.getProperty(SettingsProps.updateTopic)
    private val reportTopic = settings.processReportTopic

    private val documentSearchDocBuilder = DocumentsSearchDocBuilder(translationMappers, mediaUrl)
    private val institutionSearchDocBuilder = InstitutionSearchDocBuilder(translationMappers, elasticSearchWrapper)

    private val updateQueryBuilder = UpdateQueryBuilder()

    private val recordSetSearchDocBuilder =
        RecordSetSearchDocBuilder(elasticSearchWrapper)

    private val jsonWriter = ObjectMapper().registerKotlinModule().writer()

    /**
     * Builds the complete topology.
     *
     * @return the topology reading from `settings.inputTopic` and writing to the output,
     * update and report topics.
     */
    fun build(): Topology {
        val builder = StreamsBuilder()
        val stream = builder.stream<String, String>(settings.inputTopic)
        val branchedStream = stream
            .mapValues { value -> JSON.parse(value) }
            // Messages whose parsed value is empty are dropped without a report.
            .filter { _, value -> value.isNotEmpty() }
            .mapValues { value -> JSON.unpack(value) }
            // Only the part of the key after the last "/" is kept as message key.
            .map { key, value -> KeyValue(key.substringAfterLast("/"), value) }
            // Branch order matters: a message ends up in the first branch whose predicate matches.
            .branch(
                Predicate { _, value -> value.containsKey(JSON.recordTag) },
                Predicate { _, value -> value.containsKey(JSON.institutionTag) },
                Predicate { _, value -> value.containsKey(JSON.recordSetTag) },
                Predicate { _, _ -> true }
            )

        val recordStream = transformBranch(branchedStream[0], DocumentsSearchDoc.DEFAULT) { key, value ->
            documentSearchDocBuilder.transform(key, value)
        }
        outputStreams(recordStream)

        val institutionStream = transformBranch(branchedStream[1], InstitutionSearchDoc.DEFAULT) { key, value ->
            institutionSearchDocBuilder.transform(key, value)
        }
        outputStreams(institutionStream)

        val recordSetStream = transformBranch(branchedStream[2], RecordSetSearchDoc.DEFAULT) { key, value ->
            recordSetSearchDocBuilder.transform(key, value)
        }
        outputStreams(recordSetStream)

        // Catch-all branch: none of the known tags is present in the message.
        branchedStream[3]
            .mapValues { readOnlyKey, value ->
                // Serialize the report exactly like outputStreams does, so that the report topic
                // only ever receives JSON strings instead of raw Report objects.
                Report(
                    readOnlyKey,
                    ReportStatus.fatal,
                    "No record, memobase institution or record set present in input data: $value.",
                    Service.name
                ).toJson()
            }
            .to(reportTopic)

        return builder.build()
    }

    /**
     * Applies [transform] to every message of [stream] and pairs the result with a report.
     *
     * A successful transformation yields the search doc plus a success report. If [transform]
     * throws an [InvalidInputException], [fallback] is emitted together with a warning report
     * carrying the exception message.
     *
     * @param stream one branch of the input stream.
     * @param fallback search doc emitted when the transformation fails.
     * @param transform builds the search doc from message key and value.
     * @return stream of (search doc, report) pairs.
     */
    private fun <V> transformBranch(
        stream: KStream<String, V>,
        fallback: Schema,
        transform: (String, V) -> Schema
    ): KStream<String, Pair<Schema, Report>> =
        stream.mapValues { readOnlyKey, value ->
            try {
                Pair(
                    transform(readOnlyKey, value),
                    Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
                )
            } catch (ex: InvalidInputException) {
                Pair(
                    fallback,
                    Report(readOnlyKey, ReportStatus.warning, ex.localizedMessage, Service.name)
                )
            }
        }

    /**
     * Wires a stream of (search doc, report) pairs to the report, output and update topics.
     *
     * @param stream transformed branch as produced in [build].
     */
    private fun outputStreams(stream: KStream<String, Pair<Schema, Report>>) {
        stream
            .mapValues { value -> value.second.toJson() }
            .to(reportTopic)

        // NOTE(review): the streams built in this class only carry success or warning reports,
        // so this filter never drops anything and DEFAULT docs of failed transformations are
        // forwarded as well - confirm that this is intended.
        val schema = stream
            .filterNot { _, value -> value.second.status == ReportStatus.fatal }
            .mapValues { value -> value.first }

        schema
            .mapValues { value ->
                val out = StringWriter()
                jsonWriter.writeValue(out, value)
                out.toString()
            }
            .to(settings.outputTopic)

        // generate update messages for institution & record set names!
        schema
            .map { key, value ->
                when (value) {
                    is InstitutionSearchDoc -> KeyValue(
                        "$key#update",
                        updateQueryBuilder.updateInstitutionName(value.id, value.name)
                    )
                    is RecordSetSearchDoc -> KeyValue(
                        "$key#update",
                        updateQueryBuilder.updateRecordSetName(value.id, value.name)
                    )
                    // Other search docs do not trigger a name update; dropped by the filter below.
                    else -> KeyValue(key, null)
                }
            }
            .filter { _, value -> value != null }
            // `!!` is safe: null values were removed by the preceding filter.
            .mapValues { value -> JSON.queryToJson(value!!) }
            .to(updateTopic)

        // generate update messages for the access institutions of record sets.
        schema
            .map { key, value ->
                when (value) {
                    is RecordSetSearchDoc -> KeyValue(
                        "$key#update",
                        updateQueryBuilder.updateAccessInstitutionContainer(value.id, value.accessInstitution)
                    )
                    // Only record sets trigger this update; everything else is dropped below.
                    else -> KeyValue(key, null)
                }
            }
            .filter { _, value -> value != null }
            // `!!` is safe: null values were removed by the preceding filter.
            .mapValues { value -> JSON.queryToJson(value!!) }
            .to(updateTopic)
    }
}