Commit a97ab6f7 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Add fatal error if file does not exists.

parent b273e3c9
Pipeline #33413 passed with stages
in 3 minutes and 51 seconds
......@@ -17,6 +17,7 @@
*/
package org.memobase
import ch.memobase.exceptions.SftpClientException
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.HeaderExtractionTransformSupplier
......@@ -56,25 +57,50 @@ class KafkaTopology(private val settings: SettingsLoader) {
.mapValues { value -> value.second.toJson() }
.to(reportingTopic)
val parsedTables = parseMessageStream[1]
val openFile = parseMessageStream[1]
.filter { _, value -> acceptedFormats.contains(value.first.format) }
.mapValues { value -> value.first }
.transformValues(HeaderExtractionTransformSupplier<Message>())
.mapValues { _, value ->
Triple(value.first, value.second, reader.open(value.first.path))
val inputStream = try {
reader.open(value.first.path)
} catch (ex: SftpClientException) {
log.error("Could not read file at ${value.first.path}.")
null
}
Triple(value.first, value.second, inputStream)
}
.flatMapValues { key, value -> parser.parseTable(key, value.first, value.second, value.third) }
.branch(
Predicate { _, value -> value.third == null },
Predicate { _, _ -> true }
)
openFile[0]
.mapValues { readOnlyKey, value ->
Report(
readOnlyKey,
ReportStatus.fatal,
"Could not read file at ${value.first.path}.",
step
).toJson()
}
.to(reportingTopic)
val parsedTable = openFile[1]
.flatMapValues { key, value -> parser.parseTable(key, value.first, value.second, value.third!!) }
.map { _, value -> KeyValue(value.key, value) }
parsedTables
parsedTable
.mapValues { value -> value.report.toJson() }
.to(reportingTopic)
parsedTables
parsedTable
.filter { _, value -> value.report.status != ReportStatus.fatal }
.mapValues { value ->
log.info("Successfully processed message ${value.key}.")
klaxon.toJsonString(value.value) }
klaxon.toJsonString(value.value)
}
.to(settings.outputTopic)
return builder.build()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment