KafkaTopology.kt 12.5 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
/*
 * search-doc-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21
22
23
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
24
25
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
26
import java.io.StringWriter
27
import org.apache.kafka.streams.KeyValue
Jonas Waeber's avatar
Jonas Waeber committed
28
29
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
30
31
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
Jonas Waeber's avatar
Jonas Waeber committed
32
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
33
import org.memobase.helpers.Constants.SettingsProps
34
import org.memobase.helpers.ElasticSearchWrapper
Jonas Waeber's avatar
Jonas Waeber committed
35
import org.memobase.helpers.JsonUtility
36
import org.memobase.helpers.TranslationMappers
37
import org.memobase.helpers.UpdateQueryBuilder
38
import org.memobase.model.DocumentsSearchDoc
39
40
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.RecordSetSearchDoc
41
import org.memobase.model.Schema
42
import org.memobase.model.UpdateQuery
Jonas Waeber's avatar
Jonas Waeber committed
43

44
45
46
/**
 * Builds the Kafka Streams [Topology] of the search-doc service.
 *
 * Messages from `settings.inputTopic` are processed as follows:
 *
 * 1. They are parsed as JSON, empty results are dropped, and the payload is unpacked.
 * 2. The key is shortened to its last `/`-separated segment.
 * 3. The stream is split by which tag the payload contains ([JsonUtility.recordTag],
 *    [JsonUtility.institutionTag], [JsonUtility.recordSetTag]).
 * 4. Each branch is converted into a search document by the matching builder.
 *
 * Every converted message produces a [Report] on the process report topic and its serialized document on
 * `settings.outputTopic`. Record sets additionally produce name/institution update queries on the configured
 * update topic. Messages carrying none of the tags only produce a fatal report.
 *
 * @param settings loaded service settings (topics and application properties).
 * @param translationMappers handed to the document and institution builders.
 * @param elasticSearchWrapper handed to the builders and used to read the currently indexed record set state.
 */
class KafkaTopology(
    private val settings: SettingsLoader,
    translationMappers: TranslationMappers,
    private val elasticSearchWrapper: ElasticSearchWrapper
) {
    private val log = LogManager.getLogger("SearchDocService")

    private val appSettings = settings.appSettings
    private val mediaUrl = appSettings.getProperty(SettingsProps.mediaUrl)
    private val updateTopic = appSettings.getProperty(SettingsProps.updateTopic)
    private val reportTopic = settings.processReportTopic

    private val documentSearchDocBuilder = DocumentsSearchDocBuilder(translationMappers, elasticSearchWrapper, mediaUrl)
    private val institutionSearchDoc = InstitutionSearchDocBuilder(translationMappers, elasticSearchWrapper)
    private val updateQueryBuilder = UpdateQueryBuilder()
    private val recordSetSearchDocBuilder = RecordSetSearchDocBuilder(elasticSearchWrapper)

    private val jsonWriter = ObjectMapper().registerKotlinModule().writer()

    /**
     * Assembles the topology described on the class.
     *
     * @return the built topology.
     */
    fun build(): Topology {
        val builder = StreamsBuilder()
        val branchedStream = builder.stream<String, String>(settings.inputTopic)
            .mapValues { value -> JsonUtility.parse(value) }
            // TODO: Add reporting for this.
            .filter { _, value -> value.isNotEmpty() }
            .mapValues { value -> JsonUtility.unpack(value) }
            // Only the last path segment of the key is kept (keys are presumably URIs — confirm).
            .map { key, value -> KeyValue(key.substringAfterLast("/"), value) }
            // KStream.branch routes each message to the first predicate that matches, so the catch-all goes last.
            .branch(
                Predicate { _, value -> value.containsKey(JsonUtility.recordTag) },
                Predicate { _, value -> value.containsKey(JsonUtility.institutionTag) },
                Predicate { _, value -> value.containsKey(JsonUtility.recordSetTag) },
                Predicate { _, _ -> true }
            )

        outputStreams(
            branchedStream[0].mapValues { readOnlyKey, value ->
                transformWithReport(readOnlyKey, Service.nameRecords, DocumentsSearchDoc.DEFAULT) {
                    documentSearchDocBuilder.transform(readOnlyKey, value)
                }
            }
        )

        outputStreams(
            branchedStream[1].mapValues { readOnlyKey, value ->
                transformWithReport(readOnlyKey, Service.nameInstitutions, InstitutionSearchDoc.DEFAULT) {
                    institutionSearchDoc.transform(readOnlyKey, value)
                }
            }
        )

        outputStreams(
            branchedStream[2].mapValues { readOnlyKey, value ->
                transformWithReport(readOnlyKey, Service.nameRecordSets, RecordSetSearchDoc.DEFAULT) {
                    recordSetSearchDocBuilder.transform(readOnlyKey, value)
                }
            }
        )

        // Input without any known tag only produces a fatal report. It is serialized with toJson(), like every
        // other report write; the raw Report object does not match the (presumably String) value serde.
        branchedStream[3]
            .mapValues { readOnlyKey, value ->
                Report(
                    readOnlyKey,
                    ReportStatus.fatal,
                    "No record, memobase institution or record set present in input data: $value.",
                    Service.nameRecords
                ).toJson()
            }
            .to(reportTopic)

        return builder.build()
    }

    /**
     * Runs [transform] and pairs its document with a success report for [key].
     *
     * If [transform] throws [InvalidInputException], the result is [fallback] paired with a warning report that
     * carries the exception's localized message.
     *
     * @param key document key used as the report id.
     * @param step service/step name recorded in the report.
     * @param fallback document emitted when the input is invalid.
     * @param transform produces the search document.
     * @return the document together with its report.
     */
    private fun transformWithReport(
        key: String,
        step: String,
        fallback: Schema,
        transform: () -> Schema
    ): Pair<Schema, Report> =
        try {
            Pair(transform(), Report(key, ReportStatus.success, "", step))
        } catch (ex: InvalidInputException) {
            Pair(fallback, Report(key, ReportStatus.warning, ex.localizedMessage, step))
        }

    /**
     * Writes the results of one branch to the output topics.
     *
     * - Every [Report] is serialized with `toJson()` and sent to the report topic.
     * - The document of every result whose report is not fatal is serialized with Jackson and sent to
     *   `settings.outputTopic`.
     * - Record set documents additionally produce update queries on the update topic: one for the name, and one
     *   for each field in [INSTITUTION_FIELDS].
     *
     * @param stream documents paired with the report of their transformation.
     */
    private fun outputStreams(stream: KStream<String, Pair<Schema, Report>>) {
        stream
            .mapValues { value -> value.second.toJson() }
            .to(reportTopic)

        // NOTE(review): build() only routes success/warning results through here. A failed transformation
        // yields the builder's DEFAULT document with a warning, so this filter currently passes everything,
        // DEFAULT documents included. Confirm whether warnings should be excluded from the output topic too.
        val schema = stream
            .filterNot { _, value -> value.second.status == ReportStatus.fatal }
            .mapValues { value -> value.first }

        schema
            .mapValues { value -> jsonWriter.writeValueAsString(value) }
            .to(settings.outputTopic)

        // Generate update messages for record set names and institution fields.
        emitRecordSetUpdates(schema) { recordSet -> recordSetNameUpdate(recordSet) }
        INSTITUTION_FIELDS.forEach { field -> recordSetUpdate(schema, field) }
    }

    /**
     * Sends an update query for every record set document for which [buildUpdate] returns a query.
     *
     * Some documents are dropped: those that are not [RecordSetSearchDoc], and record sets for which
     * [buildUpdate] returns `null`. Each emitted message uses the key `"<key>#update"`. Its value is the JSON
     * produced by [JsonUtility.queryToJson].
     */
    private fun emitRecordSetUpdates(
        stream: KStream<String, Schema>,
        buildUpdate: (RecordSetSearchDoc) -> UpdateQuery?
    ) {
        stream
            .mapValues { value -> (value as? RecordSetSearchDoc)?.let(buildUpdate) }
            .filter { _, update -> update != null }
            // Nulls were filtered out just above.
            .map { key, update -> KeyValue("$key#update", JsonUtility.queryToJson(checkNotNull(update))) }
            .to(updateTopic)
    }

    /**
     * Builds the query that updates the record set name.
     *
     * Returns `null` when [ElasticSearchWrapper.getRecordSetName] already returns the document's name.
     */
    private fun recordSetNameUpdate(recordSet: RecordSetSearchDoc): UpdateQuery? =
        if (recordSet.name == elasticSearchWrapper.getRecordSetName(recordSet.id)) {
            log.info("No update for record set name for ${recordSet.name} as they are already current.")
            null
        } else {
            log.info("Updating names for record set ${recordSet.id} in documents.")
            updateQueryBuilder.updateRecordSetName(recordSet.id, recordSet.name)
        }

    /** Sends update queries for the institution field [targetField] of every record set in [stream] that changed. */
    private fun recordSetUpdate(stream: KStream<String, Schema>, targetField: String) {
        emitRecordSetUpdates(stream) { recordSet -> updateInstitutionContainer(recordSet, targetField) }
    }

    /**
     * Builds the query that updates [targetField] for [recordSet].
     *
     * Returns `null` when the record set's current values already contain everything
     * [ElasticSearchWrapper.getExtraInstitutionsFromRecordSet] returns for that field.
     *
     * @throws IllegalArgumentException if [targetField] is not one of [INSTITUTION_FIELDS].
     */
    private fun updateInstitutionContainer(recordSet: RecordSetSearchDoc, targetField: String): UpdateQuery? {
        val current = when (targetField) {
            "institution" -> recordSet.institution
            "masterInstitution" -> recordSet.masterInstitution
            "originalInstitution" -> recordSet.originalInstitution
            "accessInstitution" -> recordSet.accessInstitution
            else -> throw IllegalArgumentException("Unknown institution type. Set the wrong constant somewhere...")
        }
        // NOTE(review): containsAll only detects indexed values that the record set no longer has. Institutions
        // newly added to the record set do not trigger an update unless getExtraInstitutionsFromRecordSet
        // accounts for that — confirm whether set equality was intended.
        val upToDate = current.containsAll(
            elasticSearchWrapper.getExtraInstitutionsFromRecordSet(recordSet.id, targetField)
        )
        return if (upToDate) {
            log.info("No update for $targetField for ${recordSet.id} as they are already current.")
            null
        } else {
            log.info("Updating field $targetField for ${recordSet.id} with $current.")
            updateQueryBuilder.updateInstitutionContainers(recordSet.id, targetField, current)
        }
    }

    companion object {
        /** Institution fields of a record set for which update queries are generated, in emission order. */
        private val INSTITUTION_FIELDS = listOf(
            "masterInstitution",
            "originalInstitution",
            "accessInstitution",
            "institution"
        )
    }
}