Commit 0e0784e7 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Change csv mapper identifier error behaviour

Will now continue working on the rows and
simply report the invalid identifiers per row.
parent 85d8cafd
......@@ -164,17 +164,33 @@ class KafkaTopology(private val settings: SettingsLoader) {
continue
}
// the -1 ensures, that users can start columns beginning at 1!
val identifier = line[identifierIndex - 1]
when {
identifier.isEmpty() -> {
throw InvalidInputException("$count.$identifierIndex", "The unique identifier in column $identifierIndex in row $count is empty!")
}
mutableSet.contains(identifier) -> {
throw InvalidInputException("$count.$identifierIndex", "The unique identifier in column $identifierIndex in row $count is a duplicate of another row!")
}
else -> {
mutableSet.add(identifier)
}
val identifier: String = try {
line[identifierIndex - 1].let { value ->
when (value) {
"" -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count has an empty identifier in column $identifierIndex."
)
}
in mutableSet -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count contains a duplicated identifier in column $identifierIndex with another row."
)
}
else -> {
mutableSet.add(value)
value
}
}
}
} catch (ex: InvalidInputException) {
resultMessages.add(Pair(
Pair(ex.key, JsonObject()),
Report(ex.key, "FAILURE", ex.localizedMessage)
))
continue
}
val keyValueMap = json {
obj(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment