package org.memobase import ch.memobase.reporting.Report import ch.memobase.testing.EmbeddedSftpServer import com.beust.klaxon.Klaxon import org.apache.kafka.common.header.internals.RecordHeader import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.streams.TestOutputTopic import org.apache.kafka.streams.TopologyTestDriver import org.apache.kafka.streams.test.TestRecord import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.assertAll import java.io.File import java.nio.charset.Charset @TestInstance(TestInstance.Lifecycle.PER_CLASS) class TestPipeline { private val resourcePath = "src/test/resources/data" private fun readFile(fileName: String, count: Int): String { return File("$resourcePath/$count/$fileName").readText(Charset.defaultCharset()) } private val klaxon = Klaxon() private val regex = Regex("(_:B[A-Za-z0-9]+)") private val regexTime = Regex("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}") private fun sort(source: String): String { return source.lines().map { var replacedString = it for (matchResult in regex.findAll(it)) { replacedString = replacedString.replace(matchResult.groups[0]?.value.orEmpty(), "_:B") } for (matchResult in regexTime.findAll(it)) { replacedString = replacedString.replace(matchResult.groups[0]?.value.orEmpty(), "2020-10-10T09:10:22") } replacedString }.sorted().reduce { acc, s -> acc + "\n" + s }.trim() } private fun setupTest( count: Int, inputKey: String ): Pair, TestOutputTopic> { val service = Service("data/$count/app.yml") val testDriver = TopologyTestDriver(KafkaTopology(service.settings).prepare().build(), service.settings.kafkaStreamsSettings) val inputValue = readFile("input.json", count) val headers = RecordHeaders() headers.add(RecordHeader("sessionId", "test-session-id".toByteArray())) headers.add(RecordHeader("recordSetId", "rst-001".toByteArray())) headers.add(RecordHeader("institutionId", "ins".toByteArray())) val inputTopic = testDriver.createInputTopic(service.settings.inputTopic, StringSerializer(), StringSerializer()) val inputRecord = TestRecord(inputKey, inputValue, headers) inputTopic.pipeInput(inputRecord) val configTopic = testDriver.createInputTopic( service.settings.appSettings.getProperty("configTopic"), StringSerializer(), StringSerializer() ) configTopic.pipeInput("rst-001#mapping", readFile("mapping.yml", count)) val outputTopic = testDriver.createOutputTopic(service.settings.outputTopic, StringDeserializer(), StringDeserializer()) val outputReportTopic = testDriver.createOutputTopic( service.settings.processReportTopic, StringDeserializer(), StringDeserializer() ) return Pair(outputTopic, outputReportTopic) } private fun parseReport(data: String): Report { return klaxon.parse(data)!! } @Test fun `test 1 minimal configuration`() { val topics = setupTest(1, "test-1") while (topics.first.isEmpty) { } assertThat(topics.second.isEmpty) .withFailMessage("Reports topic is empty.") .isFalse() val record = topics.first.readRecord() val recordValue = record.value val recordKey = record.key assertAll( { assertThat(sort(recordValue)) .isEqualTo(sort(readFile("output.nt", 1))) }, { assertThat(recordKey) .isEqualTo("https://memobase.ch/record/rst-001-1") } ) } @Test fun `test 2 minimal configuration with digital and physical object`() { val topics = setupTest(2, "test-2") while (topics.first.isEmpty) { } assertThat(topics.second.isEmpty) .withFailMessage("Reports topic is empty.") .isFalse() val record = topics.first.readRecord() val recordValue = record.value val recordKey = record.key assertAll( { assertThat(sort(recordValue)) .isEqualTo(sort(readFile("output.nt", 2))) }, { assertThat(recordKey) .isEqualTo("https://memobase.ch/record/rst-001-1") } ) } @Test fun `test 3 test list input`() { val topics = setupTest(3, "test-3") while (topics.first.isEmpty) { } assertThat(topics.second.isEmpty) .withFailMessage("Reports topic is empty.") .isFalse() val report = topics.second.readRecord() val reportKey = report.key val reportValue = report.value assertAll( "Assert reports.", { assertThat(reportKey) .isEqualTo("https://memobase.ch/record/rst-001-22861") }, { assertThat(parseReport(reportValue)) .isEqualTo(Report("https://memobase.ch/record/rst-001-22861", "SUCCESS", "Generated RDF document.", "test")) } ) assertThat(topics.first.isEmpty) .withFailMessage("Record Topic is empty") .isFalse() val record = topics.first.readRecord() val recordValue = record.value val recordKey = record.key assertAll( { assertThat(sort(recordValue)) .isEqualTo(sort(readFile("output.nt", 3))) }, { assertThat(recordKey) .isEqualTo("https://memobase.ch/record/rst-001-22861") } ) } @Test fun `test 4 if proxy field in mapping works`() { val topics = setupTest(4, "test-4") while (topics.first.isEmpty) { } assertThat(topics.second.isEmpty) .withFailMessage("Reports topic is empty.") .isFalse() val report = topics.second.readRecord() val reportKey = report.key val reportValue = report.value val record = topics.first.readRecord() val recordValue = record.value val recordKey = record.key assertAll( { assertThat(sort(recordValue)) .isEqualTo(sort(readFile("output.nt", 4))) }, { assertThat(recordKey) .isEqualTo("https://memobase.ch/record/rst-001-1") } ) } }