KafkaTopology.kt 5.28 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * mapper-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

Jonas Waeber's avatar
Jonas Waeber committed
21
22
import com.beust.klaxon.Klaxon
import org.apache.kafka.streams.KeyValue
Jonas Waeber's avatar
Jonas Waeber committed
23
24
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
25
26
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
Jonas Waeber's avatar
Jonas Waeber committed
27
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
28
29
import org.memobase.builder.ResourceBuilder
import org.memobase.mapping.MappingConfig
Jonas Waeber's avatar
Jonas Waeber committed
30
31
import org.memobase.settings.SettingsLoader

32
33
class KafkaTopology(
    private val settings: SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
34
35
) {
    // Log4j logger registered under the name "KafkaTopology"; not referenced by any other member shown in this file.
    private val log = LogManager.getLogger("KafkaTopology")
Jonas Waeber's avatar
Jonas Waeber committed
36
    // Mapping configuration built from the "configs" application property.
    private val config = MappingConfig(settings.appSettings.getProperty("configs"))
37
    // Topic receiving the JSON reports: the output topic name followed by "-reporting".
    private val reportTopic = "${settings.outputTopic}-reporting"
Jonas Waeber's avatar
Jonas Waeber committed
38
39
40
41

    /**
     * Assembles the stream topology of the mapper.
     *
     * Messages are read from the configured input topic, parsed with [parseJsonFile],
     * wrapped by [buildResources] and passed through [extractRecordId] and
     * [extractRecordTypeValue], which divert entries lacking a record id or record type.
     * The remaining entries have their record, physical object and digital object
     * generated and are then handed to [objectOutput], once per kind of object.
     *
     * @return the topology produced by the underlying [StreamsBuilder].
     */
    fun build(): Topology {
        val streamsBuilder = StreamsBuilder()

        val source = streamsBuilder.stream<String, String>(settings.inputTopic)

        // Parse the raw message, wrap it in a builder and pull out the record id.
        val withExtractedId = source
            .flatMapValues { message -> parseJsonFile(message) }
            .mapValues { fields -> buildResources(fields) }
            .mapValues { resource -> resource.extractRecordId() }

        // Each helper diverts the entries it rejects and returns the rest.
        val withExtractedType = extractRecordId(withExtractedId)
        val validated = extractRecordTypeValue(withExtractedType)

        val mapped = validated
            .mapValues { resource -> resource.generateRecord() }
            .mapValues { resource -> resource.generatePhysicalObject() }
            .mapValues { resource -> resource.generateDigitalObject() }
            .mapValues { resource -> resource.addDerivedConnection() }

        // A record is written for every entry; the two object kinds only when present.
        val records = mapped.mapValues { resource -> resource.writeRecord() }

        val physicalObjects = mapped
            .filter { _, resource -> resource.hasPhysicalObject() }
            .mapValues { resource -> resource.writePhysicalObject() }

        val digitalObjects = mapped
            .filter { _, resource -> resource.hasDigitalObject() }
            .mapValues { resource -> resource.writeDigitalObject() }

        objectOutput(records)
        objectOutput(physicalObjects)
        objectOutput(digitalObjects)

        return streamsBuilder.build()
    }
Jonas Waeber's avatar
Jonas Waeber committed
74

75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
    /**
     * Attaches the two sinks for a stream of written objects.
     *
     * The first element of each pair (a key/value message) is re-emitted as the
     * stream entry and sent to the output topic. The second element is converted
     * with `toJson()` and sent to the reporting topic under the key of that message.
     *
     * @param stream pairs of an outgoing key/value message and its report.
     */
    private fun objectOutput(stream: KStream<String, Pair<KeyValue<String, String>, Report>>) {
        val messages = stream.map { _, written -> written.first }
        messages.to(settings.outputTopic)

        val reports = stream.map { _, written ->
            val (message, report) = written
            KeyValue(message.key, report.toJson())
        }
        reports.to(reportTopic)
    }

    /**
     * Creates the [ResourceBuilder] for one parsed message.
     *
     * The builder receives the URI field, record type, the three field-mapper
     * collections and the sponsoring flag from the mapping configuration, the parsed
     * message itself, and the "institutionId" and "recordSetId" application properties.
     *
     * @param value the key/value content of the parsed message.
     * @return a builder initialised for this message.
     */
    private fun buildResources(value: Map<String, String>): ResourceBuilder = ResourceBuilder(
        config.uriField,
        config.recordType,
        config.recordFieldMappers,
        config.physicalObjectFieldMappers,
        config.digitalObjectFieldMappers,
        value,
        settings.appSettings.getProperty("institutionId"),
        settings.appSettings.getProperty("recordSetId"),
        config.sponsoredByMemoriav
    )

    /**
     * Splits the stream on whether a record id is present.
     *
     * Entries without a record id end here: a fixed error text is sent to the output
     * topic and the JSON form of the builder's failure report to the reporting topic.
     *
     * @param stream builders on which the record id extraction has already run.
     * @return the entries that have a record id, after `extractRecordTypeValue()` was applied to them.
     */
    private fun extractRecordId(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        // The second predicate accepts everything, so it collects what the first one rejected.
        val (withRecordId, withoutRecordId) = stream.branch(
            Predicate { _, resource -> resource.hasRecordId() },
            Predicate { _, _ -> true }
        )

        withoutRecordId
            .mapValues { _ -> "ERROR: No record id specified!" }
            .to(settings.outputTopic)

        withoutRecordId
            .mapValues { key, resource -> resource.failureReport(key).toJson() }
            .to(reportTopic)

        return withRecordId.mapValues { resource -> resource.extractRecordTypeValue() }
    }

    /**
     * Splits the stream on whether a record type is present.
     *
     * Entries without a record type end here: a fixed error text is sent to the output
     * topic and the JSON form of the builder's failure report to the reporting topic.
     *
     * @param stream builders on which the record type extraction has already run.
     * @return the entries that have a record type, unchanged.
     */
    private fun extractRecordTypeValue(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
        // The second predicate accepts everything, so it collects what the first one rejected.
        val (withRecordType, withoutRecordType) = stream.branch(
            Predicate { _, resource -> resource.hasRecordType() },
            Predicate { _, _ -> true }
        )

        withoutRecordType
            .mapValues { _ -> "ERROR: No record type value specified!" }
            .to(settings.outputTopic)

        withoutRecordType
            .mapValues { key, resource -> resource.failureReport(key).toJson() }
            .to(reportTopic)

        return withRecordType
    }
Jonas Waeber's avatar
Jonas Waeber committed
138

Jonas Waeber's avatar
Jonas Waeber committed
139
140
141
142
143
144
145
146
147
    /**
     * Parses a message with Klaxon into a map of string fields.
     *
     * Nothing is caught here, so any exception raised while parsing reaches the caller.
     *
     * @param message the raw message value.
     * @return a single-element list holding the parsed map, or an empty list when the parser returns `null`.
     */
    private fun parseJsonFile(message: String): List<Map<String, String>> =
        listOfNotNull(Klaxon().parse<Map<String, String>>(message))
Jonas Waeber's avatar
Jonas Waeber committed
148
149


Jonas Waeber's avatar
Jonas Waeber committed
150
}