KafkaTopology.kt 8.79 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
/*
 * search-doc-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21 22 23
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
24 25
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
26
import java.io.StringWriter
27
import org.apache.kafka.streams.KeyValue
Jonas Waeber's avatar
Jonas Waeber committed
28 29
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
30 31
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
Jonas Waeber's avatar
Jonas Waeber committed
32
import org.apache.logging.log4j.LogManager
33
import org.memobase.helpers.ElasticSearchWrapper
Jonas Waeber's avatar
Jonas Waeber committed
34
import org.memobase.helpers.JSON
35
import org.memobase.helpers.KEYS.SettingsProps
36
import org.memobase.helpers.TranslationMappers
37
import org.memobase.helpers.UpdateQueryBuilder
38
import org.memobase.model.DocumentsSearchDoc
39 40
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.RecordSetSearchDoc
41
import org.memobase.model.Schema
42
import org.memobase.model.UpdateQuery
Jonas Waeber's avatar
Jonas Waeber committed
43

44 45 46 47 48
/**
 * Builds the Kafka Streams topology of the search-doc-service.
 *
 * Input flow:
 * - Messages from the input topic are parsed and messages that parse to an empty value are dropped.
 * - The rest are unpacked and routed by content. Records, institutions and record sets are each
 *   transformed into their search document. Anything else only produces a fatal report.
 *
 * Outputs:
 * - Every routed message produces a JSON [Report] on the process report topic.
 * - Transformed documents are serialized to the output topic.
 * - Institution and record-set documents additionally emit update queries on the update topic.
 *
 * @param settings loaded service settings (topics and application properties).
 * @param translationMappers passed to the record and institution search-doc builders.
 * @param elasticSearchWrapper passed to the institution and record-set search-doc builders.
 */
class KafkaTopology(
    private val settings: SettingsLoader,
    translationMappers: TranslationMappers,
    elasticSearchWrapper: ElasticSearchWrapper
) {
    private val log = LogManager.getLogger("SearchDocService")

    private val appSettings = settings.appSettings
    private val mediaUrl = appSettings.getProperty(SettingsProps.mediaUrl)
    private val updateTopic = appSettings.getProperty(SettingsProps.updateTopic)
    private val reportTopic = settings.processReportTopic

    private val documentSearchDocBuilder = DocumentsSearchDocBuilder(translationMappers, mediaUrl)
    private val institutionSearchDocBuilder = InstitutionSearchDocBuilder(translationMappers, elasticSearchWrapper)

    private val updateQueryBuilder = UpdateQueryBuilder()

    private val recordSetSearchDocBuilder =
        RecordSetSearchDocBuilder(elasticSearchWrapper)

    private val jsonWriter = ObjectMapper().registerKotlinModule().writer()

    /**
     * Assembles the topology.
     *
     * Keys are shortened to the segment after their last `/`. Each unpacked message goes to the
     * first matching branch: record, institution, record set, or the catch-all branch.
     *
     * @return the built [Topology].
     */
    fun build(): Topology {
        val builder = StreamsBuilder()
        val stream = builder.stream<String, String>(settings.inputTopic)

        val branchedStream = stream
            .mapValues { value -> JSON.parse(value) }
            .filter { _, value -> value.isNotEmpty() }
            .mapValues { value -> JSON.unpack(value) }
            .map { key, value -> KeyValue(key.substringAfterLast("/"), value) }
            .branch(
                Predicate { _, value -> value.containsKey(JSON.recordTag) },
                Predicate { _, value -> value.containsKey(JSON.institutionTag) },
                Predicate { _, value -> value.containsKey(JSON.recordSetTag) },
                Predicate { _, _ -> true }
            )

        val recordStream = branchedStream[0]
            .mapValues { readOnlyKey, value ->
                transformWithReport(readOnlyKey, DocumentsSearchDoc.DEFAULT) {
                    documentSearchDocBuilder.transform(readOnlyKey, value)
                }
            }
        outputStreams(recordStream)

        val institutionStream = branchedStream[1]
            .mapValues { readOnlyKey, value ->
                transformWithReport(readOnlyKey, InstitutionSearchDoc.DEFAULT) {
                    institutionSearchDocBuilder.transform(readOnlyKey, value)
                }
            }
        outputStreams(institutionStream)

        val recordSetStream = branchedStream[2]
            .mapValues { readOnlyKey, value ->
                transformWithReport(readOnlyKey, RecordSetSearchDoc.DEFAULT) {
                    recordSetSearchDocBuilder.transform(readOnlyKey, value)
                }
            }
        outputStreams(recordSetStream)

        branchedStream[3]
            .mapValues { readOnlyKey, value ->
                // Serialize the report exactly like outputStreams does. The report topic receives
                // JSON strings, not Report objects, which the String value serde cannot handle.
                Report(
                    readOnlyKey,
                    ReportStatus.fatal,
                    "No record, memobase institution or record set present in input data: $value.",
                    Service.name
                ).toJson()
            }
            .to(reportTopic)

        return builder.build()
    }

    /**
     * Runs [transform] and pairs its result with a success [Report] for [key].
     *
     * If [transform] throws [InvalidInputException], [fallback] is returned instead, paired with
     * a warning report. The warning carries the exception message, or the exception's string form
     * when it has no message. Any other exception propagates unchanged.
     */
    private fun transformWithReport(
        key: String,
        fallback: Schema,
        transform: () -> Schema
    ): Pair<Schema, Report> =
        try {
            Pair(
                transform(),
                Report(key, ReportStatus.success, "Transformed message into search doc.", Service.name)
            )
        } catch (ex: InvalidInputException) {
            Pair(
                fallback,
                Report(key, ReportStatus.warning, ex.localizedMessage ?: ex.toString(), Service.name)
            )
        }

    /**
     * Fans a transformed stream out to its sinks.
     *
     * - The JSON report is written to the report topic.
     * - The search doc is written as JSON to the output topic.
     * - Institution and record-set docs additionally emit name updates on the update topic.
     * - Record-set docs also emit one institution-container update per institution role.
     */
    private fun outputStreams(stream: KStream<String, Pair<Schema, Report>>) {
        stream
            .mapValues { value -> value.second.toJson() }
            .to(reportTopic)

        // NOTE(review): transformWithReport only ever produces success or warning reports, so this
        // filter removes nothing. The DEFAULT fallback docs of failed transformations are therefore
        // written to the output topic and trigger update queries. Confirm that this is intended.
        val schema = stream
            .filterNot { _, value -> value.second.status == ReportStatus.fatal }
            .mapValues { value -> value.first }

        schema
            .mapValues { value -> jsonWriter.writeValueAsString(value) }
            .to(settings.outputTopic)

        // Generate update messages for institution & record set names.
        schema
            .map { key, value ->
                when (value) {
                    is InstitutionSearchDoc -> KeyValue(
                        "$key#update",
                        updateQueryBuilder.updateInstitutionName(value.id, value.name)
                    )
                    is RecordSetSearchDoc -> KeyValue(
                        "$key#update",
                        updateQueryBuilder.updateRecordSetName(value.id, value.name)
                    )
                    else -> KeyValue(key, null)
                }
            }
            .filter { _, value -> value != null }
            // `!!` is safe: null values were removed by the preceding filter. Kafka Streams cannot
            // narrow the value type through `filter`, hence the assertion.
            .mapValues { value -> JSON.queryToJson(value!!) }
            .to(updateTopic)

        recordSetUpdate(schema, "masterInstitution")
        recordSetUpdate(schema, "originalInstitution")
        recordSetUpdate(schema, "accessInstitution")
    }

    /**
     * Emits an institution-container update query for every record-set doc in [stream].
     *
     * The update is keyed `<key>#update` and targets [targetField]. All other document types are
     * dropped.
     */
    private fun recordSetUpdate(stream: KStream<String, Schema>, targetField: String) {
        stream
            .map { key, value ->
                when (value) {
                    is RecordSetSearchDoc -> KeyValue(
                        "$key#update",
                        updateInstitutionContainer(value, targetField)
                    )
                    else -> KeyValue(key, null)
                }
            }
            .filter { _, value -> value != null }
            // `!!` is safe: null values were removed by the preceding filter.
            .mapValues { value -> JSON.queryToJson(value!!) }
            .to(updateTopic)
    }

    /**
     * Builds the update query for the institution role named by [targetField].
     *
     * @throws IllegalStateException if [targetField] is not one of `masterInstitution`,
     *   `originalInstitution` or `accessInstitution`. This indicates a programming error
     *   in the caller.
     */
    private fun updateInstitutionContainer(recordSet: RecordSetSearchDoc, targetField: String): UpdateQuery {
        return when (targetField) {
            "masterInstitution" -> updateQueryBuilder.updateInstitutionContainers(
                recordSet.id,
                targetField,
                recordSet.masterInstitution
            )
            "originalInstitution" -> updateQueryBuilder.updateInstitutionContainers(
                recordSet.id,
                targetField,
                recordSet.originalInstitution
            )
            "accessInstitution" -> updateQueryBuilder.updateInstitutionContainers(
                recordSet.id,
                targetField,
                recordSet.accessInstitution
            )
            else -> error("Unknown institution type. Set the wrong constant somewhere...")
        }
    }
}