KafkaTopology.kt 2.41 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
/*
 * Table Data Import Service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import com.beust.klaxon.Klaxon
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
24
import org.apache.kafka.streams.kstream.KStream
25
import org.memobase.models.*
Jonas Waeber's avatar
Jonas Waeber committed
26
import org.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
27

Jonas Waeber's avatar
Jonas Waeber committed
28
/**
 * Assembles the Kafka Streams topology of the table data import service.
 *
 * Messages are consumed from `settings.inputTopic`, deserialized into [Message],
 * restricted to the accepted table formats and handed to [TableParser]. The parser
 * results are then fanned out to the output, reporting and process-report topics.
 *
 * @param settings source of the topic names; also passed on to [TableParser].
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val parser = TableParser(settings)

    // Per-record reports go to a topic derived from the output topic name.
    private val reportingTopic = settings.outputTopic + "-reporting"

    private val klaxon = Klaxon()
    private val acceptedFormats = listOf(Formats.csv, Formats.xlsx, Formats.tsv, Formats.xls, Formats.ods)

    /**
     * Builds the complete topology: source, parsing steps and all sinks.
     *
     * @return the [StreamsBuilder] holding the assembled topology.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()

        val parsedTable = builder
            .stream<String, String>(settings.inputTopic)
            .mapValues { value -> klaxon.parse<Message>(value) }
            .filter { _, value -> acceptedFormats.contains(value?.format) }
            // `!!` is safe here: a null message has a null format, which the
            // filter above rejects, so only non-null messages reach this step.
            .mapValues { key, value -> parser.parseTable(key, value!!) }

        // Attach the sinks. Without this call the parsed stream is never consumed
        // and nothing is written to any of the outgoing topics.
        processValidInput(parsedTable)

        return builder
    }

    /**
     * Attaches the three sinks to the stream of parser results.
     *
     * - every message of a result is re-keyed with its own key, dropped when its
     *   value is empty, serialized and written to `settings.outputTopic`;
     * - the report of every message is written to the reporting topic, keyed by
     *   the report id;
     * - the process report of every result is written to
     *   `settings.processReportTopic`, keyed by the process report id.
     *
     * @param stream parser results keyed by the key of the incoming message.
     */
    private fun processValidInput(stream: KStream<String, ParserResult>) {
        val records = stream
            .flatMapValues { _, value -> value.messages }

        records
            .map { _, value -> KeyValue(value.key, value.value) }
            .filter { _, value -> value.isNotEmpty() }
            .mapValues { value -> value.toJsonString() }
            .to(settings.outputTopic)

        records
            .map { _, value -> KeyValue(value.report.id, value.report) }
            .mapValues { value -> value.toJson() }
            .to(reportingTopic)

        stream
            .map { _, value -> KeyValue(value.processReport.id, value.processReport.toJson()) }
            .to(settings.processReportTopic)
    }
}