Commit 75b48c49 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Improve error handling

parent 97d7e739
Pipeline #16466 passed with stages
in 5 minutes and 28 seconds
......@@ -25,7 +25,6 @@ import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import settings.HeaderExtractionTransformSupplier
import java.io.StringReader
import java.lang.ClassCastException
class KafkaTopology(
private val settings: SettingsLoader
......@@ -39,15 +38,24 @@ class KafkaTopology(
.transformValues(HeaderExtractionTransformSupplier<String>())
.flatMapValues { value ->
try {
listOf(klaxon.parse<Report>(StringReader(value.first))?.let { IndexReport(it, value.second) })
klaxon.parse<Report>(StringReader(value.first)).let {
if (it != null) {
listOf(IndexReport(it, value.second))
} else {
emptyList()
}
}
} catch (ex: ClassCastException) {
log.error(ex.localizedMessage + " in " + value.first)
emptyList<IndexReport>()
} catch (ex: KlaxonException) {
log.error(ex.localizedMessage + " in " + value.first)
emptyList<IndexReport>()
} catch (ex: NullPointerException) {
log.error(ex.localizedMessage + " in " + value.first)
emptyList<IndexReport>()
}
}
}
.map { key, value -> KeyValue("$key-${value?.report?.step}", klaxon.toJsonString(value)) }
.to(settings.outputTopic)
......
......@@ -23,7 +23,7 @@ import org.memobase.settings.SettingsLoader
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("ReportProcessorService")
private val settings = SettingsLoader(
val settings = SettingsLoader(
listOf(),
file,
useStreamsConfig = true
......
import ch.memobase.KafkaTopology
import ch.memobase.Service
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.assertj.core.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import java.io.File
import java.nio.charset.Charset
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestProcessing {
private val resourcePath = "src/test/resources"
private fun readFile(fileName: String): String {
return File("$resourcePath/$fileName").readText(Charset.defaultCharset())
}
@Test
fun `test processor`() {
val service = Service("app.yml")
val testDriver = TopologyTestDriver(KafkaTopology(service.settings).prepare().build(), service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
testDriver.pipeInput(
factory.create(
service.settings.inputTopic, "identifier-key", readFile("input.json")
)
)
val record = testDriver.readOutput(
service.settings.outputTopic,
StringDeserializer(),
StringDeserializer()
)
Assertions.assertThat(record.value())
.isEqualTo(readFile("output.json"))
}
}
\ No newline at end of file
kafka:
streams:
bootstrap.servers: localhast:12345
application.id: "test-app"
topic:
in: "in"
out: "out"
process: "process"
\ No newline at end of file
{
"id": "https://memobase.ch/record/SRF-Kultur-BS_MG_K_09835",
"message": "Ingested resource https://memobase.ch/record/SRF-Kultur-BS_MG_K_09835.",
"status": "SUCCESS",
"step": "fedora-ingest",
"timestamp": "2020-11-02T10:39:03.746"
}
\ No newline at end of file
{"metadata" : {"institutionId" : "PLACEHOLDER", "isPublished" : false, "recordSetId" : "PLACEHOLDER", "sessionId" : "PLACEHOLDER", "tableHeaderCount" : 1, "tableHeaderIndex" : 1, "tableIdentifierIndex" : 1, "tableSheetIndex" : 1, "xmlIdentifierFieldName" : "id", "xmlRecordTag" : "record"}, "report" : {"id" : "https://memobase.ch/record/SRF-Kultur-BS_MG_K_09835", "message" : "Ingested resource https://memobase.ch/record/SRF-Kultur-BS_MG_K_09835.", "status" : "SUCCESS", "step" : "fedora-ingest", "timestamp" : "2020-11-02T10:39:03.746"}}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment