/*
 * Copyright (C) 2019  Memobase
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
package org.memobase

import ch.memobase.rdf.NS
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import com.beust.klaxon.Klaxon
import java.io.File
import java.io.FileOutputStream
import java.nio.charset.Charset
import java.util.stream.Stream
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.params.IntegrationTestParams

/**
 * Integration tests that run the topology built by [KafkaTopology] inside a
 * [TopologyTestDriver] and compare the emitted report and N-Triples record with
 * fixture files stored under `src/test/resources/kafkaTests/<count>/`.
 *
 * The per-class lifecycle lets [kafkaTests] be a non-static `@MethodSource` factory.
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class IntegrationTests {
    private val klaxon = Klaxon()
    private val resourcePath = "src/test/resources"

    /**
     * Reads a fixture file, addressed relative to [resourcePath], into a string.
     *
     * NOTE(review): decoding uses the platform default charset; if the fixtures are
     * UTF-8 this is platform dependent - confirm before changing.
     */
    private fun readFile(fileName: String): String {
        return File("$resourcePath/$fileName").readText(Charset.defaultCharset())
    }

    // Matches blank node labels of the form `_:B<alphanumerics>`.
    private val regex = Regex("(_:B[A-Za-z0-9]+)")

    // Matches timestamps of the form `yyyy-MM-ddTHH:mm:ss`.
    private val regexTime = Regex("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}")

    /**
     * Normalises and sorts lines so that two serialisations can be compared as text.
     *
     * Every blank node label is collapsed to `_:B` and every timestamp is replaced by a
     * fixed value, then the lines are sorted and joined with `\n`.
     *
     * @param source the lines to normalise; may be empty.
     * @return the sorted, newline-joined and trimmed text (empty string for empty input).
     */
    private fun sort(source: List<String>): String =
        source
            .map { line ->
                // Replace per match rather than per matched literal, so that a label
                // which is a prefix of another label cannot leave a partial suffix behind.
                val withoutBlankNodeIds = regex.replace(line, "_:B")
                regexTime.replace(withoutBlankNodeIds, "2020-10-10T09:10:22")
            }
            .sorted()
            // joinToString, unlike reduce, does not throw on an empty list.
            .joinToString("\n")
            .trim()

    /**
     * Pipes the mapping configuration and one input record of the test case through the
     * topology and verifies the resulting report and, unless the report is fatal, the
     * output record key and its N-Triples body.
     *
     * Side effect: the output model is also written as Turtle to
     * `kafkaTests/<count>/turtle-output.ttl` inside the test resources directory.
     *
     * @param params the test case: fixture directory number, input key, expected output
     * key and expected report.
     */
    @ParameterizedTest
    @MethodSource("kafkaTests")
    fun `test kafka topology`(params: IntegrationTestParams) {
        val service = Service("kafkaTest${params.count}.yml")
        val output = readFile("kafkaTests/${params.count}/output.nt").lines()
        val settings = service.settings

        val headers = RecordHeaders()
        listOf(
            "sessionId" to "1",
            "recordSetId" to "ati-002",
            "institutionId" to "ati",
            "isPublished" to "false",
            "xmlRecordTag" to "record",
            "xmlIdentifierFieldName" to "identifierMain",
            "tableSheetIndex" to "1",
            "tableHeaderCount" to "1",
            "tableHeaderIndex" to "1",
            "tableIdentifierIndex" to "1"
        ).forEach { (name, value) -> headers.add(RecordHeader(name, value.toByteArray())) }

        val factory = ConsumerRecordFactory(StringSerializer(), StringSerializer())

        // The driver must be closed after each run to release its resources; `use`
        // guarantees that on the early return and on assertion failures as well.
        TopologyTestDriver(
            KafkaTopology(settings).prepare().build(),
            settings.kafkaStreamsSettings
        ).use { testDriver ->
            // The mapping configuration is sent first, followed by the record itself.
            testDriver.pipeInput(
                factory.create(
                    "mb-di-config-test",
                    "ati-002#mapping",
                    readFile("kafkaTests/${params.count}/config/mapping.yml")
                )
            )
            testDriver.pipeInput(
                factory.create(
                    settings.inputTopic,
                    params.key,
                    readFile("kafkaTests/${params.count}/input.json"),
                    headers
                )
            )

            val reportRecord = checkNotNull(
                testDriver.readOutput(
                    service.settings.processReportTopic,
                    StringDeserializer(),
                    StringDeserializer()
                )
            ) { "No report was written to topic ${service.settings.processReportTopic}." }
            val reportString = reportRecord.value()
            val report = checkNotNull(klaxon.parse<Report>(reportString)) {
                "Could not parse report: $reportString"
            }

            // For a fatal report only the report itself is compared.
            if (report.status == ReportStatus.fatal) {
                assertThat(report)
                    .isEqualTo(params.report)
                return
            }

            val record = checkNotNull(
                testDriver.readOutput(
                    settings.outputTopic,
                    StringDeserializer(),
                    StringDeserializer()
                )
            ) { "No record was written to topic ${settings.outputTopic}." }

            // Dump a prefixed Turtle rendering of the result next to the fixtures.
            val model = ModelFactory.createDefaultModel()
            NS.prefixMapping.forEach { model.setNsPrefix(it.key, it.value) }
            RDFDataMgr.read(model, record.value().byteInputStream(), Lang.NTRIPLES)
            FileOutputStream("$resourcePath/kafkaTests/${params.count}/turtle-output.ttl").use { turtleFile ->
                RDFDataMgr.write(turtleFile, model, RDFFormat.TURTLE_PRETTY)
            }

            val key = record.key()
            val sortedResult = sort(record.value().lines()).trim()

            assertAll(
                "kafka test case ${params.count}",
                {
                    assertThat(key).isEqualTo(params.expectedKey)
                },
                {
                    assertThat(sortedResult)
                        .isEqualTo(sort(output).trim())
                },
                {
                    assertThat(report)
                        .isEqualTo(params.report)
                }
            )
        }
    }

    /**
     * Supplies the parameters for [test kafka topology]. Only case 1 is active; the
     * remaining cases are kept below, commented out.
     */
    // NOTE(review): the expected key of commented-out case 3 starts with "ttps://",
    // which looks like a typo for "https://" - confirm before re-enabling it.
    private fun kafkaTests() = Stream.of(
        IntegrationTestParams(
            1,
            "MEI_49884",
            "https://memobase.ch/record/test-record-set-id-MEI_49884",
            Report(
                "MEI_49884",
                "FATAL",
                "No Record Id Found: Found multiple values in the field 'identifierOriginal' for identifiers: 22861, 22861, 22861.",
                Service.step
            )
        )/*,
        IntegrationTestParams(
            2,
            "Sig Han 1293",
            "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
            Report(
                "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-Sig_Han_1293."
            )
        ),
        IntegrationTestParams(
            3,
            "Sig Han 1293",
            "ttps://memobase.ch/record/TEST_RECORD_SET-Sig_Han_1293",
            Report(
                "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-Sig_Han_1293."
            )
        ),
        IntegrationTestParams(
            4,
            "InputKey",
            "https://memobase.ch/record/test-record-set-id-sigantur-example",
            Report(
                "https://memobase.ch/record/test-record-set-id-sigantur-example",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-sigantur-example."
            )
        ),
        IntegrationTestParams(
            5,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "",
                Service.step
            )
        ),
        IntegrationTestParams(
            6,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            7,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            8,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            9,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        )*/
    )
}