Commit 4a4252b9 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Fix error handling and add a test.

parent f0aafd7b
Pipeline #16105 passed with stages
in 5 minutes and 2 seconds
......@@ -82,8 +82,9 @@ class KafkaTopology(private val settings: SettingsLoader) {
key,
ReportStatus.failure,
"Could not find a matching xslt configuration for record set ${value.second.recordSetId}."
)
).toJson()
}
.to(reportingTopic)
val saxHandlerStream = configNullBranch[1]
.mapValues { value ->
......
......@@ -21,13 +21,14 @@ package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.params.ParameterizedTest
......@@ -44,6 +45,7 @@ import java.util.stream.Stream
class TestIntegration {
private val log = LogManager.getLogger("TestLogger")
private val klaxon = Klaxon()
private val resourcePath = "src/test/resources/data"
private fun readFile(fileName: String): String {
return File("$resourcePath/$fileName").readText(Charset.defaultCharset())
......@@ -143,4 +145,56 @@ class TestIntegration {
)
)
@Test
@Disabled
fun `test missing or invalid xslt configuration`() {
val headers = RecordHeaders()
headers.add(RecordHeader("sessionId", "test-session-id".toByteArray()))
headers.add(RecordHeader("recordSetId", "test-record-set-id".toByteArray()))
headers.add(RecordHeader("institutionId", "test-institution-id".toByteArray()))
headers.add(RecordHeader("isPublished", "false".toByteArray()))
headers.add(RecordHeader("xmlRecordTag", "record".toByteArray()))
headers.add(RecordHeader("xmlIdentifierFieldName", "identifierMain".toByteArray()))
headers.add(RecordHeader("tableSheetIndex", "1".toByteArray()))
headers.add(RecordHeader("tableHeaderCount", "1".toByteArray()))
headers.add(RecordHeader("tableHeaderIndex", "1".toByteArray()))
headers.add(RecordHeader("tableIdentifierIndex", "1".toByteArray()))
val service = Service("test1.yml")
val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
testDriver.pipeInput(
factory.create(
service.settings.inputTopic, "", readFile("1/input.json"),
headers
)
)
val record = testDriver.readOutput(
service.settings.outputTopic,
StringDeserializer(),
StringDeserializer()
)
val reportedRecord = testDriver.readOutput(
service.settings.processReportTopic,
StringDeserializer(),
StringDeserializer()
)
val report = reportedRecord.value()
assertAll("",
{ assertThat(record).isNull() },
{
assertThat(klaxon.parse<Report>(report)).isEqualTo(
Report(
"",
ReportStatus.failure,
"Could not find a matching xslt configuration for record set test-record-set-id."
)
)
}
)
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment