Commit 7e21ebb6 authored by Jonas Waeber's avatar Jonas Waeber

Add test and fix some minor issues

parent 41d7f0b2
Pipeline #12337 passed with stages
in 6 minutes
...@@ -104,7 +104,7 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -104,7 +104,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
val report = value.getReport() val report = value.getReport()
if (report.status == ReportStatus.success) { if (report.status == ReportStatus.success) {
ProcessReport( ProcessReport(
report.id, "xml-data-transform",
ReportStatus.success, ReportStatus.success,
1, 1,
1, 1,
...@@ -112,7 +112,7 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -112,7 +112,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
) )
} else { } else {
ProcessReport( ProcessReport(
report.id, "xml-data-transform",
ReportStatus.failure, ReportStatus.failure,
1, 1,
0, 0,
......
...@@ -128,28 +128,32 @@ class SAXContentHandler(key: String, private val identifierFieldName: String, pr ...@@ -128,28 +128,32 @@ class SAXContentHandler(key: String, private val identifierFieldName: String, pr
if (recordTag == localName) if (recordTag == localName)
return return
if (currentElementTag == localName) { when {
if (currentElementTag == identifierFieldName) { currentElementTag == localName -> {
identifier = currentElementContent if (currentElementTag == identifierFieldName) {
identifier = currentElementContent
}
if (innerElements.isEmpty()) {
if (currentElementContent.isNotEmpty()) {
jsonResult[currentElementTag] = currentElementContent
}
} else {
jsonResult[currentElementTag] = innerElements.toMap()
innerElements.clear()
}
currentElementTag = ""
currentElementContent = ""
} }
if (innerElements.isEmpty()) { currentInnerElementTag == localName -> {
if (currentElementContent.isNotEmpty()) { if (currentInnerElementContent.isNotEmpty()) {
jsonResult[currentElementTag] = currentElementContent innerElements.add(Pair(currentInnerElementTag, currentInnerElementContent))
} }
} else { currentInnerElementTag = ""
jsonResult[currentElementTag] = innerElements.toMap() currentInnerElementContent = ""
innerElements.clear()
} }
currentElementTag = "" else -> {
currentElementContent = "" reportText += "Unmatched end element: $localName.\n"
} else if (currentInnerElementTag == localName) {
if (currentInnerElementContent.isNotEmpty()) {
innerElements.add(Pair(currentInnerElementTag, currentInnerElementContent))
} }
currentInnerElementTag = ""
currentInnerElementContent = ""
} else {
reportText += "Unmatched end element: $localName.\n"
} }
} }
......
...@@ -18,11 +18,107 @@ ...@@ -18,11 +18,107 @@
package org.memobase package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.testing.EmbeddedSftpServer
import java.io.File
import java.io.FileInputStream
import java.nio.charset.Charset
import java.nio.file.Paths
import java.util.stream.Stream
@TestInstance(TestInstance.Lifecycle.PER_CLASS) @TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestIntegration { class TestIntegration {
private val log = LogManager.getLogger("TestLogger")
private val resourcePath = "src/test/resources/data"
private fun readFile(fileName: String): String {
return File("$resourcePath/$fileName").readText(Charset.defaultCharset())
}
private val sftpServer = EmbeddedSftpServer(22000, "user", "password")
@ParameterizedTest
@MethodSource("integrationTestParams")
fun `test kafka integration single record`(params: TestParams) {
sftpServer.putFile(
"/memobase/test${params.count}/data.xml",
FileInputStream("$resourcePath/${params.count}/data.xml")
)
val service = Service("test${params.count}.yml")
val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
testDriver.pipeInput(
factory.create(
service.settings.inputTopic, params.inputKey, readFile("${params.count}/input.json")
)
)
val record = testDriver.readOutput(
service.settings.outputTopic,
StringDeserializer(),
StringDeserializer()
)
val reportedRecord = testDriver.readOutput(
"${service.settings.outputTopic}-reporting",
StringDeserializer(),
StringDeserializer()
)
val data = reportedRecord.value()
val report = Klaxon().parse<Report>(data)
val processReportRecord = testDriver.readOutput(
service.settings.processReportTopic,
StringDeserializer(),
StringDeserializer()
)
val processReport = Klaxon().parse<ProcessReport>(processReportRecord.value())
assertAll("check single record transform",
{
assertThat(record)
.isNotNull
.hasFieldOrPropertyWithValue("key", params.outputKey)
.hasFieldOrPropertyWithValue("value", readFile("${params.count}/output.json"))
},
{
assertThat(report)
.isNotNull
.isEqualTo(params.expectedOutputReport)
},
{
assertThat(processReport)
.isNotNull
.isEqualTo(params.expectedOutputProcessReport)
}
)
}
fun integrationTestParams() = Stream.of(
TestParams(
1,
"simple test",
"data.xml",
"ADG-102683",
Report("ADG-102683", ReportStatus.success, "Successfully transformed xml to json!"),
ProcessReport("xml-data-transform", ReportStatus.success, 1, 1, 0)
)
)
} }
\ No newline at end of file
This diff is collapsed.
{ {
"path": "/memobase/test1/data.xml", "path": "/memobase/test1/data.xml",
"format": "ERROR" "format": "XML"
} }
\ No newline at end of file
ERROR {"title":"Ausgrabung Crestaulta","SerieTitle":"Grabung Walo Burkart und Karl Keller-Tarnuzzer, Lumbrein-Surin (Crestaulta)","creatorPerson":{"name":"Karl Keller-Tarnuzzer"},"Keywords":"Ausgrabung, Archäologie","Abstract":"Herdstelle in Fläche D, 2. Situation (mittlere Schicht)","Context":"Verwandte Dokumente: RM_1_1_7","Claim":"Dieses Dokument wurde Dank der Unterstützung von Memoriav erhalten.","RecordingLocation":"Lumbrein-Surin, Crestaulta","contributorCorporateBody":{"name":"ADG"},"genre":"Grabungsfotografie / Arbeitsfotografie / Sachfotografie","objectType":"photograph","medium":"[keine Information vorhanden]","imageFormatColorMode":"sw","imageFormatRemarks":"Album-Nr.: A4","identifierOriginal":"102683","identifierCallNumber":"RM_1_3_10","identifierMain":"ADG-102683","location":"Schweiz, Graubünden, Lumbrein-Surin, Crestaulta","rightsHolder":"Archäologischer Dienst Graubünden","accessPhsyical":"onsite","accessDigital":"public"}
\ No newline at end of file \ No newline at end of file
...@@ -5,8 +5,8 @@ sftp: ...@@ -5,8 +5,8 @@ sftp:
password: password password: password
app: app:
xsltFilePath: src/test/resources/data/1/config/transform.xslt xsltFilePath: src/test/resources/data/1/config/transform.xslt
identifierFieldName: id identifierFieldName: identifierMain
recordTag: root recordTag: record
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment