Commit 6b88c940 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Update dependency

parent e08e21ff
Pipeline #9871 passed with stages
in 5 minutes and 41 seconds
FROM gradle:6.3-jdk8
FROM gradle:6.3-jdk8
ADD . /
WORKDIR /
RUN gradle --no-daemon --no-scan --no-build-cache distTar
......
......@@ -44,10 +44,7 @@ dependencies {
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation 'org.memobase:memobase-service-utilities:1.2.1'
// SFTP Client
implementation 'com.hierynomus:sshj:0.27.0'
implementation 'org.memobase:memobase-service-utilities:1.4.0'
// CSV Reader
implementation("com.github.doyaaaaaken:kotlin-csv-jvm:0.7.3")
// XSLX / XSL Reader
......
apiVersion: v1
kind: ConfigMap
metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-app-config"
name: "{{ .Values.processId }}-{{ .Values.jobName }}-config"
namespace: memobase
data:
APPLICATION_ID: "{{ .Values.processId }}-{{ .Values.jobName }}"
......
......@@ -19,6 +19,6 @@ spec:
- configMapRef:
name: "{{ .Values.kafkaConfigs }}"
- configMapRef:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-app-config"
name: "{{ .Values.processId }}-{{ .Values.jobName }}-config"
restartPolicy: Never
backoffLimit: 0
......@@ -24,21 +24,17 @@ import com.beust.klaxon.json
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import java.io.File
import java.io.StringReader
import kotlin.IllegalArgumentException
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.apache.poi.ss.usermodel.CellType
import org.apache.poi.ss.usermodel.WorkbookFactory
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("KafkaTopology")
private val sftpClient: SftpClient = SftpClient(settings.sftpSettings)
private val sheetIndex = settings.appSettings.getProperty("sheet").toInt()
private val headerCount = settings.appSettings.getProperty("header.count").toInt()
......@@ -159,14 +155,14 @@ class KafkaTopology(private val settings: SettingsLoader) {
private fun csvMapper(key: String, value: JsonObject): List<Pair<Pair<String, JsonObject>, Report>> {
val resultMessages = mutableListOf<Pair<Pair<String, JsonObject>, Report>>()
val mutableSet = mutableSetOf<String>()
sftpClient.open(File(value["path"] as String)).use { remoteFile ->
sftpClient.open(File(value["path"] as String)).use { inputStream ->
val reader =
csvReader {
this.quoteChar = '"'
this.delimiter = if (value["format"] == Formats.csv) ',' else '\t'
this.charset = Charsets.UTF_8.displayName()
// this.skipEmptyLine = true
}.readAll(remoteFile.RemoteFileInputStream())
}.readAll(inputStream)
var headerProperties = emptyList<String>()
var count = 0
for (line in reader) {
......@@ -278,120 +274,119 @@ class KafkaTopology(private val settings: SettingsLoader) {
}
private fun excelMapper(key: String, value: JsonObject): List<Pair<Pair<String, JsonObject>, Report>> {
return sftpClient.open(File(value["path"] as String)).use { remoteFile ->
remoteFile.RemoteFileInputStream().use { stream ->
WorkbookFactory.create(stream).use { workbook ->
val identifierSet = mutableSetOf<String>()
val propertiesList = mutableListOf<String>()
// sheet index is 0-based. This ensures that users can access sheet 1 with index 1!
val sheet = workbook.getSheetAt(sheetIndex - 1)
var count = 0
sheet.filter { row ->
row.any { cell ->
// filter all rows with no string, boolean or numeric cell
when (cell.cellType) {
CellType._NONE -> false
CellType.NUMERIC -> true
CellType.STRING -> true
CellType.FORMULA -> false
CellType.BLANK -> false
CellType.BOOLEAN -> true
CellType.ERROR -> false
else -> false
}
return sftpClient.open(File(value["path"] as String)).use { inputStream ->
WorkbookFactory.create(inputStream).use { workbook ->
val identifierSet = mutableSetOf<String>()
val propertiesList = mutableListOf<String>()
// sheet index is 0-based. This ensures that users can access sheet 1 with index 1!
val sheet = workbook.getSheetAt(sheetIndex - 1)
var count = 0
sheet.filter { row ->
row.any { cell ->
// filter all rows with no string, boolean or numeric cell
when (cell.cellType) {
CellType._NONE -> false
CellType.NUMERIC -> true
CellType.STRING -> true
CellType.FORMULA -> false
CellType.BLANK -> false
CellType.BOOLEAN -> true
CellType.ERROR -> false
else -> false
}
}.map { row ->
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
propertiesList.addAll(row.map { cell ->
if (cell.stringCellValue.isNotEmpty()) {
if (cell.stringCellValue.any { char ->
invalidPropertyNameCharacters.contains(
char
)
}) {
throw InvalidInputException(
key,
"The property in cell ${cell.address} contains one or more invalid characters: $invalidPropertyNameCharacters."
}
}.map { row ->
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
propertiesList.addAll(row.map { cell ->
if (cell.stringCellValue.isNotEmpty()) {
if (cell.stringCellValue.any { char ->
invalidPropertyNameCharacters.contains(
char
)
} else {
cell.stringCellValue
}
} else {
}) {
throw InvalidInputException(
key,
"The header index is missing a value in cell ${cell.address}"
"The property in cell ${cell.address} contains one or more invalid characters: $invalidPropertyNameCharacters."
)
} else {
cell.stringCellValue
}
})
}
null
} else {
val rowIdentifier: String = try {
row.getCell(identifierIndex - 1).let { cell ->
if (cell != null) {
when (val cellValue = cell.stringCellValue) {
"" -> {
throw InvalidInputException(
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} has an empty identifier in column $identifierIndex."
)
}
in identifierSet -> {
throw InvalidInputException(
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} contains a duplicated identifier in column $identifierIndex with another row."
)
}
else -> {
identifierSet.add(cellValue)
cellValue
}
} else {
throw InvalidInputException(
key,
"The header index is missing a value in cell ${cell.address}"
)
}
})
}
null
} else {
val rowIdentifier: String = try {
row.getCell(identifierIndex - 1).let { cell ->
if (cell != null) {
when (val cellValue = cell.stringCellValue) {
"" -> {
throw InvalidInputException(
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} has an empty identifier in column $identifierIndex."
)
}
in identifierSet -> {
throw InvalidInputException(
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} contains a duplicated identifier in column $identifierIndex with another row."
)
}
else -> {
identifierSet.add(cellValue)
cellValue
}
} else {
throw InvalidInputException(
"${row.rowNum}.$identifierIndex",
"No cell found in row ${row.rowNum} for column $identifierIndex."
)
}
} else {
throw InvalidInputException(
"${row.rowNum}.$identifierIndex",
"No cell found in row ${row.rowNum} for column $identifierIndex."
)
}
} catch (ex: InvalidInputException) {
return@map Pair(
Pair(ex.key, JsonObject()),
Report(ex.key, ReportStatus.failure, ReportMessages.reportFailure(ex.localizedMessage))
)
}
val jsonObject = json {
obj(
zip(
propertiesList,
row.map { cell ->
if (cell != null) {
when (cell.cellType) {
CellType.BLANK -> ""
CellType.BOOLEAN -> cell.booleanCellValue.toString()
CellType._NONE -> ""
CellType.NUMERIC -> cell.numericCellValue.toString()
CellType.STRING -> cell.stringCellValue
CellType.FORMULA -> ""
CellType.ERROR -> ""
else -> ""
}
} else "" })
)
}
Pair(
Pair(rowIdentifier, jsonObject),
Report(
rowIdentifier,
ReportStatus.success,
ReportMessages.reportSuccess(rowIdentifier, count)
)
} catch (ex: InvalidInputException) {
return@map Pair(
Pair(ex.key, JsonObject()),
Report(ex.key, ReportStatus.failure, ReportMessages.reportFailure(ex.localizedMessage))
)
}
}.filterNotNull()
}
val jsonObject = json {
obj(
zip(
propertiesList,
row.map { cell ->
if (cell != null) {
when (cell.cellType) {
CellType.BLANK -> ""
CellType.BOOLEAN -> cell.booleanCellValue.toString()
CellType._NONE -> ""
CellType.NUMERIC -> cell.numericCellValue.toString()
CellType.STRING -> cell.stringCellValue
CellType.FORMULA -> ""
CellType.ERROR -> ""
else -> ""
}
} else ""
})
)
}
Pair(
Pair(rowIdentifier, jsonObject),
Report(
rowIdentifier,
ReportStatus.success,
ReportMessages.reportSuccess(rowIdentifier, count)
)
)
}
}.filterNotNull()
}
}
}
......
......@@ -23,7 +23,6 @@ data class Report(
val status: String,
val message: String
) {
override fun equals(other: Any?): Boolean {
return when (other) {
null -> false
......
......@@ -31,7 +31,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.extensions.EmbeddedSftpServer
import org.memobase.testing.EmbeddedSftpServer
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class Tests {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment