Commit fc138189 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Adds excel parser

parent c74d76bf
......@@ -52,6 +52,7 @@ dependencies {
implementation("com.github.doyaaaaaken:kotlin-csv-jvm:0.7.3")
// XSLX / XSL Reader
implementation 'org.apache.poi:poi:4.1.2'
implementation 'org.apache.poi:poi-ooxml:4.1.2'
// ODS Reader
implementation 'org.odftoolkit:odftoolkit:1.0.0-BETA1'
......
#Tue Apr 07 16:43:20 CEST 2020
distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-bin.zip
#Tue May 19 16:49:13 CEST 2020
distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=GRADLE_USER_HOME
zipStorePath=wrapper/dists
......
......@@ -18,4 +18,4 @@
package org.memobase
class InvalidInputException(message: String) : Exception(message)
class InvalidInputException(val key: String, message: String) : Exception(message)
......@@ -24,12 +24,14 @@ import com.beust.klaxon.json
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import java.io.File
import java.io.StringReader
import kotlin.IllegalArgumentException
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.apache.poi.ss.usermodel.WorkbookFactory
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
......@@ -42,6 +44,8 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val propertyNamesIndex = settings.appSettings.getProperty("header.line").toInt()
private val identifierIndex = settings.appSettings.getProperty("identifier").toInt()
private val invalidPropertyNameCharacters = listOf('.', ':', '/', '+')
private val reportingTopic = settings.outputTopic + "-reporting"
fun build(): Topology {
......@@ -83,7 +87,10 @@ class KafkaTopology(private val settings: SettingsLoader) {
// CSV Branch
buildHelper(formatBranches[0]
.mapValues { key, value -> errorWrapper(key, value) })
.mapValues { key, value -> errorWrapperCsv(key, value) })
// XSL / XSLX Branch
buildHelper(formatBranches[1].mapValues { key, value -> errorWrapperXsl(key, value) })
return builder.build()
}
......@@ -115,20 +122,22 @@ class KafkaTopology(private val settings: SettingsLoader) {
return Klaxon().parseJsonObject(StringReader(value))
}
private fun errorWrapper(key: String, value: JsonObject): Pair<List<Pair<Pair<String, JsonObject>, Report>>, Report> {
private fun errorWrapperCsv(
key: String,
value: JsonObject
): Pair<List<Pair<Pair<String, JsonObject>, Report>>, Report> {
return try {
val result = csvMapper(value)
val result = csvMapper(key, value)
Pair(result, Report(key, "SUCCESS", "Transformed table data into ${result.count()} records."))
} catch (ex: InvalidInputException) {
Pair(emptyList(), Report(key, "FAILED", ex.localizedMessage))
Pair(emptyList(), Report(key, "FAILURE", ex.localizedMessage))
}
}
private fun csvMapper(value: JsonObject): List<Pair<Pair<String, JsonObject>, Report>> {
private fun csvMapper(key: String, value: JsonObject): List<Pair<Pair<String, JsonObject>, Report>> {
val resultMessages = mutableListOf<Pair<Pair<String, JsonObject>, Report>>()
val mutableSet = mutableSetOf<String>()
sftpClient.open(File(value["path"] as String)).use { remoteFile ->
// What about TSV? is that automatic or not? probably not ...
val reader =
csvReader {
this.quoteChar = '"'
......@@ -142,14 +151,13 @@ class KafkaTopology(private val settings: SettingsLoader) {
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
// TODO: check if there is a property name for each column!
headerProperties = line
headerProperties.forEachIndexed { index, property ->
if (property.isEmpty()) {
throw InvalidInputException("Missing a property name on row $count in column ${index + 1}!")
throw InvalidInputException(key, "Missing a property name on row $count in column ${index + 1}!")
}
if (property.contains(Regex("[+,.]"))) {
throw InvalidInputException("Invalid property name $property on row $count in column ${index + 1}! You may not use the any of the following characters: + , . ")
if (property.any { value -> invalidPropertyNameCharacters.contains(value) }) {
throw InvalidInputException(key, "Invalid property name $property on row $count in column ${index + 1}! You may not use the any of the following characters: + , . ")
}
}
}
......@@ -159,10 +167,10 @@ class KafkaTopology(private val settings: SettingsLoader) {
val identifier = line[identifierIndex - 1]
when {
identifier.isEmpty() -> {
throw InvalidInputException("The unique identifier in column $identifierIndex in row $count is empty!")
throw InvalidInputException("$count.$identifierIndex", "The unique identifier in column $identifierIndex in row $count is empty!")
}
mutableSet.contains(identifier) -> {
throw InvalidInputException("The unique identifier in column $identifierIndex in row $count is a duplicate of another row!")
throw InvalidInputException("$count.$identifierIndex", "The unique identifier in column $identifierIndex in row $count is a duplicate of another row!")
}
else -> {
mutableSet.add(identifier)
......@@ -195,8 +203,110 @@ class KafkaTopology(private val settings: SettingsLoader) {
return result
}
private fun excelMapper(value: JsonObject): List<Pair<Pair<String, JsonObject>, Report>> {
return emptyList()
private fun errorWrapperXsl(
key: String,
value: JsonObject
): Pair<List<Pair<Pair<String, JsonObject>, Report>>, Report> {
return try {
val result = excelMapper(key, value)
Pair(result, Report(key, "SUCCESS", "Transformed table data into ${result.count()} records."))
} catch (ex: InvalidInputException) {
Pair(emptyList(), Report(key, "FAILURE", ex.localizedMessage))
} catch (ex: IllegalArgumentException) { // Sheet index does not exist
Pair(emptyList(), Report(key, "FAILURE", ex.localizedMessage))
}
}
private fun excelMapper(key: String, value: JsonObject): List<Pair<Pair<String, JsonObject>, Report>> {
return sftpClient.open(File(value["path"] as String)).use { remoteFile ->
remoteFile.RemoteFileInputStream().use { stream ->
WorkbookFactory.create(stream).use { workbook ->
val identifierSet = mutableSetOf<String>()
val propertiesList = mutableListOf<String>()
val sheet = workbook.getSheetAt(sheetIndex)
var count = 0
sheet.map { row ->
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
propertiesList.addAll(row.map { cell ->
if (cell.stringCellValue.isNotEmpty()) {
if (cell.stringCellValue.any { char ->
invalidPropertyNameCharacters.contains(
char
)
}) {
throw InvalidInputException(
key,
"The property in cell ${cell.address} contains one or more invalid characters: $invalidPropertyNameCharacters."
)
} else {
cell.stringCellValue
}
} else {
throw InvalidInputException(
key,
"The header index is missing a value in cell ${cell.address}"
)
}
})
}
null
} else {
val rowIdentifier: String = try {
row.getCell(identifierIndex).let { cell ->
if (cell != null) {
when (val cellValue = cell.stringCellValue) {
"" -> {
throw InvalidInputException(
"${row.rowNum}.$identifierIndex",
"The row ${row.rowNum} has an empty identifier in column $identifierIndex."
)
}
in identifierSet -> {
throw InvalidInputException(
"${row.rowNum}.$identifierIndex",
"The row ${row.rowNum} contains a duplicated identifier in column $identifierIndex with another row."
)
}
else -> {
identifierSet.add(cellValue)
cellValue
}
}
} else {
throw InvalidInputException(
"${row.rowNum}.$identifierIndex",
"No cell found in row ${row.rowNum} for column $identifierIndex."
)
}
}
} catch (ex: InvalidInputException) {
return@map Pair(
Pair(ex.key, JsonObject()),
Report(ex.key, "FAILURE", ex.localizedMessage)
)
}
val jsonObject = json {
obj(
zip(
propertiesList,
row.map { cell -> if (cell != null) cell.stringCellValue else "" })
)
}
Pair(
Pair(rowIdentifier, jsonObject),
Report(
rowIdentifier,
"SUCCESS",
"Successfully created a key-value map from row ${row.rowNum}."
)
)
}
}.filterNotNull()
}
}
}
}
private fun odsMapper(value: JsonObject): List<Pair<Pair<String, JsonObject>, Report>> {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment