Due to a scheduled upgrade to version 14.10, GitLab will be unavailable on Monday 30.05., from 19:00 until 20:00.

KafkaTopology.kt 8.84 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * mapper-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

Jonas Waeber's avatar
Jonas Waeber committed
21
22
23
import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
Jonas Waeber's avatar
Jonas Waeber committed
24
import ch.memobase.kafka.utils.models.JoinedValues
25
import ch.memobase.kafka.utils.models.ValueWithException
Jonas Waeber's avatar
Jonas Waeber committed
26
import ch.memobase.mapping.MappingConfigurationParser
Jonas Waeber's avatar
Jonas Waeber committed
27
import ch.memobase.mapping.exceptions.InvalidMappingException
Jonas Waeber's avatar
Jonas Waeber committed
28
import com.beust.klaxon.Klaxon
29
import com.beust.klaxon.KlaxonException
Jonas Waeber's avatar
Jonas Waeber committed
30
31
import mapping.MapperConfiguration
import org.apache.kafka.common.serialization.Serdes
Jonas Waeber's avatar
Jonas Waeber committed
32
import org.apache.kafka.streams.KeyValue
Jonas Waeber's avatar
Jonas Waeber committed
33
import org.apache.kafka.streams.StreamsBuilder
34
35
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
Jonas Waeber's avatar
Jonas Waeber committed
36
import org.apache.logging.log4j.LogManager
37
import org.memobase.helpers.ReportStatus
Jonas Waeber's avatar
Jonas Waeber committed
38
import org.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
39
40
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
Jonas Waeber's avatar
Jonas Waeber committed
41

42
43
class KafkaTopology(
    private val settings: SettingsLoader
) {
    private val log = LogManager.getLogger("KafkaTopology")
    private val reportTopic = settings.processReportTopic
    private val klaxon = Klaxon()

    /**
     * Joins records of the input topic with the mapping configuration stream.
     * The join semantics are defined by [ConfigJoiner]. Configuration values are raw bytes,
     * so [Serdes.ByteArray] (an identity serde) is used for them.
     */
    private val configJoiner = ConfigJoiner<String, ByteArray>(
        ImportService.Mapping,
        Serdes.String(),
        Serdes.ByteArray(),
        this::parseConfig
    )

    /**
     * Assembles the complete topology and returns the builder holding it.
     *
     * Stages:
     * 1. join every input record with the mapping configuration (`import-process-config`);
     * 2. report join errors on the report topic;
     * 3. parse source document and mapping configuration, reporting parse failures;
     * 4. build resources, dropping (and reporting) records without record id or record type;
     * 5. generate record, physical object, digital object and derived connection;
     * 6. write the mapped record to the output topic and a success report to the report topic.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val configStream = builder.stream<String, String>("import-process-config")
            .map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }

        val stream = builder.stream<String, String>(settings.inputTopic)

        val joinedStream = configJoiner.join(stream, configStream)

        // Branch 0: the join produced an error message; branch 1: everything else.
        val handledStream = joinedStream
            .mapValues { value -> handleExceptions(value) }
            .branch(
                Predicate { _, value -> value.third != "" },
                Predicate { _, _ -> true }
            )

        handledStream[0]
            .mapValues { readOnlyKey, value ->
                Report(readOnlyKey, ReportStatus.failure, value.third).toJson()
            }
            .to(reportTopic)

        // Branch 0: parse() returned a failure report; branch 1: source and configuration parsed.
        val parsedStream = handledStream[1]
            .mapValues { value -> Pair(value.first, value.second) }
            .mapValues { readOnlyKey, value -> parse(readOnlyKey, value) }
            .branch(
                Predicate { _, value -> value.third != null },
                Predicate { _, _ -> true }
            )

        parsedStream[0]
            .mapValues { key, value ->
                // Serialise in both cases so this stream stays a String stream for the report topic.
                (value.third ?: Report(
                    key,
                    ReportStatus.failure,
                    "Caught an error, but no report was created."
                )).toJson()
            }
            .to(reportTopic)

        val extractedRecordIdStream = parsedStream[1]
            .mapValues { value ->
                // parse() only omits the report when it returns both source and configuration.
                Pair(
                    checkNotNull(value.first) { "Parsed source is missing although no failure was reported." },
                    checkNotNull(value.second) { "Mapping configuration is missing although no failure was reported." }
                )
            }
            .transformValues(HeaderExtractionTransformSupplier<Pair<Map<String, Any>, MapperConfiguration>>())
            .mapValues { value -> buildResources(value) }
            .mapValues { value -> value.extractRecordId() }

        val extractedRecordTypeValueStream = extractRecordId(extractedRecordIdStream)

        val hasIdAndTypeStream = extractRecordTypeValue(extractedRecordTypeValueStream)

        val completedMappingStream = hasIdAndTypeStream
            .mapValues { value -> value.generateRecord() }
            .mapValues { value -> value.generatePhysicalObject() }
            .mapValues { value -> value.generateDigitalObject() }
            .mapValues { value -> value.addDerivedConnection() }

        val recordStream = completedMappingStream
            .map { _, value -> writeRecord(value) }
        objectOutput(recordStream)
        return builder
    }

    /**
     * Flattens a join result into `(source, mapping configuration, error message)`.
     *
     * An empty error message means the join succeeded. On failure the first two components
     * are placeholders, and the message is guaranteed to be non-empty. This matters because
     * the caller routes records by `third != ""`.
     */
    private fun handleExceptions(value: ValueWithException<JoinedValues<String, ByteArray>>): Triple<String, ByteArray, String> {
        return when {
            value.hasException() -> Triple("", ByteArray(0), describe(value.exception))
            value.hasValue() -> Triple(value.value.left, value.value.right, "")
            else -> Triple("", ByteArray(0), "Could not handle error in kafka utils library.")
        }
    }

    /**
     * Returns a non-empty, human-readable description of [ex] for logs and reports.
     * Falls back to `toString()` (class name) when the message is null or empty.
     */
    private fun describe(ex: Throwable): String {
        // localizedMessage is a Java platform type and may be null at runtime.
        val message: String? = ex.localizedMessage
        return if (message.isNullOrEmpty()) ex.toString() else message
    }

    /**
     * Sends the mapped record (`first`) to the output topic and its report (`second`),
     * serialised as JSON, to the report topic.
     */
    private fun objectOutput(stream: KStream<String, Pair<String, Report>>) {
        stream
            .mapValues { value -> value.first }
            .to(settings.outputTopic)

        stream
            .mapValues { value -> value.second.toJson() }
            .to(reportTopic)
    }

    /**
     * Creates a [ResourceBuilder] from the parsed source, its mapping configuration and the
     * institution id, record set id and publication flag taken from the header metadata.
     */
    private fun buildResources(value: Pair<Pair<Map<String, Any>, MapperConfiguration>, HeaderMetadata>): ResourceBuilder {
        return ResourceBuilder(
            value.first.first,
            value.first.second,
            value.second.institutionId,
            value.second.recordSetId,
            value.second.isPublished
        )
    }

    /**
     * Reports and drops every builder without a record id. For the remaining builders it
     * extracts the record type value.
     */
    private fun extractRecordId(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordId = stream.branch(
            Predicate { _, value -> value.hasRecordId() },
            Predicate { _, _ -> true }
        )
        // early termination if there is no record id!
        val noRecordId = hasRecordId[1]

        noRecordId
            .mapValues { key, _ -> Report(key, ReportStatus.failure, "No record id found for record $key.").toJson() }
            .to(reportTopic)

        return hasRecordId[0]
            .mapValues { value -> value.extractRecordTypeValue() }
    }

    /**
     * Reports and drops every builder without a record type. Returns the builders that have one.
     */
    private fun extractRecordTypeValue(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordTypeValue = stream
            .branch(
                Predicate { _, value -> value.hasRecordType() },
                Predicate { _, _ -> true }
            )

        val noRecordTypeValue = hasRecordTypeValue[1]

        noRecordTypeValue
            .mapValues { key, _ ->
                Report(
                    key,
                    ReportStatus.failure,
                    "No correct record type found for record $key."
                ).toJson()
            }
            .to(reportTopic)

        return hasRecordTypeValue[0]
    }

    /**
     * Writes the finished record and pairs it with a success report.
     *
     * @return a key/value whose key is the record id returned by [ResourceBuilder.writeRecord]
     *   (`first`) and whose value is `(record output, success report)`.
     */
    private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> {
        val result = builder.writeRecord()
        return KeyValue(
            result.first,
            Pair(
                result.second,
                Report(result.first, ReportStatus.success, "Successfully mapped record with id ${result.first}.")
            )
        )
    }

    /** Passes configuration bytes through unchanged; they are parsed per record in [parse]. */
    private fun parseConfig(data: ByteArray): ByteArray {
        return data
    }

    /**
     * Parses the mapping configuration (`value.second`) and the JSON source document (`value.first`).
     *
     * @return `(source, configuration, null)` on success. Returns `(null, null, failure report)`
     *   when the source is empty, or when the mapping is invalid, the JSON cannot be parsed,
     *   or a NullPointerException is raised while parsing.
     */
    private fun parse(
        key: String,
        value: Pair<String, ByteArray>
    ): Triple<Map<String, Any>?, MapperConfiguration?, Report?> {
        return try {
            val mapperConfiguration = MappingConfigurationParser(value.second)
            val parsedSource = klaxon.parse<Map<String, Any>>(value.first)
            if (parsedSource != null) {
                log.info("Successfully parsed source & mapping configuration.")
                Triple(parsedSource, mapperConfiguration.get(), null)
            } else {
                log.error("Parsed source is empty: ${value.first}.")
                Triple(null, null, Report(key, ReportStatus.failure, "Found empty source document."))
            }
        } catch (ex: InvalidMappingException) {
            val message = describe(ex)
            log.error(message)
            Triple(null, null, Report(key, ReportStatus.failure, message))
        } catch (ex: KlaxonException) {
            val message = describe(ex)
            log.error(message)
            Triple(null, null, Report(key, ReportStatus.failure, message))
        } catch (ex: NullPointerException) {
            val message = describe(ex)
            log.error(message)
            Triple(
                null,
                null,
                Report(key, ReportStatus.failure, "There's no data to be processed: $message")
            )
        }
    }
}