/*
 * Drupal Sync Service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import java.io.StringReader
import org.apache.jena.rdf.model.Model
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager
import org.memobase.model.Input
import org.memobase.model.Institution
import org.memobase.model.RecordSet
import org.memobase.settings.SettingsLoader

/**
 * Builds the Kafka Streams topology of the sync service.
 *
 * Each message value read from the input topic is parsed as JSON into an [Input],
 * converted by the transformer, and the result is written to the output topic.
 * Messages that cannot be parsed or converted are logged and dropped.
 *
 * @param settings provides the names of the input and output topics.
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("DrupalSyncProcessor")
    private val transformer = Transform()

    /**
     * Wires the processing steps between `settings.inputTopic` and `settings.outputTopic`.
     *
     * @return the assembled [Topology].
     */
    fun build(): Topology {
        val builder = StreamsBuilder()

        // NOTE(review): the key type is never used below (the key is discarded in `map`);
        // String is assumed here -- confirm against the configured default key serde.
        val stream = builder.stream<String, String>(settings.inputTopic)

        stream
            .flatMapValues { value -> parseJson(value) }
            .flatMapValues { value -> transformJson(value) }
            .map { _, value -> transformer.write(value.first, value.second) }
            .to(settings.outputTopic)

        return builder.build()
    }

    /**
     * Parses a raw message value into an [Input].
     *
     * @param data the JSON text of a single message.
     * @return a one-element list holding the parsed input, or an empty list when the
     * parser yields `null` or throws a [KlaxonException] or [ClassCastException].
     * Failures are logged and never propagated, so a bad message is skipped.
     */
    private fun parseJson(data: String): List<Input> {
        return try {
            val parsed = Klaxon().parse<Input>(StringReader(data))
            listOfNotNull(parsed)
        } catch (ex: KlaxonException) {
            // Pass the exception itself so the stack trace is kept; the message alone may be null.
            log.error("Could not parse input message: ${ex.localizedMessage}", ex)
            emptyList()
        } catch (ex: ClassCastException) {
            log.error("Could not parse input message: ${ex.localizedMessage}", ex)
            emptyList()
        }
    }

    /**
     * Converts a parsed [Input] with the transformer method matching its concrete type.
     *
     * @param input the parsed message.
     * @return a one-element list holding the transformer result for an [Institution] or a
     * [RecordSet]; an empty list (after logging an error) for any other input type.
     */
    // NOTE(review): the first type argument of the pair is assumed to be Input -- confirm
    // against the return types of Transform.createInstitution / Transform.createRecordSet.
    private fun transformJson(input: Input): List<Pair<Input, Model>> {
        return when (input) {
            is Institution -> listOf(transformer.createInstitution(input))
            is RecordSet -> listOf(transformer.createRecordSet(input))
            else -> {
                log.error("Could not cast input to institution or record set: $input.")
                emptyList()
            }
        }
    }
}