Commit 3832c509 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Style update

Remove ods format and unused formats
parent ea619f11
...@@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager ...@@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager
class App { class App {
companion object { companion object {
private val log = LogManager.getLogger("TableDataTransformApp") private val log = LogManager.getLogger("TableDataTransformApp")
@JvmStatic @JvmStatic
fun main(args: Array<String>) { fun main(args: Array<String>) {
try { try {
......
...@@ -35,37 +35,37 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -35,37 +35,37 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val parser = TableParser(step, settings) private val parser = TableParser(step, settings)
private val reportingTopic = settings.processReportTopic private val reportingTopic = settings.processReportTopic
private val klaxon = Klaxon() private val klaxon = Klaxon()
private val acceptedFormats = listOf(Formats.csv, Formats.xlsx, Formats.tsv, Formats.xls, Formats.ods) private val acceptedFormats = listOf(Formats.csv, Formats.xlsx, Formats.tsv, Formats.xls)
fun build(): Topology { fun build(): Topology {
val builder = StreamsBuilder() val builder = StreamsBuilder()
val parseMessageStream = builder val parseMessageStream = builder
.stream<String, String>(settings.inputTopic) .stream<String, String>(settings.inputTopic)
.mapValues { readOnlyKey, value -> parseMessage(readOnlyKey, value) } .mapValues { readOnlyKey, value -> parseMessage(readOnlyKey, value) }
.branch( .branch(
Predicate { _, value -> value.second.status == ReportStatus.fatal }, Predicate { _, value -> value.second.status == ReportStatus.fatal },
Predicate { _, value -> value.second.status != ReportStatus.fatal } Predicate { _, value -> value.second.status != ReportStatus.fatal }
) )
parseMessageStream[0] parseMessageStream[0]
.mapValues { value -> value.second.toJson() } .mapValues { value -> value.second.toJson() }
.to(reportingTopic) .to(reportingTopic)
val parsedTables = parseMessageStream[1] val parsedTables = parseMessageStream[1]
.filter { _, value -> acceptedFormats.contains(value.first.format) } .filter { _, value -> acceptedFormats.contains(value.first.format) }
.transformValues(HeaderExtractionTransformSupplier<Pair<Message, Report>>()) .transformValues(HeaderExtractionTransformSupplier<Pair<Message, Report>>())
.flatMapValues { key, value -> parser.parseTable(key, value.first.first, value.second) } .flatMapValues { key, value -> parser.parseTable(key, value.first.first, value.second) }
.map { _, value -> KeyValue(value.key, value) } .map { _, value -> KeyValue(value.key, value) }
parsedTables parsedTables
.mapValues { value -> value.report.toJson() } .mapValues { value -> value.report.toJson() }
.to(reportingTopic) .to(reportingTopic)
parsedTables parsedTables
.filter { _, value -> value.report.status != ReportStatus.fatal } .filter { _, value -> value.report.status != ReportStatus.fatal }
.mapValues { value -> klaxon.toJsonString(value.value) } .mapValues { value -> klaxon.toJsonString(value.value) }
.to(settings.outputTopic) .to(settings.outputTopic)
return builder.build() return builder.build()
} }
...@@ -75,27 +75,33 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -75,27 +75,33 @@ class KafkaTopology(private val settings: SettingsLoader) {
return try { return try {
val parsedMessage = klaxon.parse<Message>(data) val parsedMessage = klaxon.parse<Message>(data)
if (parsedMessage == null) { if (parsedMessage == null) {
Pair(Message("", ""), Report( Pair(
Message("", ""), Report(
readOnlyKey, readOnlyKey,
ReportStatus.fatal, ReportStatus.fatal,
"Could not parse message from data $data.", "Could not parse message from data $data.",
step step
)) )
)
} else { } else {
Pair(parsedMessage, Report( Pair(
parsedMessage, Report(
readOnlyKey, readOnlyKey,
ReportStatus.success, ReportStatus.success,
"", "",
step step
)) )
)
} }
} catch (ex: KlaxonException) { } catch (ex: KlaxonException) {
Pair(Message("", ""), Report( Pair(
Message("", ""), Report(
readOnlyKey, readOnlyKey,
ReportStatus.fatal, ReportStatus.fatal,
"JSON Parser Error: ${ex.localizedMessage}.", "JSON Parser Error: ${ex.localizedMessage}.",
step step
)) )
)
} }
} }
} }
...@@ -29,10 +29,10 @@ class Service(file: String = "app.yml") { ...@@ -29,10 +29,10 @@ class Service(file: String = "app.yml") {
} }
val settings = SettingsLoader( val settings = SettingsLoader(
listOf(reportingStepNamePropName), listOf(reportingStepNamePropName),
file, file,
useStreamsConfig = true, useStreamsConfig = true,
readSftpSettings = true readSftpSettings = true
) )
val topology = KafkaTopology(settings).build() val topology = KafkaTopology(settings).build()
......
...@@ -22,7 +22,4 @@ object Formats { ...@@ -22,7 +22,4 @@ object Formats {
const val tsv = "TSV" const val tsv = "TSV"
const val xlsx = "XLSX" const val xlsx = "XLSX"
const val xls = "XLS" const val xls = "XLS"
const val ods = "ODS"
const val invalid = "INVALID"
const val error = "ERROR"
} }
...@@ -18,6 +18,6 @@ ...@@ -18,6 +18,6 @@
package org.memobase.models package org.memobase.models
data class Message( data class Message(
val format: String, val format: String,
val path: String val path: String
) )
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment