KafkaTopology.kt 6.49 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * mapper-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

Jonas Waeber's avatar
Jonas Waeber committed
21
22
23
24
25
import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.mapping.MappingConfigurationParser
Jonas Waeber's avatar
Jonas Waeber committed
26
import com.beust.klaxon.Klaxon
27
import com.beust.klaxon.KlaxonException
Jonas Waeber's avatar
Jonas Waeber committed
28
29
import mapping.MapperConfiguration
import org.apache.kafka.common.serialization.Serdes
Jonas Waeber's avatar
Jonas Waeber committed
30
import org.apache.kafka.streams.KeyValue
Jonas Waeber's avatar
Jonas Waeber committed
31
import org.apache.kafka.streams.StreamsBuilder
32
33
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
Jonas Waeber's avatar
Jonas Waeber committed
34
import org.apache.logging.log4j.LogManager
35
import org.memobase.helpers.ReportStatus
Jonas Waeber's avatar
Jonas Waeber committed
36
import org.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
37
38
39
import settings.HeaderExtractionTransformSupplier
import settings.HeaderMetadata
import java.io.ByteArrayInputStream
Jonas Waeber's avatar
Jonas Waeber committed
40

41
42
/**
 * Builds the Kafka Streams topology of the mapper service.
 *
 * Messages from the input topic are joined with their mapping configuration, parsed as JSON,
 * turned into a [ResourceBuilder] and mapped step by step. The serialized result is written to the
 * output topic, and a [Report] for every successfully mapped or rejected record is written to the
 * process report topic.
 *
 * @param settings provides the input, output and process report topic names.
 */
class KafkaTopology(
    private val settings: SettingsLoader
) {
    private val log = LogManager.getLogger("KafkaTopology")
    private val reportTopic = settings.processReportTopic
    private val klaxon = Klaxon()

    // Joins incoming messages with the mapping configuration published on the config topic.
    private val configJoiner = ConfigJoiner<String, MapperConfiguration>(
        ImportService.Mapping,
        Serdes.String(),
        Serdes.serdeFrom(
            { _, data -> klaxon.toJsonString(data).toByteArray() },
            // Kafka may hand a deserializer a null payload; return null instead of letting
            // ByteArrayInputStream throw a NullPointerException.
            { _, data -> data?.let { bytes -> klaxon.parse<MapperConfiguration>(ByteArrayInputStream(bytes)) } }),
        this::parseConfig
    )

    /**
     * Assembles the complete topology.
     *
     * @return the [StreamsBuilder] holding all sources, processing steps and sinks.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val configStream = builder.stream<String, String>(CONFIG_TOPIC)
            .map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }

        val inputStream = builder.stream<String, String>(settings.inputTopic)

        val joinedStream =
            configJoiner.join(inputStream, configStream)

        val jsonStream = joinedStream
            .flatMapValues { value -> parseJsonFile(value) }

        val resourceStream = jsonStream
            .transformValues(HeaderExtractionTransformSupplier<Pair<Map<String, Any>, MapperConfiguration>>())
            .mapValues { value -> buildResources(value) }
            .mapValues { value -> value.extractRecordId() }

        val withRecordIdStream = requireRecordId(resourceStream)

        val withRecordTypeStream = requireRecordType(withRecordIdStream)

        val completedMappingStream = withRecordTypeStream
            .mapValues { value -> value.generateRecord() }
            .mapValues { value -> value.generatePhysicalObject() }
            .mapValues { value -> value.generateDigitalObject() }
            .mapValues { value -> value.addDerivedConnection() }

        val recordStream = completedMappingStream
            .map { _, value -> writeRecord(value) }
        objectOutput(recordStream)
        return builder
    }

    /**
     * Splits the final stream: the first element of each pair goes to the output topic, the JSON
     * form of the [Report] goes to the report topic.
     */
    private fun objectOutput(stream: KStream<String, Pair<String, Report>>) {
        stream
            .mapValues { _, value -> value.first }
            .to(settings.outputTopic)

        stream
            .mapValues { _, value -> value.second.toJson() }
            .to(reportTopic)
    }

    /**
     * Creates the [ResourceBuilder] for one record.
     *
     * @param value the parsed source fields and their mapping configuration, paired with the
     * [HeaderMetadata] added by the header extraction step.
     */
    private fun buildResources(value: Pair<Pair<Map<String, Any>, MapperConfiguration>, HeaderMetadata>): ResourceBuilder {
        val source = value.first
        val header = value.second
        return ResourceBuilder(
            source.first,
            source.second,
            header.recordSetId,
            header.institutionId,
            header.isPublished
        )
    }

    /**
     * Rejects records without a record id (a failure report is sent for each of them) and
     * extracts the record type value of all remaining records.
     */
    private fun requireRecordId(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        // early termination if there is no record id!
        return rejectWithReport(stream, "No record id found") { resource -> resource.hasRecordId() }
            .mapValues { value -> value.extractRecordTypeValue() }
    }

    /**
     * Rejects records without a record type; a failure report is sent for each of them.
     */
    private fun requireRecordType(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        return rejectWithReport(stream, "No correct record type found") { resource -> resource.hasRecordType() }
    }

    /**
     * Branches [stream] on [isValid]. Every record failing the check is replaced by a failure
     * [Report] with the message "<reason> for record <key>." and sent to the report topic.
     *
     * @return the branch containing only the records that passed the check.
     */
    private fun rejectWithReport(
        stream: KStream<String, ResourceBuilder>,
        reason: String,
        isValid: (ResourceBuilder) -> Boolean
    ): KStream<String, ResourceBuilder> {
        val branches = stream.branch(
            Predicate { _, value -> isValid(value) },
            // catch-all: everything that did not match the first predicate
            Predicate { _, _ -> true }
        )

        branches[1]
            .mapValues { key, _ -> Report(key, ReportStatus.failure, "$reason for record $key.").toJson() }
            .to(reportTopic)

        return branches[0]
    }

    /**
     * Parses the joined message body as a flat JSON object.
     *
     * @param input the raw message (left) together with its mapping configuration (right).
     * @return a single-element list with the parsed fields and the configuration, or an empty list
     * when the body is empty or not valid JSON. The failure is logged and the record is dropped
     * from the stream.
     */
    private fun parseJsonFile(input: JoinedValues<String, MapperConfiguration>):
            List<Pair<Map<String, String>, MapperConfiguration>> {
        return try {
            val fields = Klaxon().parse<Map<String, String>>(input.left)
            if (fields == null) {
                // TODO: REPORT EMPTY JSON
                log.warn("Parsing the input message produced no JSON object; the record is dropped.")
                emptyList()
            } else {
                listOf(Pair(fields, input.right))
            }
        } catch (ex: KlaxonException) {
            // TODO: REPORT JSON PARSE EXCEPTIONS
            log.error("Could not parse the input message as JSON; the record is dropped.", ex)
            emptyList()
        }
    }

    /**
     * Serializes the mapped record and re-keys the message with the first element of the
     * serialization result, which is also used as the record id in the success [Report].
     */
    private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> {
        val result = builder.writeRecord()
        val recordId = result.first
        val report = Report(recordId, ReportStatus.success, "Successfully mapped record with id $recordId.")
        return KeyValue(recordId, Pair(result.second, report))
    }

    /** Parses the raw bytes of a mapping configuration. */
    private fun parseConfig(data: ByteArray): MapperConfiguration {
        return MappingConfigurationParser(data).get()
    }

    private companion object {
        /** Topic on which the import process publishes the mapping configurations. */
        const val CONFIG_TOPIC = "import-process-config"
    }
}