Commit 27726a51 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Implement process reports

Refactors tests & moves parser functionality out of KafkaTopology.kt.
parent 6e1f839c
Pipeline #12668 passed with stages
in 5 minutes and 25 seconds
......@@ -11,7 +11,7 @@ test:
tags:
- mbr
script:
- gradle --no-daemon --no-scan --no-build-cache test --fail-fast --tests "org.memobase.Tests"
- gradle --no-daemon --no-scan --no-build-cache test --fail-fast
.build-image:
......
......@@ -24,7 +24,8 @@ import org.apache.logging.log4j.LogManager
class App {
companion object {
private val log = LogManager.getLogger("TableDataTransformApp")
@JvmStatic fun main(args: Array<String>) {
@JvmStatic
fun main(args: Array<String>) {
try {
Service().run()
} catch (ex: Exception) {
......
......@@ -18,404 +18,89 @@
package org.memobase
import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon
import com.beust.klaxon.json
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import java.io.File
import java.io.StringReader
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.poi.ss.usermodel.Cell
import org.apache.poi.ss.usermodel.CellType
import org.apache.poi.ss.usermodel.Row
import org.apache.poi.ss.usermodel.WorkbookFactory
import org.memobase.models.ErrorResult
import org.memobase.models.Formats
import org.memobase.models.Message
import org.memobase.models.ParserResult
import org.memobase.models.ProcessReport
import org.memobase.models.Report
import org.memobase.models.ReportMessages
import org.memobase.models.ReportStatus
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
class KafkaTopology(private val settings: SettingsLoader) {
private val sftpClient: SftpClient = SftpClient(settings.sftpSettings)
private val sheetIndex = settings.appSettings.getProperty("sheet").toInt()
private val headerCount = settings.appSettings.getProperty("header.count").toInt()
private val propertyNamesIndex = settings.appSettings.getProperty("header.line").toInt()
private val identifierIndex = settings.appSettings.getProperty("identifier").toInt()
private val invalidPropertyNameCharacters = listOf('.', ':', '/', '+')
private val parser = TableParser(settings)
private val reportingTopic = settings.outputTopic + "-reporting"
private val klaxon = Klaxon()
fun build(): Topology {
val builder = StreamsBuilder()
val branchedSource = builder
.stream<String, String>(settings.inputTopic)
.flatMapValues { value -> listOfNotNull(klaxon.parse<Message>(value)) }
.branch(
Predicate { _, value -> filter(value) },
Predicate { _, value -> value.format == Formats.error },
Predicate { _, _ -> true }
)
// report filtered error message from previous job.
branchedSource[1]
.mapValues { key, _ ->
Klaxon().toJsonString(
Report(
key,
ReportStatus.failure,
ReportMessages.processFailure(key, "the input file is invalid.")
)
)
}
.to("${settings.outputTopic}-reporting")
// filtered result simply sends ERROR along!
branchedSource[1]
.mapValues { _ -> "ERROR" }
.to(settings.outputTopic)
// report full process as failure
// TODO: This currently means, that if there are three files, there are three process reports!
branchedSource[1]
.mapValues { key, _ ->
Klaxon().toJsonString(
Report(
"table-data-transform", ReportStatus.failure,
ReportMessages.processFailure(key, "the input file is invalid.")
)
)
}
.to(settings.processReportTopic)
// work branch
val formatBranches = branchedSource[0]
.mapValues { _, value -> parseJsonObject(value) }
.branch(
Predicate { _, value -> listOf(Formats.csv, Formats.tsv).contains(value["format"]) },
Predicate { _, value -> listOf(Formats.xls, Formats.xlsx).contains(value["format"]) },
Predicate { _, value -> listOf(Formats.ods).contains(value["format"]) }
)
// CSV Branch
buildHelper(formatBranches[0]
.mapValues { key, value -> errorWrapperCsv(key, value) })
ignoreFaultyInput(branchedSource[0])
// XSL / XSLX Branch
buildHelper(formatBranches[1]
.mapValues { key, value -> errorWrapperXsl(key, value) })
processValidInput(branchedSource[1]
.mapValues { key, value -> parser.parseTable(key, value) })
return builder.build()
}
private fun buildHelper(stream: KStream<String, Pair<List<Pair<Pair<String, JsonObject>, Report>>, Report>>) {
private fun processValidInput(stream: KStream<String, ParserResult>) {
val records = stream
.flatMapValues { _, value -> value.first }
.flatMapValues { _, value -> value.messages }
records
.map { _, value -> KeyValue(value.first.first, value.first.second) }
.map { _, value -> KeyValue(value.key, value.value) }
.filter { _, value -> value.isNotEmpty() }
.mapValues { value -> value.toJsonString() }
.to(settings.outputTopic)
records
.filter { _, value -> value.first.second.isNotEmpty() }
.map { _, value -> KeyValue(value.second.id, value.second) }
.mapValues { value -> Klaxon().toJsonString(value) }
.map { _, value -> KeyValue(value.report.id, value.report) }
.mapValues { value -> value.toJson() }
.to(reportingTopic)
stream
.map { _, value -> reportToJson(value.second) }
.map { _, value -> KeyValue(value.processReport.id, value.processReport.toJson()) }
.to(settings.processReportTopic)
}
/**
 * Tells whether the given text is free of the error format marker.
 *
 * @param value the text to inspect.
 * @return `true` when [value] does not contain [Formats.error], `false` when it does.
 */
private fun filter(value: String): Boolean = Formats.error !in value
/**
 * Parses a JSON document held in a string into a Klaxon [JsonObject].
 *
 * @param value the JSON text to parse.
 * @return the object produced by `Klaxon().parseJsonObject` for [value].
 */
private fun parseJsonObject(value: String): JsonObject {
    // Klaxon reads from a Reader, so the string is wrapped first.
    val source = StringReader(value)
    return Klaxon().parseJsonObject(source)
}
/**
 * Runs [csvMapper] and pairs its rows with a process-level report.
 *
 * On success the mapped rows are returned together with a success report that carries the row count.
 * When an [InvalidInputException] is raised, a single error record for [key] and a failure report
 * are returned instead.
 *
 * @param key the key of the incoming record.
 * @param value the incoming message as a JSON object; it is handed to [csvMapper] unchanged.
 * @return the list of (record, report) pairs and the process report.
 */
private fun errorWrapperCsv(
    key: String,
    value: JsonObject
): Pair<List<Pair<Pair<String, JsonObject>, Report>>, Report> {
    try {
        val rows = csvMapper(key, value)
        val processReport = Report(
            "table-data-transform",
            ReportStatus.success,
            ReportMessages.processSuccess(rows.size)
        )
        return Pair(rows, processReport)
    } catch (ex: InvalidInputException) {
        // The whole file is rejected: emit one error record plus a failed process report.
        val errorRecord = Pair(key, ErrorResult.result)
        val fileReport = Report(
            key,
            ReportStatus.failure,
            ReportMessages.invalidFile(key, ex.localizedMessage)
        )
        val processReport = Report(
            "table-data-transform",
            ReportStatus.failure,
            ReportMessages.processFailure(key, ex.localizedMessage)
        )
        return Pair(listOf(Pair(errorRecord, fileReport)), processReport)
    }
}
/**
 * Reads the CSV/TSV file referenced by the message and turns every data row into a record with a report.
 *
 * The file is opened through [sftpClient] from the `path` entry of [value]. The delimiter is a comma
 * when the `format` entry equals [Formats.csv], and a tab otherwise. The first [headerCount] rows are
 * treated as header rows; the row at position [propertyNamesIndex] (1-based) supplies the property names.
 *
 * Rows whose identifier (1-based column [identifierIndex]) is missing, empty or already used are not
 * aborted on; they yield an empty [JsonObject] with a failure report and processing continues.
 *
 * @param key the key of the incoming record, used as the key of header-level exceptions.
 * @param value the incoming message; its `path` and `format` entries are read.
 * @return one (record, report) pair per data row, in file order.
 * @throws InvalidInputException if a property name is blank or contains one of
 * [invalidPropertyNameCharacters].
 */
private fun csvMapper(key: String, value: JsonObject): List<Pair<Pair<String, JsonObject>, Report>> {
    val resultMessages = mutableListOf<Pair<Pair<String, JsonObject>, Report>>()
    val knownIdentifiers = mutableSetOf<String>()
    sftpClient.open(File(value["path"] as String)).use { inputStream ->
        val rows =
            csvReader {
                this.quoteChar = '"'
                this.delimiter = if (value["format"] == Formats.csv) ',' else '\t'
                this.charset = Charsets.UTF_8.displayName()
                // this.skipEmptyLine = true
            }.readAll(inputStream)
        var headerProperties = emptyList<String>()
        var count = 0
        for (line in rows) {
            count += 1
            if (count <= headerCount) {
                if (count == propertyNamesIndex) {
                    headerProperties = line
                    headerProperties.forEachIndexed { index, property ->
                        val trimmedProperty = property.trim()
                        if (trimmedProperty.isEmpty()) {
                            throw InvalidInputException(
                                key,
                                "Missing a property name on row $count in column ${index + 1}!"
                            )
                        }
                        if (trimmedProperty.any { character -> character in invalidPropertyNameCharacters }) {
                            // List the characters from the actual configuration so the message cannot drift.
                            throw InvalidInputException(
                                key,
                                "Invalid property name $trimmedProperty on row $count in column ${index + 1}! " +
                                    "You may not use any of the following characters: " +
                                    invalidPropertyNameCharacters.joinToString(" ")
                            )
                        }
                    }
                }
                continue
            }
            // the -1 ensures, that users can start columns beginning at 1!
            val identifier: String = try {
                // getOrNull: a row without the identifier column becomes a row failure, not a crash.
                val candidate = line.getOrNull(identifierIndex - 1)
                    ?: throw InvalidInputException(
                        "$count.$identifierIndex",
                        "No cell found in row $count for column $identifierIndex."
                    )
                when (candidate) {
                    "" -> {
                        throw InvalidInputException(
                            "$count.$identifierIndex",
                            "The row $count has an empty identifier in column $identifierIndex."
                        )
                    }
                    in knownIdentifiers -> {
                        throw InvalidInputException(
                            "$count.$identifierIndex",
                            "The row $count contains a duplicated identifier in column $identifierIndex with another row."
                        )
                    }
                    else -> {
                        knownIdentifiers.add(candidate)
                        candidate
                    }
                }
            } catch (ex: InvalidInputException) {
                // Report the faulty row and carry on with the next one.
                resultMessages.add(
                    Pair(
                        Pair(ex.key, JsonObject()),
                        Report(ex.key, ReportStatus.failure, ReportMessages.reportFailure(ex.localizedMessage))
                    )
                )
                continue
            }
            val keyValueMap = json {
                obj(
                    zip(headerProperties, line)
                )
            }
            val result = Pair(identifier, keyValueMap)
            val report = Report(
                identifier,
                ReportStatus.success,
                ReportMessages.reportSuccess(identifier, count)
            )
            resultMessages.add(Pair(result, report))
        }
    }
    return resultMessages
}
/**
 * Pairs each header name with the cell at the same position of a row.
 *
 * Cells that are empty strings are dropped; the remaining cell values are trimmed. Positions that
 * exist in only one of the two lists are ignored, so a row shorter than the header no longer causes
 * an index-out-of-bounds failure.
 *
 * @param header the property names, in column order.
 * @param line the cell values of one row, in column order.
 * @return the (property name, trimmed value) pairs for all non-empty cells.
 */
private fun zip(header: List<String>, line: List<String>): List<Pair<String, String>> =
    header.zip(line)
        // Emptiness is checked before trimming, so a whitespace-only cell is kept as "".
        .filter { (_, cell) -> cell.isNotEmpty() }
        .map { (name, cell) -> name to cell.trim() }
/**
 * Runs [excelMapper] and pairs its rows with a process-level report.
 *
 * On success the mapped rows are returned together with a success report that carries the row count.
 * An [InvalidInputException] or an [IllegalArgumentException] (raised when the sheet index does not
 * exist) is turned into a single error record for [key] and a failure report.
 *
 * @param key the key of the incoming record.
 * @param value the incoming message as a JSON object; it is handed to [excelMapper] unchanged.
 * @return the list of (record, report) pairs and the process report.
 */
private fun errorWrapperXsl(
    key: String,
    value: JsonObject
): Pair<List<Pair<Pair<String, JsonObject>, Report>>, Report> {
    // Both handled exception types produce the same failure result.
    fun failureFor(cause: Throwable): Pair<List<Pair<Pair<String, JsonObject>, Report>>, Report> {
        val errorRecord = Pair(key, ErrorResult.result)
        val fileReport = Report(
            key,
            ReportStatus.failure,
            ReportMessages.invalidFile(key, cause.localizedMessage)
        )
        val processReport = Report(
            "table-data-transform",
            ReportStatus.failure,
            ReportMessages.processFailure(key, cause.localizedMessage)
        )
        return Pair(listOf(Pair(errorRecord, fileReport)), processReport)
    }

    return try {
        val rows = excelMapper(key, value)
        val processReport = Report(
            "table-data-transform",
            ReportStatus.success,
            ReportMessages.processSuccess(rows.size)
        )
        Pair(rows, processReport)
    } catch (ex: InvalidInputException) {
        failureFor(ex)
    } catch (ex: IllegalArgumentException) { // Sheet index does not exist
        failureFor(ex)
    }
}
private fun excelMapper(key: String, value: JsonObject): List<Pair<Pair<String, JsonObject>, Report>> {
return sftpClient.open(File(value["path"] as String)).use { inputStream ->
WorkbookFactory.create(inputStream).use { workbook ->
val identifierSet = mutableSetOf<String>()
val propertiesList = mutableListOf<String>()
// sheet index is 0-based. This ensures that users can access sheet 1 with index 1!
val sheet = workbook.getSheetAt(sheetIndex - 1)
var count = 0
sheet.filter { row ->
row.any { cell ->
// filter all rows with no string, boolean or numeric cell
when (cell.cellType) {
CellType._NONE -> false
CellType.NUMERIC -> true
CellType.STRING -> true
CellType.FORMULA -> false
CellType.BLANK -> false
CellType.BOOLEAN -> true
CellType.ERROR -> false
else -> false
}
}
}.map { row ->
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
propertiesList.addAll(row.map { cell ->
if (retriveCellValue(cell).isNotEmpty()) {
if (retriveCellValue(cell).any { char ->
invalidPropertyNameCharacters.contains(
char
)
}) {
throw InvalidInputException(
key,
"The property in cell ${cell.address} contains one or more invalid characters: $invalidPropertyNameCharacters."
)
} else {
retriveCellValue(cell)
}
} else {
throw InvalidInputException(
key,
"The header index is missing a value in cell ${cell.address}"
)
}
}.map { it.trim() })
}
null
} else {
val rowIdentifier: String = try {
row.getCell(identifierIndex - 1).let { cell ->
if (cell != null) {
when (val cellValue = retriveCellValue(cell)) {
"" -> {
throw InvalidInputException(
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} has an empty identifier in column $identifierIndex."
)
}
in identifierSet -> {
throw InvalidInputException(
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} contains a duplicated identifier in column $identifierIndex with another row."
)
}
else -> {
identifierSet.add(cellValue)
cellValue
}
}
} else {
throw InvalidInputException(
"${row.rowNum}.$identifierIndex",
"No cell found in row ${row.rowNum} for column $identifierIndex."
)
}
}
} catch (ex: InvalidInputException) {
return@map Pair(
Pair(ex.key, JsonObject()),
Report(ex.key, ReportStatus.failure, ReportMessages.reportFailure(ex.localizedMessage))
)
}
private fun ignoreFaultyInput(stream: KStream<String, Message>) {
stream
.mapValues { _ -> ErrorResult.get().toJsonString() }
.to(settings.outputTopic)
val jsonObject = json {
obj(
zip(
propertiesList,
retrieveCells(row, propertiesList.size - 1)
))
}
Pair(
Pair(rowIdentifier, jsonObject),
stream
.mapValues { key, _ ->
Report(
rowIdentifier,
ReportStatus.success,
ReportMessages.reportSuccess(rowIdentifier, count)
)
key,
ReportStatus.failure,
ReportMessages.processFailure(key, "of an invalid input message.")
)
}
}.filterNotNull()
}
}
}
/**
 * Placeholder for ODS input: produces no records.
 *
 * @param value the incoming message; it is not read.
 * @return always an empty list.
 */
private fun odsMapper(value: JsonObject): List<Pair<Pair<String, JsonObject>, Report>> = emptyList()
/**
 * Converts a report into a Kafka record keyed by the report id.
 *
 * @param value the report to serialize.
 * @return a [KeyValue] whose key is `value.id` and whose value is the Klaxon JSON string of [value].
 */
private fun reportToJson(value: Report): KeyValue<String, String> {
    val recordKey = value.id
    val payload = Klaxon().toJsonString(value)
    return KeyValue(recordKey, payload)
}
/**
 * Collects the string values of the leading cells of a row.
 *
 * @param row the row to read from.
 * @param size the last cell index to read; indices `0..size` are read inclusively, so `size + 1`
 * values are returned.
 * @return the values produced by [retriveCellValue] for each cell, in column order.
 */
private fun retrieveCells(row: Row, size: Int): List<String> {
    val cellValues = mutableListOf<String>()
    for (columnIndex in 0..size) {
        cellValues.add(retriveCellValue(row.getCell(columnIndex)))
    }
    return cellValues
}
.mapValues { value -> value.toJson() }
.to("${settings.outputTopic}-reporting")
private fun retriveCellValue(cell: Cell?): String {
return if (cell != null) {
when (cell.cellType) {
CellType.BLANK -> ""
CellType.BOOLEAN -> cell.booleanCellValue.toString()
CellType._NONE -> ""
CellType.NUMERIC ->
if (cell.numericCellValue >= 1) {
cell.numericCellValue.toLong().toString()
} else {
cell.localDateTimeCellValue.toLocalTime().toString()
}
CellType.STRING -> cell.stringCellValue
CellType.FORMULA -> ""
CellType.ERROR -> ""
else -> ""
stream
.mapValues { _ ->
ProcessReport(ReportStatus.failure, 1)
}
} else ""
.mapValues { value -> value.toJson() }
.to(settings.processReportTopic)
}
}
/*
* Table Data Import Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.json
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import java.io.File
import org.apache.poi.ss.usermodel.CellType
import org.apache.poi.ss.usermodel.WorkbookFactory
import org.memobase.UtilityFunctions.retrieveCellValue
import org.memobase.UtilityFunctions.retrieveCells
import org.memobase.UtilityFunctions.zip
import org.memobase.models.ErrorResult
import org.memobase.models.Formats
import org.memobase.models.Message
import org.memobase.models.ParserResult
import org.memobase.models.ProcessReport
import org.memobase.models.Report
import org.memobase.models.ReportMessages
import org.memobase.models.ReportStatus
import org.memobase.models.ResultMessage
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
class TableParser(settings: SettingsLoader) {
private val sftpClient: SftpClient = SftpClient(settings.sftpSettings)
private val sheetIndex = settings.appSettings.getProperty("sheet").toInt()
private val headerCount = settings.appSettings.getProperty("header.count").toInt()
private val propertyNamesIndex = settings.appSettings.getProperty("header.line").toInt()
private val identifierIndex = settings.appSettings.getProperty("identifier").toInt()
private val invalidPropertyNameCharacters = listOf('.', ':', '/', '+')
/**
 * Parses the table referenced by [inputMessage] and summarizes the outcome.
 *
 * Excel formats ([Formats.xls], [Formats.xlsx]) are handled by `excelMapper`, delimited text formats
 * ([Formats.csv], [Formats.tsv]) by `csvMapper`. Any other format is rejected with an
 * [InvalidInputException], which is handled below like every other invalid input.
 *
 * @param key the key of the incoming record.
 * @param inputMessage the message describing the file to parse.
 * @return a [ParserResult] holding the per-row messages and a process report with the total,
 * successful and failed row counts. If parsing fails as a whole, a [ParserResult] holding an error
 * result, a failure report and a failed process report.
 */
fun parseTable(key: String, inputMessage: Message): ParserResult {
    return try {
        val result: List<ResultMessage> = when (inputMessage.format) {
            Formats.xls, Formats.xlsx -> excelMapper(key, inputMessage)
            Formats.csv, Formats.tsv -> csvMapper(key, inputMessage)
            else -> throw InvalidInputException(key, "Cannot parse the table with format ${inputMessage.format}.")
        }
        val failureCount = result.count { message -> message.report.status == ReportStatus.failure }
        ParserResult(
            result,
            ProcessReport(
                "table-data-transform",
                ReportStatus.success,
                result.size,
                result.size - failureCount,
                failureCount
            )
        )
    } catch (ex: InvalidInputException) {
        ParserResult(
            ex.key, ErrorResult.get(), Report(ex.key, ReportStatus.failure, ex.message ?: "Unknown issue."),
            ProcessReport(ReportStatus.failure, 1)
        )
    } catch (ex: IllegalArgumentException) { // Sheet index does not exist
        // localizedMessage comes from Java and may be null; use the same fallback text as above.
        ParserResult(
            key, ErrorResult.get(), Report(key, ReportStatus.failure, ex.localizedMessage ?: "Unknown issue."),
            ProcessReport(ReportStatus.failure, 1)
        )
    }
}
private fun csvMapper(key: String, value: Message): List<ResultMessage> {
val resultMessages = mutableListOf<ResultMessage>()
val mutableSet = mutableSetOf<String>()
sftpClient.open(File(value.path)).use { inputStream ->
val reader =
csvReader {
this.quoteChar = '"'
this.delimiter = if (value.format == Formats.csv) ',' else '\t'
this.charset = Charsets.UTF_8.displayName()
// this.skipEmptyLine = true
}.readAll(inputStream)
var headerProperties = emptyList<String>()
var count = 0
for (line in reader) {
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
headerProperties = line
headerProperties.forEachIndexed { index, property ->
val trimmedProperty = property.trim()
if (trimmedProperty.isEmpty()) {
throw InvalidInputException(
key,
"Missing a property name on row $count in column ${index + 1}!"
)
}
if (trimmedProperty.any { value -> invalidPropertyNameCharacters.contains(value) }) {
throw InvalidInputException(
key,
"Invalid property name $trimmedProperty on row $count in column ${index + 1}! You may not use the any of the following characters: + , . "
)
}
}
}
continue
}
// the -1 ensures, that users can start columns beginning at 1!
val identifier: String = try {
line[identifierIndex - 1].let { value ->
when (value) {