/*
 * mapper-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import ch.memobase.builder.ResourceBuilder
import ch.memobase.exceptions.InvalidMappingException
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.kafka.utils.models.ValueWithException
import ch.memobase.mapping.MapperConfiguration
import ch.memobase.mapping.MappingConfigurationParser
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.HeaderExtractionTransformSupplier
import ch.memobase.settings.HeaderMetadata
import ch.memobase.settings.SettingsLoader
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager

/**
 * Assembles the Kafka Streams topology of the mapping step.
 *
 * Source documents read from the configured input topic are joined with their mapping
 * configuration, parsed, handed to a [ResourceBuilder] and finally written to the configured
 * output topic. Every failure along the way, as well as the outcome of each written record, is
 * sent as a JSON serialised [Report] to the process report topic.
 *
 * @property settings provides the input, output and process report topic names.
 */
class KafkaTopology(
    private val settings: SettingsLoader
) {

    private val log = LogManager.getLogger("MappingProcessor")
    private val reportTopic = settings.processReportTopic
    private val klaxon = Klaxon()

    // The configuration payload is passed through as raw bytes: serializer, deserializer and
    // parseConfig all return their input unchanged.
    private val configJoiner = ConfigJoiner<String, ByteArray>(
        ImportService.Mapping,
        Serdes.String(),
        Serdes.serdeFrom(
            { _, data ->
                data
            },
            { _, data ->
                data
            }
        ),
        this::parseConfig
    )

    /**
     * Wires up all processing stages on a fresh [StreamsBuilder].
     *
     * @return the builder holding the complete topology; starting the streams application is
     * left to the caller.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val configStream = builder.stream<String, String>("import-process-config")
            .map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }

        val stream = builder.stream<String, String>(settings.inputTopic)

        val joinedStream =
            configJoiner.join(stream, configStream)

        // Branch 0: the join failed (non-empty error message). Branch 1: everything else.
        val handledStream = joinedStream
            .mapValues { value -> handleExceptions(value) }
            .branch(
                Predicate { _, value -> value.third != "" },
                Predicate { _, _ -> true }
            )

        handledStream[0]
            .mapValues { readOnlyKey, value ->
                Report(readOnlyKey, ReportStatus.fatal, value.third, Service.step).toJson()
            }
            .to(reportTopic)

        // Branch 0: parsing produced a report (i.e. failed). Branch 1: parsed successfully.
        val parsedStream = handledStream[1]
            .mapValues { value -> Pair(value.first, value.second) }
            .mapValues { readOnlyKey, value ->
                parse(readOnlyKey, value)
            }
            .branch(
                Predicate { _, value -> value.third != null },
                Predicate { _, _ -> true }
            )

        parsedStream[0]
            .mapValues { key, value ->
                // The fallback is serialised as well, so that this sink only ever receives JSON
                // strings like every other writer to the report topic.
                value.third?.toJson() ?: Report(
                    key,
                    ReportStatus.fatal,
                    "Caught an error, but no report was created.",
                    Service.step
                ).toJson()
            }
            .to(reportTopic)

        val extractedRecordIdStream = parsedStream[1]
            .mapValues { value ->
                // parse() fills both slots whenever it does not return a report.
                Pair(
                    checkNotNull(value.first) { "Parsed source is missing although no report was created." },
                    checkNotNull(value.second) { "Mapper configuration is missing although no report was created." }
                )
            }
            .transformValues(HeaderExtractionTransformSupplier<Pair<Map<String, Any>, MapperConfiguration>>())
            .mapValues { value -> buildResources(value) }
            .mapValues { value -> value.extractRecordId() }

        val extractedRecordTypeValueStream = extractRecordId(extractedRecordIdStream)

        val hasIdAndTypeStream = extractRecordTypeValue(extractedRecordTypeValueStream)

        val completedMappingStream = hasIdAndTypeStream
            .mapValues { value -> value.generateRecord() }
            .mapValues { value -> value.generatePhysicalObject() }
            .mapValues { value -> value.generateDigitalObject() }
            .mapValues { value -> value.addDerivedConnection() }

        val recordStream = completedMappingStream
            .map { _, value -> writeRecord(value) }

        objectOutput(recordStream)

        return builder
    }

    /**
     * Returns a message for [ex] that is never empty.
     *
     * An empty string is used as the "no error" marker in [handleExceptions], so a missing or
     * blank exception message is replaced by [Throwable.toString].
     */
    private fun errorMessageOf(ex: Throwable): String =
        ex.localizedMessage?.takeIf { it.isNotBlank() } ?: ex.toString()

    /**
     * Flattens the join result into a triple of (source, mapping configuration, error message).
     *
     * The error message is the empty string exactly when the join delivered a value; in every
     * other case the first two slots are empty placeholders.
     */
    private fun handleExceptions(value: ValueWithException<JoinedValues<String, ByteArray>>): Triple<String, ByteArray, String> {
        return when {
            value.hasException() -> {
                Triple("", ByteArray(0), errorMessageOf(value.exception))
            }
            value.hasValue() -> {
                Triple(value.value.left, value.value.right, "")
            }
            else -> {
                Triple("", ByteArray(0), "Could not handle error in kafka utils library.")
            }
        }
    }

    /**
     * Terminal stage: writes the first element of every pair whose report is not fatal to the
     * output topic, and the JSON form of every report (fatal or not) to the report topic.
     */
    private fun objectOutput(stream: KStream<String, Pair<String, Report>>) {
        stream
            .filter { _, value -> value.second.status != ReportStatus.fatal }
            .mapValues { _, value -> value.first }
            .to(settings.outputTopic)

        stream
            .mapValues { _, value -> value.second.toJson() }
            .to(reportTopic)
    }

    /**
     * Creates a [ResourceBuilder] from the parsed source, its mapper configuration and the
     * institution id, record set id and published flag taken from the header metadata.
     */
    private fun buildResources(value: Pair<Pair<Map<String, Any>, MapperConfiguration>, HeaderMetadata>): ResourceBuilder {
        return ResourceBuilder(
            value.first.first,
            value.first.second,
            value.second.institutionId,
            value.second.recordSetId,
            value.second.isPublished
        )
    }

    /**
     * Splits the stream on [ResourceBuilder.hasRecordId].
     *
     * Builders without a record id are reported as fatal and leave the pipeline here. The
     * remaining builders are returned after `extractRecordTypeValue()` has been applied.
     */
    private fun extractRecordId(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordId = stream.branch(
            Predicate { _, value -> value.hasRecordId() },
            Predicate { _, _ -> true }
        )
        // early termination if there is no record id!
        val noRecordId = hasRecordId[1]

        noRecordId
            .mapValues { key, value ->
                Report(
                    key,
                    ReportStatus.fatal,
                    "No Record Id Found: " + value.errorMessages.joinToString("\n"),
                    Service.step
                ).toJson()
            }
            .to(reportTopic)

        return hasRecordId[0]
            .mapValues { value -> value.extractRecordTypeValue() }
    }

    /**
     * Splits the stream on [ResourceBuilder.hasRecordType].
     *
     * Builders without a record type are reported as fatal and leave the pipeline here; the
     * remaining builders are returned unchanged.
     */
    private fun extractRecordTypeValue(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordTypeValue = stream
            .branch(
                Predicate { _, value -> value.hasRecordType() },
                Predicate { _, _ -> true }
            )

        val noRecordTypeValue = hasRecordTypeValue[1]

        noRecordTypeValue
            .mapValues { key, value ->
                Report(
                    key,
                    ReportStatus.fatal,
                    "No Record Type Value: " + value.errorMessages.joinToString("\n"),
                    Service.step
                ).toJson()
            }
            .to(reportTopic)

        return hasRecordTypeValue[0]
    }

    /**
     * Serialises the builder's record as N-Triples (UTF-8) and pairs the output with a report.
     *
     * The report is a success when the writer returned no messages. Otherwise it carries the
     * messages and is fatal or a warning depending on [ResourceBuilder.isFatal].
     *
     * @return the first element of the write result as the new key, with the written output and
     * its report as value.
     */
    private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> {
        val result = builder.writeRecord(RDFFormat.NTRIPLES_UTF8)

        val report = if (result.third.isEmpty()) {
            Report(result.first, ReportStatus.success, "", Service.step)
        } else {
            Report(
                result.first,
                if (builder.isFatal) ReportStatus.fatal else ReportStatus.warning,
                "Write Record Error: " + result.third.joinToString("\n"),
                Service.step
            )
        }

        return KeyValue(result.first, Pair(result.second, report))
    }

    /** Identity function handed to the [ConfigJoiner]: the configuration stays as raw bytes. */
    private fun parseConfig(data: ByteArray): ByteArray {
        return data
    }

    /**
     * Parses the mapping configuration and the JSON source document of one message.
     *
     * @param key message key, used as the id of any report that is created.
     * @param value pair of (source document as JSON text, mapping configuration bytes).
     * @return (parsed source, mapper configuration, null) on success, or (null, null, fatal
     * report) when the source could not be parsed or one of the handled exceptions was thrown.
     */
    private fun parse(
        key: String,
        value: Pair<String, ByteArray>
    ): Triple<Map<String, Any>?, MapperConfiguration?, Report?> {
        return try {
            val mapperConfiguration = MappingConfigurationParser(value.second)
            val parsedSource = klaxon.parse<Map<String, Any>>(value.first)
            if (parsedSource != null) {
                log.info("Successfully parsed source & mapping configuration.")
                Triple(parsedSource, mapperConfiguration.get(), null)
            } else {
                log.error("Parsed source is empty: ${value.first}.")
                Triple(null, null, Report(key, ReportStatus.fatal, "Could not parse source document.", Service.step))
            }
        } catch (ex: InvalidMappingException) {
            val message = errorMessageOf(ex)
            log.error(message)
            Triple(null, null, Report(key, ReportStatus.fatal, message, Service.step))
        } catch (ex: KlaxonException) {
            val message = errorMessageOf(ex)
            log.error(message)
            Triple(null, null, Report(key, ReportStatus.fatal, message, Service.step))
        } catch (ex: NullPointerException) {
            // A NullPointerException frequently has no message; errorMessageOf keeps the log and
            // the report informative in that case.
            val message = errorMessageOf(ex)
            log.error(message)
            Triple(
                null,
                null,
                Report(key, ReportStatus.fatal, "There's no data to be processed: $message", Service.step)
            )
        }
    }
}