Commit 1912f6c6 authored by Jonas Waeber

Update dependencies

Replace bintray with mavenCentral & GitLab
Replace git version plugin
Update reporting to distinguish processing steps
parent 561da547
Pipeline #25448 passed with stages in 3 minutes and 46 seconds
@@ -20,12 +20,11 @@ plugins {
id 'application'
id 'distribution'
id 'org.jetbrains.kotlin.jvm' version '1.3.71'
id 'com.palantir.git-version' version '0.11.0'
id 'com.gitlab.morality.grit' version '2.0.2'
id 'org.jlleitschuh.gradle.ktlint' version '9.2.1'
}
group 'org.memobase'
version = gitVersion()
mainClassName = 'org.memobase.App'
jar {
@@ -38,9 +37,9 @@ sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
jcenter()
mavenCentral()
maven {
url "https://gitlab.switch.ch/api/v4/projects/1324/packages/maven"
url "https://gitlab.switch.ch/api/v4/projects/1324/packages/maven"
}
}
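For context on the repository switch above: jcenter()/bintray is dropped in favour of mavenCentral() plus the GitLab package registry of project 1324 on gitlab.switch.ch. The project's build.gradle uses the Groovy DSL; the sketch below shows an equivalent repositories block in Gradle Kotlin DSL with optional token authentication. The property name and the need for a token at all are assumptions for illustration, not taken from this project.

```kotlin
// build.gradle.kts sketch (illustrative only; the project itself uses the Groovy DSL)
repositories {
    mavenCentral()
    maven {
        url = uri("https://gitlab.switch.ch/api/v4/projects/1324/packages/maven")
        // Only needed if the package registry is not public (assumption):
        // authenticate with a personal/deploy token sent as an HTTP header.
        credentials(HttpHeaderCredentials::class) {
            name = "Private-Token"
            value = findProperty("gitLabPrivateToken") as String? ?: ""
        }
        authentication {
            create("header", HttpHeaderAuthentication::class)
        }
    }
}
```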
@@ -25,7 +25,6 @@ import ch.memobase.reporting.Report
import ch.memobase.settings.HeaderExtractionTransformSupplier
import ch.memobase.settings.HeaderMetadata
import ch.memobase.settings.SettingsLoader
import java.nio.charset.StandardCharsets
import org.apache.jena.rdf.model.Model
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
@@ -38,6 +37,7 @@ import org.memobase.model.MemobaseModel
import org.memobase.model.ProcessResult
import org.memobase.model.ProcessStatus
import org.memobase.model.Transaction
import java.nio.charset.StandardCharsets
class KafkaTopology(
private val settings: SettingsLoader
@@ -52,16 +52,8 @@ class KafkaTopology(
val processOutcome = stream
.transformValues(HeaderExtractionTransformSupplier<String>())
.mapValues { value -> parseNtriples(value) }
// TODO: Implement probably different workflow for json-record-sets and json-institutions
.mapValues { value -> addEventType(value) }
.mapValues { value -> writeJsonLd(value) }
.branch(
Predicate { _, value -> value.status == ProcessStatus.SUCCESS },
Predicate { _, _ -> true }
)
val outputData = processOutcome[0]
.mapValues { value -> value.data }
.branch(
Predicate { key, _ -> key.contains("/record/") },
Predicate { key, _ -> key.contains("/recordSet/") },
@@ -69,33 +61,75 @@
Predicate { _, _ -> true }
)
outputData[0]
processOutcome[0]
.filter { _, value -> value.status != ProcessStatus.FATAL }
.mapValues { value -> value.data }
.to(settings.outputTopic + "-records-" + settings.appSettings.getProperty("topicOutPostfix"))
outputData[0]
processOutcome[0]
.filter { _, value -> value.status != ProcessStatus.FATAL }
.mapValues { key, _ -> writeTransaction(key) }
.to(settings.appSettings.getProperty("topicTransactionsRecords"))
outputData[1]
processOutcome[0]
.mapValues { key, value -> writeReport(key, value, Service.recordsStep) }
.to(settings.processReportTopic)
processOutcome[1]
.filter { _, value -> value.status != ProcessStatus.FATAL }
.mapValues { value -> value.data }
.to(settings.outputTopic + "-record-sets-" + settings.appSettings.getProperty("topicOutPostfix"))
outputData[1]
processOutcome[1]
.mapValues { key, _ -> writeTransaction(key) }
.to(settings.appSettings.getProperty("topicTransactionsRecordSets"))
outputData[2]
processOutcome[1]
.mapValues { key, value -> writeReport(key, value, Service.groupsStep) }
.to(settings.processReportTopic)
processOutcome[2]
.filter { _, value -> value.status != ProcessStatus.FATAL }
.mapValues { value -> value.data }
.to(settings.outputTopic + "-institutions-" + settings.appSettings.getProperty("topicOutPostfix"))
outputData[2]
processOutcome[2]
.mapValues { key, _ -> writeTransaction(key) }
.to(settings.appSettings.getProperty("topicTransactionsInstitutions"))
processOutcome[0]
.mapValues { key, value -> writeReport(key, value) }
processOutcome[2]
.mapValues { key, value -> writeReport(key, value, Service.groupsStep) }
.to(settings.processReportTopic)
processOutcome[1]
.mapValues { key, value -> writeReport(key, value) }
processOutcome[3]
.mapValues { key, value ->
writeReport(
key,
ProcessResult(
value.data,
ProcessStatus.FATAL,
"Unable to match key type to either records, record sets or institutions."
),
Service.groupsStep
)
}
.to(settings.processReportTopic)
processOutcome[3]
.mapValues { key, value ->
writeReport(
key,
ProcessResult(
value.data,
ProcessStatus.FATAL,
"Unable to match key type to either records, record sets or institutions."
),
Service.recordsStep
)
}
.to(settings.processReportTopic)
return builder.build()
@@ -136,8 +170,8 @@ class KafkaTopology(
return input
}
private fun writeReport(id: String, input: ProcessResult<String>): String {
return Report(id, input.status.toString(), input.message, "import-process-bridge").toJson()
private fun writeReport(id: String, input: ProcessResult<String>, step: String): String {
return Report(id, input.status.toString(), input.message, step).toJson()
}
private fun writeTransaction(id: String): String {
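The KafkaTopology rework above drops the old success/failure pre-branching: the stream is now branched by key type straight away, FATAL results are filtered out only in front of the data and transaction outputs, and a step-tagged report is written for every message, including the catch-all branch that receives a FATAL "unable to match key type" report. Below is a minimal, self-contained sketch of that wiring pattern; the topic names, the simplified Result type and the writeReport stub are placeholders, not the project's actual classes.

```kotlin
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Predicate

// Simplified stand-ins for the project's ProcessStatus / ProcessResult (assumptions).
enum class Status { SUCCESS, FATAL }
data class Result(val status: Status, val data: String)

// Stub standing in for the project's writeReport(id, result, step); the real JSON
// is produced by ch.memobase.reporting.Report.
fun writeReport(id: String, result: Result, step: String): String =
    """{"id":"$id","status":"${result.status}","step":"$step"}"""

fun buildSketch(): Topology {
    val builder = StreamsBuilder()
    val parsed = builder.stream<String, Result>("parsed-input")   // placeholder topic

    // Branch by key type first; a catch-all branch collects unknown keys.
    val branches = parsed.branch(
        Predicate { key, _ -> key.contains("/record/") },
        Predicate { key, _ -> key.contains("/recordSet/") },
        Predicate { _, _ -> true }
    )

    // Data is forwarded only for non-FATAL results ...
    branches[0]
        .filter { _, value -> value.status != Status.FATAL }
        .mapValues { value -> value.data }
        .to("records-output")                                      // placeholder topic
    // ... but a step-tagged report is emitted for every result, FATAL included.
    branches[0]
        .mapValues { key, value -> writeReport(key, value, "import-process-bridge-records") }
        .to("process-reports")                                     // placeholder topic

    return builder.build()
}
```

Keeping FATAL results inside each branch until after the report mapping is what guarantees that failed messages still show up in the process reports.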
@@ -24,8 +24,12 @@ import org.apache.logging.log4j.LogManager
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("ImportProcessBridgeService")
companion object {
const val recordsStep = "import-process-bridge-records"
const val groupsStep = "import-process-bridge-groups"
}
val settings = SettingsLoader(
private val settings = SettingsLoader(
listOf(
"topicOutPostfix",
"topicTransactionsRecords",
@@ -36,7 +40,7 @@ class Service(file: String = "app.yml") {
useStreamsConfig = true
)
val topology = KafkaTopology(settings).build()
private val topology = KafkaTopology(settings).build()
private val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
fun run() {
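Condensing the reporting change across both files: Service now defines one step label per branch, and writeReport takes the step as a parameter instead of hard-coding "import-process-bridge". A short sketch distilled from the diff; the free-standing function below is illustrative, whereas in the project it is a private member of KafkaTopology.

```kotlin
import ch.memobase.reporting.Report

// Step labels, mirroring the new Service companion-object constants.
const val recordsStep = "import-process-bridge-records"
const val groupsStep = "import-process-bridge-groups"

// Before: every report carried the single step "import-process-bridge".
// After: the caller names the step, so the records branch and the record-set /
// institution branches produce distinguishable process reports.
fun writeReport(id: String, status: String, message: String, step: String): String =
    Report(id, status, message, step).toJson()
```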