/*
* Copyright (C) 2019 Memobase
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import ch.memobase.rdf.NS
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import com.beust.klaxon.Klaxon
import java.io.File
import java.io.FileOutputStream
import java.nio.charset.Charset
import java.util.stream.Stream
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.params.IntegrationTestParams
/**
 * Integration tests that pipe fixture records through the topology built by [KafkaTopology]
 * using a [TopologyTestDriver] and compare the emitted report and N-Triples output with the
 * expected fixtures stored below `src/test/resources/kafkaTests/<count>/`.
 *
 * The class uses the `PER_CLASS` lifecycle so that the non-static [kafkaTests] factory can be
 * referenced from `@MethodSource`.
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class IntegrationTests {
    private val klaxon = Klaxon()
    private val resourcePath = "src/test/resources"

    /** Matches blank node labels (e.g. `_:B1a2b3c`), which differ between runs. */
    private val blankNodeRegex = Regex("(_:B[A-Za-z0-9]+)")

    /** Matches `yyyy-MM-ddTHH:mm:ss` timestamps, which differ between runs. */
    private val timestampRegex = Regex("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}")

    /**
     * Reads a fixture relative to [resourcePath].
     *
     * The file is decoded as UTF-8 rather than with the platform default charset, matching the
     * UTF-8 defaults of `StringSerializer` and `String.byteInputStream()` used further down.
     * NOTE(review): assumes the fixture files are stored as UTF-8 - confirm.
     *
     * @param fileName path of the fixture below the test resource directory.
     * @return the complete file content.
     */
    private fun readFile(fileName: String): String =
        File("$resourcePath/$fileName").readText(Charsets.UTF_8)

    /**
     * Normalises and sorts N-Triples lines so two serialisations can be compared as text.
     *
     * Every blank node label is collapsed to `_:B` and every timestamp is replaced by the fixed
     * value `2020-10-10T09:10:22`; the lines are then sorted and joined with newlines.
     *
     * @param source the individual lines to normalise; may be empty.
     * @return the normalised, sorted and trimmed text (an empty string for empty input).
     */
    private fun sort(source: List<String>): String =
        source
            .map { line ->
                // Regex.replace substitutes each match exactly once. Replacing matched values
                // one after another via String.replace breaks when one label is a prefix of
                // another label on the same line.
                val withoutBlankNodeIds = blankNodeRegex.replace(line, "_:B")
                timestampRegex.replace(withoutBlankNodeIds, "2020-10-10T09:10:22")
            }
            .sorted()
            .joinToString("\n")
            .trim()

    /**
     * Runs one fixture through the topology and verifies the produced report and, unless the
     * report is fatal, the key and the normalised N-Triples content of the output record.
     *
     * As a side effect the output model is written as pretty-printed Turtle to
     * `turtle-output.ttl` in the fixture directory.
     *
     * @param params fixture number, input key, expected output key and expected report.
     */
    @ParameterizedTest
    @MethodSource("kafkaTests")
    fun `test kafka topology`(params: IntegrationTestParams) {
        val service = Service("kafkaTest${params.count}.yml")
        val output = readFile("kafkaTests/${params.count}/output.nt").lines()
        val settings = service.settings

        // Headers attached to the input record.
        val headers = RecordHeaders().apply {
            add(RecordHeader("sessionId", "1".toByteArray()))
            add(RecordHeader("recordSetId", "ati-002".toByteArray()))
            add(RecordHeader("institutionId", "ati".toByteArray()))
            add(RecordHeader("isPublished", "false".toByteArray()))
            add(RecordHeader("xmlRecordTag", "record".toByteArray()))
            add(RecordHeader("xmlIdentifierFieldName", "identifierMain".toByteArray()))
            add(RecordHeader("tableSheetIndex", "1".toByteArray()))
            add(RecordHeader("tableHeaderCount", "1".toByteArray()))
            add(RecordHeader("tableHeaderIndex", "1".toByteArray()))
            add(RecordHeader("tableIdentifierIndex", "1".toByteArray()))
        }
        val factory = ConsumerRecordFactory(StringSerializer(), StringSerializer())

        // The driver is Closeable; `use` releases it even when an assertion fails.
        TopologyTestDriver(
            KafkaTopology(settings).prepare().build(),
            settings.kafkaStreamsSettings
        ).use { testDriver ->
            // The mapping configuration is piped in before the record itself.
            testDriver.pipeInput(
                factory.create(
                    "mb-di-config-test",
                    "ati-002#mapping",
                    readFile("kafkaTests/${params.count}/config/mapping.yml")
                )
            )
            testDriver.pipeInput(
                factory.create(
                    settings.inputTopic,
                    params.key,
                    readFile("kafkaTests/${params.count}/input.json"),
                    headers
                )
            )

            // readOutput yields null when nothing was produced; fail with a clear message.
            val reportRecord = checkNotNull(
                testDriver.readOutput(
                    settings.processReportTopic,
                    StringDeserializer(),
                    StringDeserializer()
                )
            ) { "No report was written to topic '${settings.processReportTopic}'." }
            val reportString = reportRecord.value()
            val report = checkNotNull(klaxon.parse<Report>(reportString)) {
                "Could not parse report: $reportString"
            }

            // For a fatal report only the report itself is checked; the output topic is not read.
            if (report.status == ReportStatus.fatal) {
                assertThat(report)
                    .isEqualTo(params.report)
                return
            }

            val record = checkNotNull(
                testDriver.readOutput(
                    settings.outputTopic,
                    StringDeserializer(),
                    StringDeserializer()
                )
            ) { "No record was written to topic '${settings.outputTopic}'." }

            // Re-serialise the result as Turtle next to the fixture.
            val model = ModelFactory.createDefaultModel()
            NS.prefixMapping.forEach { model.setNsPrefix(it.key, it.value) }
            RDFDataMgr.read(model, record.value().byteInputStream(), Lang.NTRIPLES)
            FileOutputStream("$resourcePath/kafkaTests/${params.count}/turtle-output.ttl").use { turtleOut ->
                RDFDataMgr.write(turtleOut, model, RDFFormat.TURTLE_PRETTY)
            }

            val key = record.key()
            val sortedResult = sort(record.value().lines())
            assertAll(
                "kafka test ${params.count}",
                { assertThat(key).isEqualTo(params.expectedKey) },
                { assertThat(sortedResult).isEqualTo(sort(output)) },
                { assertThat(report).isEqualTo(params.report) }
            )
        }
    }

    /**
     * Supplies the parameter sets for the topology test. Only case 1 is active.
     */
    private fun kafkaTests() = Stream.of(
        IntegrationTestParams(
            1,
            "MEI_49884",
            "https://memobase.ch/record/test-record-set-id-MEI_49884",
            Report(
                "MEI_49884",
                "FATAL",
                "No Record Id Found: Found multiple values in the field 'identifierOriginal' for identifiers: 22861, 22861, 22861.",
                Service.step
            )
        )
        // Cases 2-9 below are currently disabled.
        /*,
        IntegrationTestParams(
            2,
            "Sig Han 1293",
            "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
            Report(
                "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-Sig_Han_1293."
            )
        ),
        IntegrationTestParams(
            3,
            "Sig Han 1293",
            "ttps://memobase.ch/record/TEST_RECORD_SET-Sig_Han_1293",
            Report(
                "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-Sig_Han_1293."
            )
        ),
        IntegrationTestParams(
            4,
            "InputKey",
            "https://memobase.ch/record/test-record-set-id-sigantur-example",
            Report(
                "https://memobase.ch/record/test-record-set-id-sigantur-example",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-sigantur-example."
            )
        ),
        IntegrationTestParams(
            5,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "",
                Service.step
            )
        ),
        IntegrationTestParams(
            6,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            7,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            8,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            9,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        )*/
    )
}