Commit dab74e47 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Adapt to Kafka 2.7

parent e670c321
......@@ -21,12 +21,12 @@ targetCompatibility = 1.8
repositories {
jcenter()
maven {
url "https://dl.bintray.com/memoriav/memobase"
url "https://gitlab.switch.ch/api/v4/projects/1324/packages/maven"
}
}
ext {
kafkaV = '2.3.1'
kafkaV = '2.7.0'
log4jV = '2.11.2'
}
......@@ -37,8 +37,8 @@ dependencies {
implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jV}"
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation 'ch.memobase:memobase-kafka-utils:0.1.2'
implementation 'org.memobase:memobase-service-utilities:2.0.15'
implementation 'ch.memobase:memobase-kafka-utils:0.3.1'
implementation 'org.memobase:memobase-service-utilities:3.0.0'
// SFTP Client
// is needed because of a bug.
implementation 'com.hierynomus:sshj:0.27.0'
......
......@@ -19,6 +19,7 @@
package org.memobase
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.errors.KafkaUtilsException
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.reporting.Report
......@@ -75,23 +76,25 @@ class KafkaTopology(private val settings: SettingsLoader) {
val configNullBranch = joinedStream
.branch(
Predicate { _, value -> value.right == null },
Predicate { _, _ -> true }
Predicate { _, value -> value.hasException() },
Predicate { _, value -> value.hasValue() }
)
configNullBranch[0]
.transformValues(HeaderExtractionTransformSupplier<JoinedValues<Message, ByteArray>>())
.mapValues { value -> value.exception }
.transformValues(HeaderExtractionTransformSupplier<KafkaUtilsException>())
.mapValues { key, value ->
Report(
key,
ReportStatus.fatal,
"Could not find a matching xslt configuration for record set ${value.second.recordSetId}.",
"Could not find a matching xslt configuration for record set ${value.second.recordSetId} because ${value.first.localizedMessage}.",
Service.name
).toJson()
}
.to(reportingTopic)
val saxHandlerStream = configNullBranch[1]
.mapValues { value -> value.value }
.mapValues { value ->
Input(value.left, value.right)
}
......
......@@ -42,7 +42,7 @@ class Service(file: String = "app.yml") {
fun run() {
stream.use {
it.start()
while (stream.state().isRunning) {
while (stream.state().isRunningOrRebalancing) {
log.info("Service is running.")
Thread.sleep(10_000L)
}
......
......@@ -31,9 +31,9 @@ import org.memobase.xml.XMLTransformer
class LocalTestRun {
private val headerMetadata = HeaderMetadata(
"ati-002",
"soz-007",
"1",
"ati",
"soz",
false,
"record",
"identifierMain",
......@@ -43,9 +43,9 @@ class LocalTestRun {
@Test
@Disabled
fun `test local folder`() {
val folder = "/home/jonas/memobase/data/ati-002"
val inputFolder = "/home/jonas/memobase/data/test-ati/step-1"
val outputFolder = "/home/jonas/memobase/data/test-ati/step-2"
val folder = "/home/jonas/memobase/data/soz-007"
val inputFolder = "/home/jonas/memobase/data/test-soz/step-1"
val outputFolder = "/home/jonas/memobase/data/test-soz/step-2"
val transformFile = "/config/transform.xslt"
val xslt = File(folder + transformFile).readBytes()
val transformer = XMLTransformer()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment