/*
 * Copyright (C) 2020-2021 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
package ch.memobase

import ch.memobase.reporting.Report
import ch.memobase.settings.HeaderExtractionTransformSupplier
import ch.memobase.settings.SettingsLoader
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.logging.log4j.LogManager
import java.io.StringReader

/**
 * Assembles the Kafka Streams topology of this service.
 *
 * Records are read from `settings.inputTopic`, passed through
 * [HeaderExtractionTransformSupplier], parsed as [Report] JSON, wrapped in an
 * [IndexReport], re-keyed and written as JSON to `settings.outputTopic`.
 *
 * NOTE(review): the explicit type arguments (`<String, String>` on `stream`,
 * `<String>` on the transform supplier) are inferred from how the values are
 * used below — confirm them against the declarations of those types.
 *
 * @param settings source of the input and output topic names.
 */
class KafkaTopology(
    private val settings: SettingsLoader
) {
    private val klaxon = Klaxon()
    private val log = LogManager.getLogger(this::class.java)

    /**
     * Builds the topology and returns the [StreamsBuilder] that holds it.
     *
     * A record whose value cannot be parsed into a [Report] is logged and dropped;
     * it never reaches the output topic and does not stop the stream.
     *
     * @return the builder with the complete topology registered on it.
     */
    fun prepare(): StreamsBuilder {
        val builder = StreamsBuilder()
        builder.stream<String, String>(settings.inputTopic)
            .transformValues(HeaderExtractionTransformSupplier<String>())
            .flatMapValues { value ->
                // value.first is the raw JSON payload; value.second is whatever the header
                // transform attached (presumably the extracted header metadata — confirm).
                try {
                    // parse() yields null when there is nothing to build; listOfNotNull turns
                    // that into an empty result, i.e. the record is dropped.
                    listOfNotNull(
                        klaxon.parse<Report>(StringReader(value.first))
                            ?.let { report -> IndexReport(report, value.second) }
                    )
                } catch (ex: ClassCastException) {
                    dropUnparsable(ex, value.first)
                } catch (ex: KlaxonException) {
                    dropUnparsable(ex, value.first)
                } catch (ex: NullPointerException) {
                    dropUnparsable(ex, value.first)
                }
            }
            .map { key, value ->
                // The new key combines the incoming key with the report step and the session id.
                KeyValue(
                    "$key-${value.report.step}-${value.metadata.sessionId}",
                    klaxon.toJsonString(value)
                )
            }
            .to(settings.outputTopic)
        return builder
    }

    /**
     * Logs a payload that could not be turned into an [IndexReport] and returns an
     * empty list so that the record is dropped from the stream.
     *
     * The exception itself is handed to the logger so the stack trace is kept; the
     * message alone can be `null` (e.g. for a [NullPointerException]).
     */
    private fun dropUnparsable(ex: Exception, payload: String): List<IndexReport> {
        log.error("${ex.localizedMessage} in $payload", ex)
        return emptyList()
    }
}