/*
 * search-doc-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21 22 23
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
24 25
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
26
import java.io.StringWriter
27
import org.apache.kafka.streams.KeyValue
Jonas Waeber's avatar
Jonas Waeber committed
28 29
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
30 31
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
Jonas Waeber's avatar
Jonas Waeber committed
32
import org.apache.logging.log4j.LogManager
33
import org.memobase.helpers.ElasticSearchWrapper
Jonas Waeber's avatar
Jonas Waeber committed
34
import org.memobase.helpers.JSON
35
import org.memobase.helpers.KEYS.SettingsProps
36
import org.memobase.helpers.TranslationMappers
37
import org.memobase.helpers.UpdateQueryBuilder
38
import org.memobase.model.DocumentsSearchDoc
39 40
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.RecordSetSearchDoc
41
import org.memobase.model.Schema
Jonas Waeber's avatar
Jonas Waeber committed
42

43 44 45 46 47
/**
 * Assembles the Kafka Streams [Topology] of the search doc service.
 *
 * Messages read from the input topic are parsed, split by the tag they contain
 * (record, institution or record set) and transformed into search documents.
 * For every message a [Report] is written to the report topic; the search
 * documents are written to the output topic and, for institutions and record
 * sets, update queries are written to the update topic.
 *
 * @param settings provides the topic names and the application properties.
 * @param translationMappers passed on to the document and institution builders.
 * @param elasticSearchWrapper passed on to the institution and record set builders.
 */
class KafkaTopology(
    private val settings: SettingsLoader,
    translationMappers: TranslationMappers,
    elasticSearchWrapper: ElasticSearchWrapper
) {
    private val log = LogManager.getLogger("SearchDocService")

    private val appSettings = settings.appSettings
    private val mediaUrl = appSettings.getProperty(SettingsProps.mediaUrl)
    private val updateTopic = appSettings.getProperty(SettingsProps.updateTopic)
    private val reportTopic = settings.processReportTopic

    private val documentSearchDocBuilder = DocumentsSearchDocBuilder(translationMappers, mediaUrl)
    private val institutionSearchDoc = InstitutionSearchDocBuilder(translationMappers, elasticSearchWrapper)

    private val updateQueryBuilder = UpdateQueryBuilder()

    private val recordSetSearchDocBuilder =
        RecordSetSearchDocBuilder(elasticSearchWrapper)

    private val jsonWriter = ObjectMapper().registerKotlinModule().writer()

    /**
     * Builds the complete topology.
     *
     * The input is branched into four streams: records, institutions, record sets
     * and a catch-all for everything else. The first three are transformed and
     * handed to [outputStreams]; the catch-all only produces a fatal report.
     *
     * @return the topology, ready to be passed to a Kafka Streams instance.
     */
    fun build(): Topology {
        val builder = StreamsBuilder()
        val stream = builder.stream<String, String>(settings.inputTopic)
        val branchedStream = stream
            .mapValues { value -> JSON.parse(value) }
            .filter { _, value -> value.isNotEmpty() }
            .mapValues { value -> JSON.unpack(value) }
            // Only the part of the key after the last '/' is kept as message key.
            .map { key, value -> KeyValue(key.substringAfterLast("/"), value) }
            .branch(
                Predicate { _, value -> value.containsKey(JSON.recordTag) },
                Predicate { _, value -> value.containsKey(JSON.institutionTag) },
                Predicate { _, value -> value.containsKey(JSON.recordSetTag) },
                Predicate { _, _ -> true }
            )

        val recordStream = transformWithReport(branchedStream[0], DocumentsSearchDoc.DEFAULT) { key, value ->
            documentSearchDocBuilder.transform(key, value)
        }
        outputStreams(recordStream)

        val institutionStream = transformWithReport(branchedStream[1], InstitutionSearchDoc.DEFAULT) { key, value ->
            institutionSearchDoc.transform(key, value)
        }
        outputStreams(institutionStream)

        val recordSetStream = transformWithReport(branchedStream[2], RecordSetSearchDoc.DEFAULT) { key, value ->
            recordSetSearchDocBuilder.transform(key, value)
        }
        outputStreams(recordSetStream)

        branchedStream[3]
            .mapValues { readOnlyKey, value ->
                // Serialized with toJson() like the reports emitted in outputStreams,
                // so every message on the report topic has the same format.
                Report(
                    readOnlyKey,
                    ReportStatus.fatal,
                    "No record, memobase institution or record set present in input data: $value.",
                    Service.name
                ).toJson()
            }
            .to(reportTopic)
        return builder.build()
    }

    /**
     * Runs [transform] on every message of [stream] and pairs the result with a report.
     *
     * On success the pair holds the transformed search doc and a success report.
     * If [transform] throws an [InvalidInputException] the pair holds [fallback]
     * and a warning report carrying the exception message instead.
     *
     * @param stream the branch to transform.
     * @param fallback the search doc used when the transformation is rejected.
     * @param transform builds the search doc from the message key and value.
     */
    private fun <V> transformWithReport(
        stream: KStream<String, V>,
        fallback: Schema,
        transform: (String, V) -> Schema
    ): KStream<String, Pair<Schema, Report>> =
        stream.mapValues { readOnlyKey, value ->
            try {
                Pair(
                    transform(readOnlyKey, value),
                    Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
                )
            } catch (ex: InvalidInputException) {
                Pair(
                    fallback,
                    Report(readOnlyKey, ReportStatus.warning, ex.localizedMessage, Service.name)
                )
            }
        }

    /**
     * Wires the sinks for a stream of search docs and their reports.
     *
     * Every report goes to the report topic. Search docs whose report is not
     * fatal are written as JSON to the output topic, and institution and record
     * set docs additionally produce update queries on the update topic.
     *
     * @param stream pairs of search doc and the report describing its transformation.
     */
    private fun outputStreams(stream: KStream<String, Pair<Schema, Report>>) {
        stream
            .mapValues { value -> value.second.toJson() }
            .to(reportTopic)

        // NOTE(review): failed transformations are reported with status warning,
        // not fatal, so their DEFAULT search docs pass this filter and are written
        // to the output and update topics as well - confirm this is intended.
        val schema = stream
            .filterNot { _, value -> value.second.status == ReportStatus.fatal }
            .mapValues { value -> value.first }

        schema
            .mapValues { value -> jsonWriter.writeValueAsString(value) }
            .to(settings.outputTopic)

        // generate update messages for institution & record set names!
        schema
            .map { key, value ->
                when (value) {
                    is InstitutionSearchDoc -> KeyValue(
                        "$key#update",
                        updateQueryBuilder.updateInstitutionName(value.id, value.name)
                    )
                    is RecordSetSearchDoc -> KeyValue(
                        "$key#update",
                        updateQueryBuilder.updateRecordSetName(value.id, value.name)
                    )
                    else -> KeyValue(key, null)
                }
            }
            .filter { _, value -> value != null }
            // Safe: null values were removed by the filter directly above.
            .mapValues { value -> JSON.queryToJson(value!!) }
            .to(updateTopic)

        // Record sets additionally update the access institution container.
        schema
            .map { key, value ->
                when (value) {
                    is RecordSetSearchDoc -> KeyValue(
                        "$key#update",
                        updateQueryBuilder.updateAccessInstitutionContainer(value.id, value.accessInstitution)
                    )
                    else -> KeyValue(key, null)
                }
            }
            .filter { _, value -> value != null }
            // Safe: null values were removed by the filter directly above.
            .mapValues { value -> JSON.queryToJson(value!!) }
            .to(updateTopic)
    }
}