KafkaTopology.kt 9.05 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * mapper-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

Jonas Waeber's avatar
Jonas Waeber committed
21
22
23
import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
Jonas Waeber's avatar
Jonas Waeber committed
24
import ch.memobase.kafka.utils.models.JoinedValues
25
import ch.memobase.kafka.utils.models.ValueWithException
Jonas Waeber's avatar
Jonas Waeber committed
26
import ch.memobase.mapping.MapperConfiguration
Jonas Waeber's avatar
Jonas Waeber committed
27
import ch.memobase.mapping.MappingConfigurationParser
28
import ch.memobase.exceptions.InvalidMappingException
Jonas Waeber's avatar
Jonas Waeber committed
29
30
31
32
33
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.HeaderExtractionTransformSupplier
import ch.memobase.settings.HeaderMetadata
import ch.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
34
import com.beust.klaxon.Klaxon
35
import com.beust.klaxon.KlaxonException
Jonas Waeber's avatar
Jonas Waeber committed
36
import org.apache.kafka.common.serialization.Serdes
Jonas Waeber's avatar
Jonas Waeber committed
37
import org.apache.kafka.streams.KeyValue
Jonas Waeber's avatar
Jonas Waeber committed
38
import org.apache.kafka.streams.StreamsBuilder
39
40
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
Jonas Waeber's avatar
Jonas Waeber committed
41
42
import org.apache.logging.log4j.LogManager

43
44
/**
 * Assembles the Kafka Streams topology of the mapper service.
 *
 * The pipeline built by [prepare] performs these steps, in order:
 *  1. reads the source records from `settings.inputTopic` and the configuration
 *     records from the import process configuration topic,
 *  2. combines both with [ConfigJoiner] (the join semantics live in the
 *     kafka-utils library and are not visible here),
 *  3. parses the source document and the mapping configuration,
 *  4. drives a [ResourceBuilder] through its extraction and generation steps,
 *  5. writes the mapped record to `settings.outputTopic`.
 *
 * Every step that can fail writes a [Report] (as JSON) to
 * `settings.processReportTopic`; failed records are not forwarded any further.
 *
 * @param settings provides the input, output and report topic names.
 */
class KafkaTopology(
    private val settings: SettingsLoader
) {

    private val log = LogManager.getLogger("KafkaTopology")
    private val reportTopic = settings.processReportTopic
    private val klaxon = Klaxon()

    // The configuration payload is passed through as raw bytes, so the stock
    // byte array serde (an identity serializer/deserializer) is sufficient.
    private val configJoiner = ConfigJoiner<String, ByteArray>(
        ImportService.Mapping,
        Serdes.String(),
        Serdes.ByteArray(),
        this::parseConfig
    )

    /**
     * Builds the complete topology.
     *
     * @return the [StreamsBuilder] holding all sources, processors and sinks.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val configStream = builder.stream<String, String>(CONFIG_TOPIC)
            .map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }

        val stream = builder.stream<String, String>(settings.inputTopic)

        val joinedStream =
            configJoiner.join(stream, configStream)

        // Branch 0: the join failed (non-empty error message). Branch 1: everything else.
        val handledStream = joinedStream
            .mapValues { _, value -> handleExceptions(value) }
            .branch(
                Predicate { _, value -> value.third.isNotEmpty() },
                Predicate { _, _ -> true }
            )

        handledStream[0]
            .mapValues { readOnlyKey, value ->
                fatalReport(readOnlyKey, value.third).toJson()
            }
            .to(reportTopic)

        // Branch 0: parsing produced an error report. Branch 1: parsing succeeded.
        val parsedStream = handledStream[1]
            .mapValues { value -> Pair(value.first, value.second) }
            .mapValues { readOnlyKey, value ->
                parse(readOnlyKey, value)
            }
            .branch(
                Predicate { _, value -> value.third != null },
                Predicate { _, _ -> true }
            )

        parsedStream[0]
            .mapValues { key, value ->
                // The predicate above guarantees a report; the fallback only keeps the
                // value type uniform and is serialised like every other report.
                (value.third ?: fatalReport(key, "Caught an error, but no report was created.")).toJson()
            }
            .to(reportTopic)

        val extractedRecordIdStream = parsedStream[1]
            .mapValues { value ->
                // parse() only leaves the report empty when both of these are set.
                Pair(
                    checkNotNull(value.first) { "Parsed source is missing although no error report was created." },
                    checkNotNull(value.second) { "Mapper configuration is missing although no error report was created." }
                )
            }
            .transformValues(HeaderExtractionTransformSupplier<Pair<Map<String, Any>, MapperConfiguration>>())
            .mapValues { value -> buildResources(value) }
            .mapValues { value -> value.extractRecordId() }

        val extractedRecordTypeValueStream = extractRecordId(extractedRecordIdStream)

        val hasIdAndTypeStream = extractRecordTypeValue(extractedRecordTypeValueStream)

        val completedMappingStream = hasIdAndTypeStream
            .mapValues { value -> value.generateRecord() }
            .mapValues { value -> value.generatePhysicalObject() }
            .mapValues { value -> value.generateDigitalObject() }
            .mapValues { value -> value.addDerivedConnection() }

        val recordStream = completedMappingStream
            .map { _, value -> writeRecord(value) }
        objectOutput(recordStream)
        return builder
    }

    /**
     * Flattens the result of the configuration join into a triple of
     * (source document, configuration bytes, error message).
     *
     * An empty error message means success. Because callers detect failures by a
     * non-empty third element, the message of a failed join is never empty, even
     * when the underlying exception carries no message.
     */
    private fun handleExceptions(value: ValueWithException<JoinedValues<String, ByteArray>>): Triple<String, ByteArray, String> {
        return when {
            value.hasException() -> {
                Triple("", ByteArray(0), describe(value.exception))
            }
            value.hasValue() -> {
                Triple(value.value.left, value.value.right, "")
            }
            else -> {
                Triple("", ByteArray(0), "Could not handle error in kafka utils library.")
            }
        }
    }

    /**
     * Sends the first element of each pair to the output topic and the JSON form
     * of the accompanying report to the report topic.
     */
    private fun objectOutput(stream: KStream<String, Pair<String, Report>>) {
        stream
            .mapValues { _, value -> value.first }
            .to(settings.outputTopic)

        stream
            .mapValues { _, value -> value.second.toJson() }
            .to(reportTopic)
    }

    /**
     * Creates the [ResourceBuilder] from the parsed source, the mapper
     * configuration and the header metadata of the record.
     */
    private fun buildResources(value: Pair<Pair<Map<String, Any>, MapperConfiguration>, HeaderMetadata>): ResourceBuilder {
        val (sourceAndConfiguration, header) = value
        val (source, configuration) = sourceAndConfiguration
        return ResourceBuilder(
            source,
            configuration,
            header.institutionId,
            header.recordSetId,
            header.isPublished
        )
    }

    /**
     * Reports and drops every record without a record id; for the remaining
     * records the record type value is extracted.
     */
    private fun extractRecordId(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordId = stream.branch(
            Predicate { _, value -> value.hasRecordId() },
            Predicate { _, _ -> true }
        )

        // Early termination if there is no record id.
        hasRecordId[1]
            .mapValues { key, _ -> fatalReport(key, "No record id found for record $key.").toJson() }
            .to(reportTopic)

        return hasRecordId[0]
            .mapValues { value -> value.extractRecordTypeValue() }
    }

    /**
     * Reports and drops every record without a record type; the remaining
     * records are returned unchanged.
     */
    private fun extractRecordTypeValue(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordTypeValue = stream
            .branch(
                Predicate { _, value -> value.hasRecordType() },
                Predicate { _, _ -> true }
            )

        hasRecordTypeValue[1]
            .mapValues { key, _ ->
                fatalReport(key, "No correct record type found for record $key.").toJson()
            }
            .to(reportTopic)

        return hasRecordTypeValue[0]
    }

    /**
     * Writes the record held by [builder] and re-keys the message with the first
     * element of the write result; the value carries the second element together
     * with a success report.
     */
    private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> {
        val result = builder.writeRecord()
        return KeyValue(
            result.first,
            Pair(
                result.second,
                Report(result.first, ReportStatus.success, "Successfully mapped record with id ${result.first}.", Service.step)
            )
        )
    }

    /** Hands the configuration bytes to the joiner unchanged. */
    private fun parseConfig(data: ByteArray): ByteArray = data

    /**
     * Parses the mapping configuration (second element) and the source document
     * (first element) of [value].
     *
     * @return (source, configuration, null) on success, or (null, null, report)
     *         when the source is empty or one of the handled exceptions occurred.
     */
    private fun parse(
        key: String,
        value: Pair<String, ByteArray>
    ): Triple<Map<String, Any>?, MapperConfiguration?, Report?> {
        return try {
            val mapperConfiguration = MappingConfigurationParser(value.second)
            val parsedSource = klaxon.parse<Map<String, Any>>(value.first)
            if (parsedSource != null) {
                log.info("Successfully parsed source & mapping configuration.")
                Triple(parsedSource, mapperConfiguration.get(), null)
            } else {
                log.error("Parsed source is empty: ${value.first}.")
                Triple(null, null, fatalReport(key, "Found empty source document."))
            }
        } catch (ex: InvalidMappingException) {
            parseFailure(key, describe(ex))
        } catch (ex: KlaxonException) {
            parseFailure(key, describe(ex))
        } catch (ex: NullPointerException) {
            val description = describe(ex)
            log.error(description)
            Triple(null, null, fatalReport(key, "There's no data to be processed: $description"))
        }
    }

    /** Logs [message] and wraps it into the failure result of [parse]. */
    private fun parseFailure(
        key: String,
        message: String
    ): Triple<Map<String, Any>?, MapperConfiguration?, Report?> {
        log.error(message)
        return Triple(null, null, fatalReport(key, message))
    }

    /** Creates a fatal [Report] for this service step. */
    private fun fatalReport(key: String, message: String): Report =
        Report(key, ReportStatus.fatal, message, Service.step)

    /**
     * Returns a non-blank description of [ex]: its localized message when one is
     * present, otherwise its string representation. Throwable messages come from
     * Java and may be null.
     */
    private fun describe(ex: Throwable): String =
        ex.localizedMessage?.takeIf { it.isNotBlank() } ?: ex.toString()

    private companion object {
        /** Topic from which the import process configuration is read. */
        const val CONFIG_TOPIC = "import-process-config"
    }
}