Service.kt 2.78 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/*
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
package org.memobase

import ch.memobase.settings.SettingsLoader
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.logging.log4j.LogManager

/**
 * Consumes report messages from the configured input topic and hands the
 * whitelisted ones to a [ReportCollector].
 *
 * Settings are read through [SettingsLoader] from [file]; the application
 * settings named by [importApiUrlSettingName] and [whitelistFilePathSettingName]
 * are requested, together with the Kafka consumer and producer configuration.
 *
 * @param file name of the settings file handed to [SettingsLoader]; defaults to `app.yml`.
 */
class Service(file: String = "app.yml") {
    companion object {
        /** Key of the application setting holding the import API URL. */
        const val importApiUrlSettingName = "importApiUrl"

        /** Key of the application setting holding the path of the step whitelist file. */
        const val whitelistFilePathSettingName = "whitelistFilePath"
    }

    private val log = LogManager.getLogger("GroupReportsConsumerService")

    private val settings = SettingsLoader(
        listOf(
            importApiUrlSettingName,
            whitelistFilePathSettingName
        ),
        file,
        useConsumerConfig = true,
        useProducerConfig = true
    )

    private val klaxon = Klaxon()
    private val consumer = Consumer(settings.inputTopic, settings.kafkaConsumerSettings)

    // NOTE(review): not referenced anywhere in this class; kept because constructing it
    // may have side effects that other code relies on — confirm before removing.
    private val producer = Producer(settings.outputTopic, settings.kafkaProducerSettings)
    private val importApiUrl = settings.appSettings[importApiUrlSettingName] as String
    private val stepWhitelist = StepWhitelist(settings.appSettings[whitelistFilePathSettingName] as String)

    /** Collector that receives every accepted report; created with the configured import API URL. */
    val collector = ReportCollector(importApiUrl)

    /**
     * Polls the consumer in an endless loop. This function does not return normally.
     *
     * For each batch, every record value is parsed as an [IndexReport]; records that
     * cannot be parsed are logged and skipped so that one bad message does not stop
     * the service. Reports whose step passes the whitelist check are logged and added
     * to [collector]: under the record set id with [Type.record_set] when the
     * institution id is the literal `"none"`, otherwise under the institution id with
     * [Type.institution]. After each batch `collector.check()` is invoked.
     */
    fun run() {
        while (true) {
            consumer.consume()
                .mapNotNull { record ->
                    try {
                        klaxon.parse<IndexReport>(record.value())
                    } catch (ex: KlaxonException) {
                        // A malformed message must not terminate the polling loop.
                        log.error("Skipping message that could not be parsed as a report.", ex)
                        null
                    }
                }
                .filter { message -> stepWhitelist.check(message.report.step) }
                .forEach { message ->
                    log.info("Processing report: ${message.report.id}; ${message.report.step}")
                    val metadata = message.metadata
                    if (metadata.institutionId == "none") {
                        // No institution attached: group the report by record set.
                        collector.addReport(
                            metadata.recordSetId,
                            Type.record_set,
                            metadata.sessionId,
                            message.report
                        )
                    } else {
                        collector.addReport(
                            metadata.institutionId,
                            Type.institution,
                            metadata.sessionId,
                            message.report
                        )
                    }
                }
            collector.check()
        }
    }
}