KafkaTopology.kt 6.39 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * mapper-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

Jonas Waeber's avatar
Jonas Waeber committed
21
22
23
24
import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.mapping.MappingConfigurationParser
Jonas Waeber's avatar
Jonas Waeber committed
25
import com.beust.klaxon.Klaxon
26
import com.beust.klaxon.KlaxonException
Jonas Waeber's avatar
Jonas Waeber committed
27
28
import mapping.MapperConfiguration
import org.apache.kafka.common.serialization.Serdes
Jonas Waeber's avatar
Jonas Waeber committed
29
import org.apache.kafka.streams.KeyValue
Jonas Waeber's avatar
Jonas Waeber committed
30
import org.apache.kafka.streams.StreamsBuilder
31
32
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
Jonas Waeber's avatar
Jonas Waeber committed
33
import org.apache.logging.log4j.LogManager
34
import org.memobase.helpers.ReportStatus
Jonas Waeber's avatar
Jonas Waeber committed
35
import org.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
36
37
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
Jonas Waeber's avatar
Jonas Waeber committed
38

39
40
/**
 * Builds the Kafka Streams topology of the mapper service.
 *
 * Records from [SettingsLoader.inputTopic] are joined with the mapping configuration read from
 * the config topic, parsed as JSON, mapped with [ResourceBuilder] and written to
 * [SettingsLoader.outputTopic]. Successfully mapped records, as well as records rejected because
 * they lack a record id or a record type, produce a [Report] on [SettingsLoader.processReportTopic].
 *
 * @param settings application settings providing the topic names.
 */
class KafkaTopology(
    private val settings: SettingsLoader
) {
    private val log = LogManager.getLogger("KafkaTopology")

    private val reportTopic = settings.processReportTopic

    /**
     * Joins input records with the mapping configuration of the [ImportService.Mapping] service.
     * Configuration values are kept as raw bytes (identity serde, identity [parseConfig]).
     */
    private val configJoiner = ConfigJoiner<String, ByteArray>(
        ImportService.Mapping,
        Serdes.String(),
        // Serdes.ByteArray() serializes/deserializes byte arrays unchanged, i.e. an identity serde.
        Serdes.ByteArray(),
        this::parseConfig
    )

    /**
     * Assembles the mapping topology and returns the [StreamsBuilder] it was registered on.
     *
     * Records whose JSON cannot be parsed are logged and dropped; records without a record id or
     * record type are reported as failures and dropped.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val configStream = builder.stream<String, String>(CONFIG_TOPIC)
            // A null key or value (e.g. a tombstone) would throw on toByteArray() and stop the stream thread.
            .filter { key, value -> key != null && value != null }
            .map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }

        val inputStream = builder.stream<String, String>(settings.inputTopic)

        val joinedStream = configJoiner.join(inputStream, configStream)

        val jsonStream = joinedStream
            .mapValues { value -> Pair(value.left, MappingConfigurationParser(value.right).get()) }
            .flatMapValues { key, value -> parseJsonFile(value, key) }

        val resourceStream = jsonStream
            .transformValues(HeaderExtractionTransformSupplier<Pair<Map<String, Any>, MapperConfiguration>>())
            .mapValues { value -> buildResources(value) }
            .mapValues { value -> value.extractRecordId() }

        val withRecordIdStream = extractRecordId(resourceStream)

        val withIdAndTypeStream = extractRecordTypeValue(withRecordIdStream)

        val completedMappingStream = withIdAndTypeStream
            .mapValues { value -> value.generateRecord() }
            .mapValues { value -> value.generatePhysicalObject() }
            .mapValues { value -> value.generateDigitalObject() }
            .mapValues { value -> value.addDerivedConnection() }

        val recordStream = completedMappingStream
            .map { _, value -> writeRecord(value) }
        objectOutput(recordStream)
        return builder
    }

    /**
     * Sends the serialized record (`first`) to the output topic and the JSON form of the
     * report (`second`) to the report topic.
     */
    private fun objectOutput(stream: KStream<String, Pair<String, Report>>) {
        stream
            .mapValues { _, value -> value.first }
            .to(settings.outputTopic)

        stream
            .mapValues { _, value -> value.second.toJson() }
            .to(reportTopic)
    }

    /**
     * Creates a [ResourceBuilder] from the parsed source record, its mapping configuration and
     * the header metadata (record set id, institution id, published flag).
     */
    private fun buildResources(value: Pair<Pair<Map<String, Any>, MapperConfiguration>, HeaderMetadata>): ResourceBuilder {
        return ResourceBuilder(
            value.first.first,
            value.first.second,
            value.second.recordSetId,
            value.second.institutionId,
            value.second.isPublished
        )
    }

    /**
     * Keeps only builders that have a record id and calls `extractRecordTypeValue()` on them.
     * Builders without a record id get a failure report and are not forwarded.
     */
    private fun extractRecordId(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordId = stream.branch(
            Predicate { _, value -> value.hasRecordId() },
            Predicate { _, _ -> true }
        )
        // early termination if there is no record id!
        val noRecordId = hasRecordId[1]

        noRecordId
            .mapValues { key, _ -> Report(key, ReportStatus.failure, "No record id found for record $key.").toJson() }
            .to(reportTopic)

        return hasRecordId[0]
            .mapValues { value -> value.extractRecordTypeValue() }
    }

    /**
     * Keeps only builders that have a record type. Builders without one get a failure report and
     * are not forwarded.
     */
    private fun extractRecordTypeValue(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        val hasRecordTypeValue = stream
            .branch(
                Predicate { _, value -> value.hasRecordType() },
                Predicate { _, _ -> true }
            )

        val noRecordTypeValue = hasRecordTypeValue[1]

        noRecordTypeValue
            .mapValues { key, _ ->
                Report(
                    key,
                    ReportStatus.failure,
                    "No correct record type found for record $key."
                ).toJson()
            }
            .to(reportTopic)

        return hasRecordTypeValue[0]
    }

    /**
     * Parses the JSON text in `input.first` and pairs the result with its mapping configuration.
     *
     * @param input the raw JSON text and the mapping configuration for that record.
     * @param key the record key, used only for log messages.
     * @return a single-element list on success; an empty list (record dropped) when the parser
     *   returns null or throws a [KlaxonException].
     */
    private fun parseJsonFile(
        input: Pair<String, MapperConfiguration>,
        key: String? = null
    ): List<Pair<Map<String, String>, MapperConfiguration>> {
        return try {
            val parsed = Klaxon().parse<Map<String, String>>(input.first)
            if (parsed == null) {
                // TODO: also emit a failure report to the report topic.
                log.error("Could not parse JSON of record $key: parser returned no object. Record dropped.")
                emptyList()
            } else {
                listOf(Pair(parsed, input.second))
            }
        } catch (ex: KlaxonException) {
            // TODO: also emit a failure report to the report topic.
            log.error("Could not parse JSON of record $key: ${ex.localizedMessage}. Record dropped.")
            emptyList()
        }
    }

    /**
     * Serializes the builder's record and pairs it with a success report.
     * `builder.writeRecord()` yields the record id (used as the new key) and the serialized record.
     */
    private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> {
        val result = builder.writeRecord()
        return KeyValue(
            result.first,
            Pair(
                result.second,
                Report(result.first, ReportStatus.success, "Successfully mapped record with id ${result.first}.")
            )
        )
    }

    /** Identity parser: the configuration is passed on as raw bytes. */
    private fun parseConfig(data: ByteArray): ByteArray {
        return data
    }

    private companion object {
        /** Topic from which the mapping configuration is read. */
        const val CONFIG_TOPIC = "import-process-config"
    }
}