Verified Commit 89fc8f04 authored by Sebastian Schüpbach

catch parsing exceptions

parent be2468f7
Pipeline #25939 passed with stages in 4 minutes and 11 seconds
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.snakeyaml.engine.v2.exceptions.ScannerException
class KafkaTopology(
private val settings: SettingsLoader
@@ -271,6 +272,12 @@ class KafkaTopology(
null,
Report(key, ReportStatus.fatal, "There's no data to be processed: ${ex.localizedMessage}", Service.step)
)
} catch (ex: IllegalArgumentException) {
log.error(ex.localizedMessage)
Triple(null, null, Report(key, ReportStatus.fatal, ex.localizedMessage, Service.step))
} catch (ex: ScannerException) {
log.error(ex.localizedMessage)
Triple(null, null, Report(key, ReportStatus.fatal, ex.localizedMessage, Service.step))
}
}
}
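Note on the change above: the two new catch branches guard the YAML parsing step, so a ScannerException or IllegalArgumentException is mapped to a fatal Report instead of escaping and killing the stream thread. Below is a minimal, standalone sketch of how snakeyaml-engine raises a ScannerException on malformed input; the input string and the println stand-in for the Report are illustrative assumptions, not code from this repository.

import org.snakeyaml.engine.v2.api.Load
import org.snakeyaml.engine.v2.api.LoadSettings
import org.snakeyaml.engine.v2.exceptions.ScannerException

fun main() {
    // "a: b: c" typically trips the scanner ("mapping values are not allowed here").
    val malformedYaml = "a: b: c"
    val parsed = try {
        Load(LoadSettings.builder().build()).loadFromString(malformedYaml)
    } catch (ex: ScannerException) {
        // In KafkaTopology this case becomes Triple(null, null, Report(key, ReportStatus.fatal, ...)).
        println("parsing failed: ${ex.localizedMessage}")
        null
    } catch (ex: IllegalArgumentException) {
        // The topology catches IllegalArgumentException at the same point in the same way.
        println("invalid input: ${ex.localizedMessage}")
        null
    }
    println(parsed)
}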
@@ -18,9 +18,9 @@
package org.memobase
import ch.memobase.settings.SettingsLoader
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import ch.memobase.settings.SettingsLoader
class Service(file: String = "app.yml") {
companion object {
...
@@ -20,6 +20,10 @@ import ch.memobase.rdf.NS
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import com.beust.klaxon.Klaxon
import java.io.File
import java.io.FileOutputStream
import java.nio.charset.Charset
import java.util.stream.Stream
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
@@ -36,10 +40,6 @@ import org.junit.jupiter.api.assertAll
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.params.IntegrationTestParams
import java.io.File
import java.io.FileOutputStream
import java.nio.charset.Charset
import java.util.stream.Stream
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class IntegrationTests {
@@ -86,13 +86,11 @@ class IntegrationTests {
headers.add(RecordHeader("tableHeaderIndex", "1".toByteArray()))
headers.add(RecordHeader("tableIdentifierIndex", "1".toByteArray()))
val testDriver = TopologyTestDriver(KafkaTopology(settings).prepare().build(), settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
testDriver.pipeInput(
factory.create(
"mb-di-config-test",
@@ -156,17 +154,17 @@ class IntegrationTests {
private fun kafkaTests() = Stream.of(
IntegrationTestParams(
1,
"MEI_49884",
"https://memobase.ch/record/test-record-set-id-MEI_49884",
Report(
"MEI_49884",
"FATAL",
"No Record Id Found: Found multiple values in the field 'identifierOriginal' for identifiers: 22861, 22861, 22861.",
Service.step
)
)/*,
IntegrationTestParams(
2,
"Sig Han 1293",
...
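For reference, the integration test touched above drives the topology with Kafka's TopologyTestDriver and ConsumerRecordFactory. A rough sketch of that round trip follows; the settings object mirrors the test's SettingsLoader, "mb-di-config-test" is taken from the hunk above, the reporting topic name is a placeholder, and the record headers used by the real test are omitted.

// Fragment mirroring the test setup above; 'settings' comes from the test's SettingsLoader.
val driver = TopologyTestDriver(KafkaTopology(settings).prepare().build(), settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(StringSerializer(), StringSerializer())
// A value that fails YAML parsing should now surface as a FATAL report
// instead of an uncaught exception on the stream thread.
driver.pipeInput(factory.create("mb-di-config-test", "MEI_49884", "a: b: c"))
val report = driver.readOutput("reporting-topic-placeholder", StringDeserializer(), StringDeserializer())
driver.close()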
@@ -25,4 +25,4 @@ data class IntegrationTestParams(
val key: String,
val expectedKey: String,
val report: Report
)
\ No newline at end of file