/*
 * mapper-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

Jonas Waeber's avatar
Jonas Waeber committed
21
22
23
import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
Jonas Waeber's avatar
Jonas Waeber committed
24
import ch.memobase.kafka.utils.models.JoinedValues
Jonas Waeber's avatar
Jonas Waeber committed
25
import ch.memobase.mapping.MappingConfigurationParser
Jonas Waeber's avatar
Jonas Waeber committed
26
import ch.memobase.mapping.exceptions.InvalidMappingException
Jonas Waeber's avatar
Jonas Waeber committed
27
import com.beust.klaxon.Klaxon
28
import com.beust.klaxon.KlaxonException
Jonas Waeber's avatar
Jonas Waeber committed
29
30
import mapping.MapperConfiguration
import org.apache.kafka.common.serialization.Serdes
Jonas Waeber's avatar
Jonas Waeber committed
31
import org.apache.kafka.streams.KeyValue
Jonas Waeber's avatar
Jonas Waeber committed
32
import org.apache.kafka.streams.StreamsBuilder
33
34
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
Jonas Waeber's avatar
Jonas Waeber committed
35
import org.apache.logging.log4j.LogManager
36
import org.memobase.helpers.ReportStatus
Jonas Waeber's avatar
Jonas Waeber committed
37
import org.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
38
39
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
Jonas Waeber's avatar
Jonas Waeber committed
40

41
42
/**
 * Builds the Kafka Streams topology of the mapper service.
 *
 * Source records read from [SettingsLoader.inputTopic] are joined with the mapping configuration published on the
 * `import-process-config` topic. They are then parsed, turned into resources through [ResourceBuilder] and written to
 * [SettingsLoader.outputTopic]. For every record, a [Report] serialized with `toJson()` is written to the process
 * report topic. It either confirms the successful mapping or explains why the record was dropped.
 *
 * @param settings application settings providing the input, output and report topic names.
 */
class KafkaTopology(
    private val settings: SettingsLoader
) {
    private val log = LogManager.getLogger("KafkaTopology")
    private val reportTopic = settings.processReportTopic
    private val klaxon = Klaxon()

    // Joins each source record with its mapping configuration. Configuration values are kept as raw bytes, so the
    // pass-through byte-array serde is used for them.
    private val configJoiner = ConfigJoiner<String, ByteArray>(
        ImportService.Mapping,
        Serdes.String(),
        Serdes.ByteArray(),
        this::parseConfig
    )

    /**
     * Assembles the topology and returns the builder it was registered on.
     *
     * Records are dropped, and a failure report is written instead, when any of these holds:
     * - parsing of the source or the mapping configuration fails;
     * - no record id is found;
     * - no valid record type is found.
     *
     * All remaining records are mapped and emitted via [objectOutput].
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val configStream = builder.stream<String, String>("import-process-config")
            .map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }

        val stream = builder.stream<String, String>(settings.inputTopic)

        val joinedStream = configJoiner.join(stream, configStream)

        // Branch 0: parsing produced a failure report. Branch 1: everything else (parsing succeeded).
        val parsedStream = joinedStream
            .mapValues { readOnlyKey, value -> parse(readOnlyKey, value) }
            .branch(
                Predicate { _, value -> value.third != null },
                Predicate { _, _ -> true }
            )

        parsedStream[0]
            .mapValues { key, value ->
                // Serialize the fallback too, so the report topic only ever receives JSON strings.
                (value.third ?: Report(
                    key,
                    ReportStatus.failure,
                    "Caught an error, but no report was created."
                )).toJson()
            }
            .to(reportTopic)

        val extractedRecordIdStream = parsedStream[1]
            .mapValues { value ->
                // parse() returns a source and a configuration exactly when it returns no report,
                // which is the condition routing a record into this branch.
                Pair(
                    checkNotNull(value.first) { "Parsed source is missing although no error was reported." },
                    checkNotNull(value.second) { "Mapping configuration is missing although no error was reported." }
                )
            }
            .transformValues(HeaderExtractionTransformSupplier<Pair<Map<String, Any>, MapperConfiguration>>())
            .mapValues { value -> buildResources(value) }
            .mapValues { value -> value.extractRecordId() }

        val extractedRecordTypeValueStream = extractRecordId(extractedRecordIdStream)

        val hasIdAndTypeStream = extractRecordTypeValue(extractedRecordTypeValueStream)

        val completedMappingStream = hasIdAndTypeStream
            .mapValues { value -> value.generateRecord() }
            .mapValues { value -> value.generatePhysicalObject() }
            .mapValues { value -> value.generateDigitalObject() }
            .mapValues { value -> value.addDerivedConnection() }

        val recordStream = completedMappingStream
            .map { _, value -> writeRecord(value) }
        objectOutput(recordStream)
        return builder
    }

    /**
     * Splits each (serialized record, report) pair.
     *
     * The record goes to the output topic and the report, serialized with `toJson()`, goes to the report topic.
     */
    private fun objectOutput(stream: KStream<String, Pair<String, Report>>) {
        stream
            .mapValues { _, value -> value.first }
            .to(settings.outputTopic)

        stream
            .mapValues { _, value -> value.second.toJson() }
            .to(reportTopic)
    }

    /**
     * Creates a [ResourceBuilder] from the parsed source, its mapping configuration and the header metadata.
     *
     * The header metadata supplies the institution id, the record set id and the published flag.
     */
    private fun buildResources(value: Pair<Pair<Map<String, Any>, MapperConfiguration>, HeaderMetadata>): ResourceBuilder {
        return ResourceBuilder(
            value.first.first,
            value.first.second,
            value.second.institutionId,
            value.second.recordSetId,
            value.second.isPublished
        )
    }

    /**
     * Drops builders without a record id, writing a failure report for each of them.
     *
     * For the remaining builders, the record type value is extracted before they are passed on.
     */
    private fun extractRecordId(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordId = stream.branch(
            Predicate { _, value -> value.hasRecordId() },
            Predicate { _, _ -> true }
        )
        // Early termination if there is no record id.
        val noRecordId = hasRecordId[1]

        noRecordId
            .mapValues { key, _ -> Report(key, ReportStatus.failure, "No record id found for record $key.").toJson() }
            .to(reportTopic)

        return hasRecordId[0]
            .mapValues { value -> value.extractRecordTypeValue() }
    }

    /**
     * Drops builders without a record type, writing a failure report for each of them.
     *
     * Returns the builders that have one.
     */
    private fun extractRecordTypeValue(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordTypeValue = stream
            .branch(
                Predicate { _, value -> value.hasRecordType() },
                Predicate { _, _ -> true }
            )

        val noRecordTypeValue = hasRecordTypeValue[1]

        noRecordTypeValue
            .mapValues { key, _ ->
                Report(
                    key,
                    ReportStatus.failure,
                    "No correct record type found for record $key."
                ).toJson()
            }
            .to(reportTopic)

        return hasRecordTypeValue[0]
    }

    /**
     * Serializes the finished record.
     *
     * The first element of `builder.writeRecord()` is used as the new message key. It is paired with the serialized
     * record and a success report.
     */
    private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> {
        val result = builder.writeRecord()
        return KeyValue(
            result.first,
            Pair(
                result.second,
                Report(result.first, ReportStatus.success, "Successfully mapped record with id ${result.first}.")
            )
        )
    }

    /** Configuration parser handed to [ConfigJoiner]; the raw configuration bytes are passed through unchanged. */
    private fun parseConfig(data: ByteArray): ByteArray {
        return data
    }

    /**
     * Parses the joined source document and its mapping configuration.
     *
     * @return `(source, configuration, null)` on success, or `(null, null, failureReport)` when the source is empty,
     * the mapping is invalid, or the source is not parseable JSON.
     */
    private fun parse(
        key: String,
        value: JoinedValues<String, ByteArray>
    ): Triple<Map<String, Any>?, MapperConfiguration?, Report?> {
        return try {
            val mapperConfiguration = MappingConfigurationParser(value.right)
            val parsedSource = klaxon.parse<Map<String, Any>>(value.left)
            if (parsedSource != null) {
                log.info("Successfully parsed source & mapping configuration.")
                Triple(parsedSource, mapperConfiguration.get(), null)
            } else {
                log.error("Parsed source is empty: ${value.left}.")
                Triple(null, null, Report(key, ReportStatus.failure, "Found empty source document."))
            }
        } catch (ex: InvalidMappingException) {
            parseFailure(key, ex)
        } catch (ex: KlaxonException) {
            parseFailure(key, ex)
        }
    }

    /** Logs a parsing failure and wraps it in a failure report for record [key]. */
    private fun parseFailure(key: String, ex: Throwable): Triple<Map<String, Any>?, MapperConfiguration?, Report?> {
        // localizedMessage is a nullable platform value; fall back to toString() so the report always has a message.
        val message = ex.localizedMessage ?: ex.toString()
        log.error(message)
        return Triple(null, null, Report(key, ReportStatus.failure, message))
    }
}