Commit 65ae0baa authored by Jonas Waeber's avatar Jonas Waeber
Browse files

merge changes

parents 0fd31e17 89fc8f04
.metals
.idea
!.idea/copyright
......@@ -6,4 +7,4 @@ build
.gradle
output/
data/
\ No newline at end of file
data/
deploymentName: di-mapper-test
kafkaConfigs: test-kafka-bootstrap-servers
applicationId: di-mapper-test-app
inputTopicName: mb-di-mapper-test
outputTopicName: mb-di-media-linker-test
reportingTopicName: mb-di-reporting-test
configTopicName: mb-di-config-test
\ No newline at end of file
inputTopicName: mb-di-mapper-prod
outputTopicName: mb-di-media-linker-prod
reportingTopicName: mb-di-reporting-prod
configTopicName: mb-di-config-prod
......@@ -40,6 +40,7 @@ import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.snakeyaml.engine.v2.exceptions.ScannerException
class KafkaTopology(
private val settings: SettingsLoader
......@@ -272,6 +273,12 @@ class KafkaTopology(
null,
Report(key, ReportStatus.fatal, "There's no data to be processed: ${ex.localizedMessage}", reportingStepName)
)
} catch (ex: IllegalArgumentException) {
log.error(ex.localizedMessage)
Triple(null, null, Report(key, ReportStatus.fatal, ex.localizedMessage, Service.step))
} catch (ex: ScannerException) {
log.error(ex.localizedMessage)
Triple(null, null, Report(key, ReportStatus.fatal, ex.localizedMessage, Service.step))
}
}
}
......@@ -18,9 +18,9 @@
package org.memobase
import ch.memobase.settings.SettingsLoader
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import ch.memobase.settings.SettingsLoader
class Service(file: String = "app.yml") {
companion object {
......
......@@ -21,6 +21,10 @@ import ch.memobase.rdf.NS
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import com.beust.klaxon.Klaxon
import java.io.File
import java.io.FileOutputStream
import java.nio.charset.Charset
import java.util.stream.Stream
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
......@@ -37,10 +41,6 @@ import org.junit.jupiter.api.assertAll
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.params.IntegrationTestParams
import java.io.File
import java.io.FileOutputStream
import java.nio.charset.Charset
import java.util.stream.Stream
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class IntegrationTests {
......@@ -87,13 +87,11 @@ class IntegrationTests {
headers.add(RecordHeader("tableHeaderIndex", "1".toByteArray()))
headers.add(RecordHeader("tableIdentifierIndex", "1".toByteArray()))
val testDriver = TopologyTestDriver(KafkaTopology(settings).prepare().build(), settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
testDriver.pipeInput(
factory.create(
"mb-di-config-test",
......@@ -167,7 +165,7 @@ class IntegrationTests {
"test"
)
)/*,
)/*,
IntegrationTestParams(
2,
"Sig Han 1293",
......
......@@ -25,4 +25,4 @@ data class IntegrationTestParams(
val key: String,
val expectedKey: String,
val report: Report
)
\ No newline at end of file
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment