/*
 * Drupal Sync Service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import org.apache.jena.rdf.model.Model
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager
import org.memobase.model.Input
import org.memobase.model.Institution
import org.memobase.model.RecordSet
import org.memobase.settings.SettingsLoader

/**
 * Assembles the Kafka Streams [Topology] of the Drupal sync service.
 *
 * Records are read from `settings.inputTopic`, parsed, transformed by an
 * [RdfTransformer] and the result is written to `settings.outputTopic`.
 *
 * @param settings provides the topic names and the application settings
 * handed to the [RdfTransformer].
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("DrupalSyncProcessor")
    private val transformer = RdfTransformer(settings.appSettings)

    /**
     * Builds the processing topology.
     *
     * Steps, in order:
     * 1. consume the input topic,
     * 2. parse every record value with `JSON.parseJson` (zero or more results per record),
     * 3. turn each parsed input into transformer output via [mapJson],
     * 4. re-key/serialize each result with `Util.writeModel`,
     * 5. produce to the output topic.
     *
     * @return the finished [Topology], ready to be passed to a Kafka Streams instance.
     */
    fun build(): Topology {
        val builder = StreamsBuilder()

        // NOTE(review): explicit type arguments are required for inference here;
        // String keys/values are assumed (the key is ignored below) - confirm
        // against the configured serdes.
        val stream = builder.stream<String, String>(settings.inputTopic)

        stream
            .flatMapValues { value -> JSON.parseJson(value) }
            .flatMapValues { value -> mapJson(value) }
            // The incoming key is dropped; Util.writeModel supplies the outgoing record.
            .map { _, value -> Util.writeModel(value.first, value.second) }
            .to(settings.outputTopic)

        return builder.build()
    }

    /**
     * Dispatches a parsed [Input] to the matching transformer method.
     *
     * An [Institution] or a [RecordSet] yields a single-element list holding the
     * transformer's result. Any other input is logged as an error and yields an
     * empty list, so that `flatMapValues` drops the record instead of failing.
     *
     * NOTE(review): the first component of the pair is assumed to be a `String`
     * (it is forwarded unchanged to `Util.writeModel`) - confirm against
     * [RdfTransformer].
     *
     * @param input the parsed input object.
     * @return a list with zero or one transformer result.
     */
    private fun mapJson(input: Input): List<Pair<String, Model>> =
        when (input) {
            is Institution -> listOf(transformer.createInstitution(input))
            is RecordSet -> listOf(transformer.createRecordSet(input))
            else -> {
                log.error("Could not cast input to institution or record set: $input.")
                emptyList()
            }
        }
}