Commit 94765380 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Fix implementation & tests

parent 52b1748a
...@@ -54,18 +54,23 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -54,18 +54,23 @@ class KafkaTopology(private val settings: SettingsLoader) {
fun build(): Topology { fun build(): Topology {
val builder = StreamsBuilder() val builder = StreamsBuilder()
val configStream = builder.stream<ByteArray, ByteArray>("import-process-config") val configStream = builder.stream<String, String>("import-process-config")
.map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }
val dataStream = builder val dataStream = builder
.stream<String, String>(settings.inputTopic) .stream<String, String>(settings.inputTopic)
.flatMapValues { _, value -> parseMessage(value) } .flatMapValues { _, value -> parseMessage(value) }
.filter { _, value -> value.format != Formats.xml } .filter { _, value ->
value.format == Formats.xml
}
val joinedStream = configJoiner.join(dataStream, configStream) val joinedStream = configJoiner.join(dataStream, configStream)
val saxHandlerStream = joinedStream val saxHandlerStream = joinedStream
.mapValues { value -> Input(value.left, value.right) } .mapValues { value ->
log.debug("Combine joined inputs.")
Input(value.left, value.right) }
.transformValues(HeaderExtractionTransformSupplier<Input>()) .transformValues(HeaderExtractionTransformSupplier<Input>())
.mapValues { value -> .mapValues { value ->
Content( Content(
......
...@@ -37,7 +37,7 @@ import java.io.StringWriter ...@@ -37,7 +37,7 @@ import java.io.StringWriter
* @param identifierFieldName The field name of the unique identifier of this record. * @param identifierFieldName The field name of the unique identifier of this record.
* @param recordTag The root tag of the xml structure. * @param recordTag The root tag of the xml structure.
*/ */
class SAXContentHandler(key: String, private val identifierFieldName: String, private val recordTag: String) : class SAXContentHandler(private val key: String, private val identifierFieldName: String, private val recordTag: String) :
ContentHandler { ContentHandler {
private val log = LogManager.getLogger("SAXHandler") private val log = LogManager.getLogger("SAXHandler")
...@@ -49,7 +49,7 @@ class SAXContentHandler(key: String, private val identifierFieldName: String, pr ...@@ -49,7 +49,7 @@ class SAXContentHandler(key: String, private val identifierFieldName: String, pr
/** /**
* The identifier is used as a message key for the outgoing message. * The identifier is used as a message key for the outgoing message.
*/ */
var identifier: String = key var identifier: String? = null
private var report: Report? = null private var report: Report? = null
private val jsonResult = JsonObject() private val jsonResult = JsonObject()
...@@ -60,7 +60,7 @@ class SAXContentHandler(key: String, private val identifierFieldName: String, pr ...@@ -60,7 +60,7 @@ class SAXContentHandler(key: String, private val identifierFieldName: String, pr
return report.let { return report.let {
it it
?: Report( ?: Report(
identifier, identifier ?: key,
ReportStatus.failure, ReportStatus.failure,
"Unknown Failure: No report found." "Unknown Failure: No report found."
) )
...@@ -182,7 +182,7 @@ class SAXContentHandler(key: String, private val identifierFieldName: String, pr ...@@ -182,7 +182,7 @@ class SAXContentHandler(key: String, private val identifierFieldName: String, pr
override fun endDocument() { override fun endDocument() {
output.write(jsonResult.toJsonString()) output.write(jsonResult.toJsonString())
report = Report( report = Report(
id = identifier, id = identifier ?: key,
status = if (reportText.isEmpty()) ReportStatus.success else ReportStatus.failure, status = if (reportText.isEmpty()) ReportStatus.success else ReportStatus.failure,
message = if (reportText.isEmpty()) "Successfully transformed xml to json!" else reportText.trim() message = if (reportText.isEmpty()) "Successfully transformed xml to json!" else reportText.trim()
) )
......
...@@ -53,10 +53,10 @@ class XMLTransformer { ...@@ -53,10 +53,10 @@ class XMLTransformer {
transformer.destination = SAXDestination(contentHandler) transformer.destination = SAXDestination(contentHandler)
transformer.transform() transformer.transform()
} }
if (contentHandler.identifier.isEmpty()) { if (contentHandler.identifier == null || contentHandler.identifier?.isEmpty() == true) {
throw MissingIdentifierException(key, headers.xmlIdentifierFieldName) throw MissingIdentifierException(key, headers.xmlIdentifierFieldName)
} else { } else {
return Pair(contentHandler.identifier, contentHandler) return Pair(contentHandler.identifier!!, contentHandler)
} }
} }
} }
\ No newline at end of file
...@@ -65,7 +65,7 @@ class TestIntegration { ...@@ -65,7 +65,7 @@ class TestIntegration {
headers.add(RecordHeader("institutionId", "test-institution-id".toByteArray())) headers.add(RecordHeader("institutionId", "test-institution-id".toByteArray()))
headers.add(RecordHeader("isPublished", "false".toByteArray())) headers.add(RecordHeader("isPublished", "false".toByteArray()))
headers.add(RecordHeader("xmlRecordTag", "record".toByteArray())) headers.add(RecordHeader("xmlRecordTag", "record".toByteArray()))
headers.add(RecordHeader("xmlIdentifierFieldName", "id".toByteArray())) headers.add(RecordHeader("xmlIdentifierFieldName", "identifierMain".toByteArray()))
headers.add(RecordHeader("tableSheetIndex", "1".toByteArray())) headers.add(RecordHeader("tableSheetIndex", "1".toByteArray()))
headers.add(RecordHeader("tableHeaderCount", "1".toByteArray())) headers.add(RecordHeader("tableHeaderCount", "1".toByteArray()))
headers.add(RecordHeader("tableHeaderIndex", "1".toByteArray())) headers.add(RecordHeader("tableHeaderIndex", "1".toByteArray()))
...@@ -73,28 +73,31 @@ class TestIntegration { ...@@ -73,28 +73,31 @@ class TestIntegration {
val service = Service("test${params.count}.yml") val service = Service("test${params.count}.yml")
val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings) val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
val factoryConfig = ConsumerRecordFactory(
StringSerializer(), StringSerializer() StringSerializer(), StringSerializer()
) )
testDriver.pipeInput( testDriver.pipeInput(
factory.create( factoryConfig.create(
service.settings.inputTopic, params.inputKey, readFile("${params.count}/input.json"), "import-process-config",
headers "test-record-set-id#transform",
readFile("${params.count}/config/transform.xslt")
) )
) )
val factoryConfig = ConsumerRecordFactory(
ByteArraySerializer(), ByteArraySerializer()
)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
testDriver.pipeInput( testDriver.pipeInput(
factoryConfig.create( factory.create(
"import-process-config", service.settings.inputTopic, params.inputKey, readFile("${params.count}/input.json"),
"test-record-set-id#transform".toByteArray(), headers
readFile("${params.count}/config/transform.xslt").toByteArray()
) )
) )
val record = testDriver.readOutput( val record = testDriver.readOutput(
service.settings.outputTopic, service.settings.outputTopic,
StringDeserializer(), StringDeserializer(),
......
{"title":"Ausgrabung Crestaulta","SerieTitle":"Grabung Walo Burkart und Karl Keller-Tarnuzzer, Lumbrein-Surin (Crestaulta)","creatorPerson":{"name":"Karl Keller-Tarnuzzer"},"Keywords":"Ausgrabung, Archäologie","Abstract":"Herdstelle in Fläche D, 2. Situation (mittlere Schicht)","Context":"Verwandte Dokumente: RM_1_1_7","Claim":"Dieses Dokument wurde Dank der Unterstützung von Memoriav erhalten.","RecordingLocation":"Lumbrein-Surin, Crestaulta","contributorCorporateBody":{"name":"ADG"},"genre":"Grabungsfotografie / Arbeitsfotografie / Sachfotografie","objectType":"photograph","medium":"[keine Information vorhanden]","imageFormatColorMode":"sw","imageFormatRemarks":"Album-Nr.: A4","identifierOriginal":"102683","identifierCallNumber":"RM_1_3_10","identifierMain":"ADG-102683","location":"Schweiz, Graubünden, Lumbrein-Surin, Crestaulta","rightsHolder":"Archäologischer Dienst Graubünden","accessPhsyical":"onsite","accessDigital":"public"} {"title":"Ausgrabung Crestaulta","SerieTitle":"Grabung Walo Burkart und Karl Keller-Tarnuzzer, Lumbrein-Surin (Crestaulta)","creatorPerson":{"name":"Karl Keller-Tarnuzzer","role":"Author"},"Keywords":"Ausgrabung, Archäologie","Abstract":"Herdstelle in Fläche D, 2. Situation (mittlere Schicht)","Context":"Verwandte Dokumente: RM_1_1_7","Claim":"Dieses Dokument wurde Dank der Unterstützung von Memoriav erhalten.","RecordingLocation":"Lumbrein-Surin, Crestaulta","dateCreated":"1937","genre":"Grabungsfotografie / Arbeitsfotografie / Sachfotografie","objectType":"Foto","medium":"[keine Information vorhanden]","imageFormatColorMode":"sw","imageFormatRemarks":"Album-Nr.: A4","identifierOriginal":"102683","identifierCallNumber":"RM_1_3_10","identifierMain":"ADG-102683","temporal":"1937-1938","location":"Schweiz, Graubünden, Lumbrein-Surin, Crestaulta","rightsHolder":"Archäologischer Dienst Graubünden","accessPhysical":"onsite","accessDigital":"public","rightsUsage":"Die Nutzungsrechte müssen mit dem Rechtinhaber geklärt werden"}
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment