Commit 5e5ef957 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Fixes process reports

parent 73f9783c
......@@ -76,12 +76,16 @@ class KafkaTopology(private val settings: SettingsLoader) {
.mapValues { _ -> "ERROR" }
.to(settings.outputTopic)
branchedSource[1]
.mapValues { key, _ -> Klaxon().toJsonString(Report(key, "FAILURE", "No valid file found!")) }
.to(settings.processReportTopic)
// work branch
val formatBranches = branchedSource[0]
.mapValues { _, value -> parseJsonObject(value) }
.branch(
Predicate { _, value -> listOf("CSV", "TSV").contains(value["format"]) },
Predicate { _, value -> listOf("XSL", "XSLX").contains(value["format"]) },
Predicate { _, value -> listOf("XLS", "XLSX").contains(value["format"]) },
Predicate { _, value -> listOf("ODS").contains(value["format"]) }
)
......@@ -90,7 +94,8 @@ class KafkaTopology(private val settings: SettingsLoader) {
.mapValues { key, value -> errorWrapperCsv(key, value) })
// XSL / XSLX Branch
buildHelper(formatBranches[1].mapValues { key, value -> errorWrapperXsl(key, value) })
buildHelper(formatBranches[1]
.mapValues { key, value -> errorWrapperXsl(key, value) })
return builder.build()
}
......@@ -130,7 +135,13 @@ class KafkaTopology(private val settings: SettingsLoader) {
val result = csvMapper(key, value)
Pair(result, Report(key, "SUCCESS", "Transformed table data into ${result.count()} records."))
} catch (ex: InvalidInputException) {
Pair(emptyList(), Report(key, "FAILURE", ex.localizedMessage))
Pair(
listOf(
Pair(
Pair(key, JsonObject(mapOf(Pair("message", "ERROR")))),
Report(key, "FAILURE", ex.localizedMessage)
)), Report(key, "FAILURE", ex.localizedMessage)
)
}
}
......@@ -154,10 +165,16 @@ class KafkaTopology(private val settings: SettingsLoader) {
headerProperties = line
headerProperties.forEachIndexed { index, property ->
if (property.isEmpty()) {
throw InvalidInputException(key, "Missing a property name on row $count in column ${index + 1}!")
throw InvalidInputException(
key,
"Missing a property name on row $count in column ${index + 1}!"
)
}
if (property.any { value -> invalidPropertyNameCharacters.contains(value) }) {
throw InvalidInputException(key, "Invalid property name $property on row $count in column ${index + 1}! You may not use the any of the following characters: + , . ")
throw InvalidInputException(
key,
"Invalid property name $property on row $count in column ${index + 1}! You may not use the any of the following characters: + , . "
)
}
}
}
......@@ -166,30 +183,32 @@ class KafkaTopology(private val settings: SettingsLoader) {
// the -1 ensures, that users can start columns beginning at 1!
val identifier: String = try {
line[identifierIndex - 1].let { value ->
when (value) {
"" -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count has an empty identifier in column $identifierIndex."
)
}
in mutableSet -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count contains a duplicated identifier in column $identifierIndex with another row."
)
}
else -> {
mutableSet.add(value)
value
}
when (value) {
"" -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count has an empty identifier in column $identifierIndex."
)
}
in mutableSet -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count contains a duplicated identifier in column $identifierIndex with another row."
)
}
else -> {
mutableSet.add(value)
value
}
}
} catch (ex: InvalidInputException) {
resultMessages.add(Pair(
Pair(ex.key, JsonObject()),
Report(ex.key, "FAILURE", ex.localizedMessage)
))
}
} catch (ex: InvalidInputException) {
resultMessages.add(
Pair(
Pair(ex.key, JsonObject()),
Report(ex.key, "FAILURE", ex.localizedMessage)
)
)
continue
}
val keyValueMap = json {
......@@ -227,9 +246,24 @@ class KafkaTopology(private val settings: SettingsLoader) {
val result = excelMapper(key, value)
Pair(result, Report(key, "SUCCESS", "Transformed table data into ${result.count()} records."))
} catch (ex: InvalidInputException) {
Pair(emptyList(), Report(key, "FAILURE", ex.localizedMessage))
Pair(
listOf(
Pair(
Pair(key, JsonObject(mapOf(Pair("message", "ERROR")))),
Report(key, "FAILURE", ex.localizedMessage)
)
), Report(key, "FAILURE", ex.localizedMessage)
)
} catch (ex: IllegalArgumentException) { // Sheet index does not exist
Pair(emptyList(), Report(key, "FAILURE", ex.localizedMessage))
Pair(
listOf(
Pair(
Pair(key, JsonObject(mapOf(Pair("message", "ERROR")))),
Report(key, "FAILURE", ex.localizedMessage)
)
),
Report(key, "FAILURE", ex.localizedMessage)
)
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment