Commit 6b9cb0f3 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Refactor constant values

parent 5e5ef957
......@@ -64,8 +64,8 @@ class KafkaTopology(private val settings: SettingsLoader) {
Klaxon().toJsonString(
Report(
key,
"FAILURE",
"Ignored message due to previous error."
ReportStatus.failure,
ReportMessages.processFailure(key, "the input file is invalid.")
)
)
}
......@@ -76,17 +76,26 @@ class KafkaTopology(private val settings: SettingsLoader) {
.mapValues { _ -> "ERROR" }
.to(settings.outputTopic)
// report full process as failure
// TODO: This currently means, that if there are three files, there are three process reports!
branchedSource[1]
.mapValues { key, _ -> Klaxon().toJsonString(Report(key, "FAILURE", "No valid file found!")) }
.mapValues { key, _ ->
Klaxon().toJsonString(
Report(
key, ReportStatus.failure,
ReportMessages.processFailure(key, "the input file is invalid.")
)
)
}
.to(settings.processReportTopic)
// work branch
val formatBranches = branchedSource[0]
.mapValues { _, value -> parseJsonObject(value) }
.branch(
Predicate { _, value -> listOf("CSV", "TSV").contains(value["format"]) },
Predicate { _, value -> listOf("XLS", "XLSX").contains(value["format"]) },
Predicate { _, value -> listOf("ODS").contains(value["format"]) }
Predicate { _, value -> listOf(Formats.csv, Formats.tsv).contains(value["format"]) },
Predicate { _, value -> listOf(Formats.xls, Formats.xlsx).contains(value["format"]) },
Predicate { _, value -> listOf(Formats.ods).contains(value["format"]) }
)
// CSV Branch
......@@ -120,7 +129,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
}
private fun filter(value: String): Boolean {
return !value.contains("ERROR")
return !value.contains(Formats.error)
}
private fun parseJsonObject(value: String): JsonObject {
......@@ -133,15 +142,16 @@ class KafkaTopology(private val settings: SettingsLoader) {
): Pair<List<Pair<Pair<String, JsonObject>, Report>>, Report> {
return try {
val result = csvMapper(key, value)
Pair(result, Report(key, "SUCCESS", "Transformed table data into ${result.count()} records."))
Pair(result, Report(key, ReportStatus.success, ReportMessages.processSuccess(result.size)))
} catch (ex: InvalidInputException) {
Pair(
listOf(
Pair(
Pair(key, JsonObject(mapOf(Pair("message", "ERROR")))),
Report(key, "FAILURE", ex.localizedMessage)
)), Report(key, "FAILURE", ex.localizedMessage)
)
Pair(key, ErrorResult.result),
Report(key, ReportStatus.failure, ReportMessages.invalidFile(key, ex.localizedMessage))
)
), Report(key, ReportStatus.failure, ReportMessages.processFailure(key, ex.localizedMessage))
)
}
}
......@@ -152,7 +162,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
val reader =
csvReader {
this.quoteChar = '"'
this.delimiter = if (value["format"] == "CSV") ',' else '\t'
this.delimiter = if (value["format"] == Formats.csv) ',' else '\t'
this.charset = Charsets.UTF_8.displayName()
// this.skipEmptyLine = true
}.readAll(remoteFile.RemoteFileInputStream())
......@@ -206,7 +216,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
resultMessages.add(
Pair(
Pair(ex.key, JsonObject()),
Report(ex.key, "FAILURE", ex.localizedMessage)
Report(ex.key, ReportStatus.failure, ReportMessages.reportFailure(ex.localizedMessage))
)
)
continue
......@@ -219,8 +229,8 @@ class KafkaTopology(private val settings: SettingsLoader) {
val result = Pair(identifier, keyValueMap)
val report = Report(
identifier,
"SUCCESS",
"Successfully created record with identifier $identifier from row $count!"
ReportStatus.success,
ReportMessages.reportSuccess(identifier, count)
)
resultMessages.add(Pair(result, report))
}
......@@ -244,25 +254,24 @@ class KafkaTopology(private val settings: SettingsLoader) {
): Pair<List<Pair<Pair<String, JsonObject>, Report>>, Report> {
return try {
val result = excelMapper(key, value)
Pair(result, Report(key, "SUCCESS", "Transformed table data into ${result.count()} records."))
Pair(result, Report(key, ReportStatus.success, ReportMessages.processSuccess(result.size)))
} catch (ex: InvalidInputException) {
Pair(
listOf(
Pair(
Pair(key, JsonObject(mapOf(Pair("message", "ERROR")))),
Report(key, "FAILURE", ex.localizedMessage)
Pair(key, ErrorResult.result),
Report(key, ReportStatus.failure, ReportMessages.invalidFile(key, ex.localizedMessage))
)
), Report(key, "FAILURE", ex.localizedMessage)
), Report(key, ReportStatus.failure, ReportMessages.processFailure(key, ex.localizedMessage))
)
} catch (ex: IllegalArgumentException) { // Sheet index does not exist
Pair(
listOf(
Pair(
Pair(key, JsonObject(mapOf(Pair("message", "ERROR")))),
Report(key, "FAILURE", ex.localizedMessage)
Pair(key, ErrorResult.result),
Report(key, ReportStatus.failure, ReportMessages.invalidFile(key, ex.localizedMessage))
)
),
Report(key, "FAILURE", ex.localizedMessage)
), Report(key, ReportStatus.failure, ReportMessages.processFailure(key, ex.localizedMessage))
)
}
}
......@@ -334,7 +343,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
} catch (ex: InvalidInputException) {
return@map Pair(
Pair(ex.key, JsonObject()),
Report(ex.key, "FAILURE", ex.localizedMessage)
Report(ex.key, ReportStatus.failure, ReportMessages.reportFailure(ex.localizedMessage))
)
}
val jsonObject = json {
......@@ -348,8 +357,8 @@ class KafkaTopology(private val settings: SettingsLoader) {
Pair(rowIdentifier, jsonObject),
Report(
rowIdentifier,
"SUCCESS",
"Successfully created a key-value map from row ${row.rowNum}."
ReportStatus.success,
ReportMessages.reportSuccess(rowIdentifier, row.rowNum)
)
)
}
......
/*
* text-file-validation
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.json
object Formats {
const val csv = "CSV"
const val tsv = "TSV"
const val xlsx = "XLSX"
const val xls = "XLS"
const val ods = "ODS"
const val invalid = "INVALID"
const val error = "ERROR"
}
object Extensions {
const val csv = "csv"
const val tsv = "tsv"
const val xlsx = "xlsx"
const val xls = "xls"
const val ods = "ods"
}
object ReportStatus {
const val success = "SUCCESS"
const val failure = "FAILURE"
}
object ErrorResult {
val result = json {
obj(Pair("message", Formats.error))
}
}
object ReportMessages {
fun processFailure(fileName: String, message: String): String {
return "Could not process file $fileName, because $message"
}
fun processSuccess(count: Int): String {
return "Transformed table data into $count records."
}
fun invalidFile(fileName: String, message: String): String {
return "Invalid Input Error: $message for file $fileName."
}
fun reportSuccess(identifier: String, count: Int): String {
return "Successfully transformed row $count into key-value map with identifier $identifier."
}
fun reportFailure(message: String): String {
return "Invalid Input Error: $message"
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment