Commit b3baeb8a authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Refactor to deployment

Remove old tests.
Add new tests.
parent 925d8be7
Pipeline #23387 failed with stages
in 3 minutes and 11 seconds
......@@ -41,8 +41,7 @@ dependencies {
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation 'org.memobase:memobase-service-utilities:1.12.2'
implementation 'ch.memobase:import-process-effects-registry_2.12:0.2.1'
implementation 'org.memobase:memobase-service-utilities:2.0.15'
// CSV Reader
implementation("com.github.doyaaaaaken:kotlin-csv-jvm:0.7.3")
// XLSX / XLS Reader
......
apiVersion: v2
name: table-data-transform
description: This chart starts a table data transform job.
description: This chart creates the table data transform deployment.
type: application
version: 0.0.0
appVersion: 0.0.0
......
deploymentName: ip-xml-data-transform-deployment
applicationId: xml-data-transform-app
kafkaConfigs: prod-kafka-bootstrap-servers
sftpConfigs: internal-sftp-config
inputTopicName: import-process-data-transform
outputTopicName: import-process-mapper
reportingTopicName: import-process-reporting
\ No newline at end of file
......@@ -18,12 +18,13 @@ spec:
spec:
restartPolicy: Always
containers:
- name: "{{ .Values.processId }}-{{ .Values.jobName }}"
image: "{{ .Values.registry }}/{{ .Values.image }}:{{ .Values.tag }}"
envFrom:
- secretRef:
name: "{{ .Values.sftpConfigs }}"
- configMapRef:
name: "{{ .Values.kafkaConfigs }}"
- configMapRef:
name: "{{ .Values.deploymentName }}-config"
\ No newline at end of file
- name: "{{ .Values.deploymentName }}-container"
image: "{{ .Values.registry }}/{{ .Values.image }}:{{ .Values.tag }}"
imagePullPolicy: Always
envFrom:
- secretRef:
name: "{{ .Values.sftpConfigs }}"
- configMapRef:
name: "{{ .Values.kafkaConfigs }}"
- configMapRef:
name: "{{ .Values.deploymentName }}-config"
############################################
## Values in this section are the same for #
## all jobs #
############################################
#image values
registry: "cr.gitlab.switch.ch"
image: "memoriav/memobase-2020/services/import-process/table-data-transform"
tag: "latest"
......
......@@ -18,53 +18,84 @@
package org.memobase
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.HeaderExtractionTransformSupplier
import ch.memobase.settings.SettingsLoader
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.memobase.models.*
import org.memobase.settings.SettingsLoader
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Predicate
import org.memobase.models.Formats
import org.memobase.models.Message
class KafkaTopology(private val settings: SettingsLoader) {
private val parser = TableParser(settings)
private val reportingTopic = settings.outputTopic + "-reporting"
private val reportingTopic = settings.processReportTopic
private val klaxon = Klaxon()
private val acceptedFormats = listOf(Formats.csv, Formats.xlsx, Formats.tsv, Formats.xls, Formats.ods)
fun prepare(): StreamsBuilder {
fun build(): Topology {
val builder = StreamsBuilder()
val parsedTable = builder
.stream<String, String>(settings.inputTopic)
.mapValues { value -> klaxon.parse<Message>(value) }
.filter { _, value -> acceptedFormats.contains(value?.format) }
.mapValues { key, value -> parser.parseTable(key, value!!) }
val parseMessageStream = builder
.stream<String, String>(settings.inputTopic)
.mapValues { readOnlyKey, value -> parseMessage(readOnlyKey, value) }
.branch(
Predicate { _, value -> value.second.status == ReportStatus.fatal },
Predicate { _, value -> value.second.status != ReportStatus.fatal }
)
parseMessageStream[0]
.mapValues { value -> value.second.toJson() }
.to(reportingTopic)
val parsedTables = parseMessageStream[1]
.filter { _, value -> acceptedFormats.contains(value.first.format) }
.transformValues(HeaderExtractionTransformSupplier<Pair<Message, Report>>())
.flatMapValues { key, value -> parser.parseTable(key, value.first.first, value.second) }
.map { _, value -> KeyValue(value.key, value) }
parsedTables
.mapValues { value -> value.report.toJson() }
.to(reportingTopic)
return builder
}
parsedTables
.filter { _, value -> value.report.status != ReportStatus.fatal }
.mapValues { value -> klaxon.toJsonString(value.value) }
.to(settings.outputTopic)
private fun processValidInput(stream: KStream<String, ParserResult>) {
val records = stream
.flatMapValues { _, value -> value.messages }
records
.map { _, value -> KeyValue(value.key, value.value) }
.filter { _, value -> value.isNotEmpty() }
.mapValues { value -> value.toJsonString() }
.to(settings.outputTopic)
return builder.build()
}
records
.map { _, value -> KeyValue(value.report.id, value.report) }
.mapValues { value -> value.toJson() }
.to(reportingTopic)
stream
.map { _, value -> KeyValue(value.processReport.id, value.processReport.toJson()) }
.to(settings.processReportTopic)
private fun parseMessage(readOnlyKey: String, data: String): Pair<Message, Report> {
return try {
val parsedMessage = klaxon.parse<Message>(data)
if (parsedMessage == null) {
Pair(Message("", ""), Report(
readOnlyKey,
ReportStatus.fatal,
"Could not parse message from data $data.",
Service.name
))
} else {
Pair(parsedMessage, Report(
readOnlyKey,
ReportStatus.success,
"",
Service.name
))
}
} catch (ex: KlaxonException) {
Pair(Message("", ""), Report(
readOnlyKey,
ReportStatus.fatal,
"JSON Parser Error: ${ex.localizedMessage}.",
Service.name
))
}
}
}
......@@ -18,51 +18,35 @@
package org.memobase
import ch.memobase.Effect
import ch.memobase.EffectsRegistry
import ch.memobase.ShutdownMessage
import ch.memobase.settings.SettingsLoader
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import scala.Some
import scala.runtime.BoxedUnit
import kotlin.system.exitProcess
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("TableDataService")
companion object {
const val name = "table-data-transform"
}
val settings = SettingsLoader(
listOf(
"sheet",
"header.count",
"header.line",
"identifier"
),
listOf(),
file,
useStreamsConfig = true,
readSftpSettings = true
)
private val appId = settings.kafkaStreamsSettings.getProperty("application.id")
val builder = KafkaTopology(settings).prepare()
private val registry = EffectsRegistry()
private val shutdownEffect = Effect("shutdown", this::exit, Some("Shutting down application"))
val topology = KafkaTopology(settings).build()
private val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
fun run() {
registry.register(ShutdownMessage(appId.replace("-normalization-service", ""), "normalization-service", "termination"), shutdownEffect)
registry.run(builder, "import-process-admin")
val stream = KafkaStreams(builder.build(), settings.kafkaStreamsSettings)
stream.use {
it.start()
while (stream.state().isRunning) {
log.info("Service is running.")
Thread.sleep(10_000L)
}
throw Exception("Stream stopped running!")
}
}
private fun exit(): BoxedUnit {
exitProcess(0)
}
}
......@@ -18,96 +18,93 @@
package org.memobase
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.HeaderMetadata
import ch.memobase.settings.SettingsLoader
import ch.memobase.sftp.SftpClient
import com.beust.klaxon.json
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import com.github.doyaaaaaken.kotlincsv.util.CSVFieldNumDifferentException
import java.io.File
import org.apache.poi.ss.usermodel.WorkbookFactory
import org.memobase.UtilityFunctions.retrieveCellValue
import org.memobase.UtilityFunctions.retrieveCells
import org.memobase.UtilityFunctions.zip
import org.memobase.models.ErrorResult
import org.memobase.models.Formats
import org.memobase.models.Message
import org.memobase.models.ParserResult
import org.memobase.models.ProcessReport
import org.memobase.models.Report
import org.memobase.models.ReportMessages
import org.memobase.models.ReportStatus
import org.memobase.models.ResultMessage
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
class TableParser(settings: SettingsLoader) {
private val sftpClient: SftpClient = SftpClient(settings.sftpSettings)
private val sheetIndex = settings.appSettings.getProperty("sheet").toInt()
private val headerCount = settings.appSettings.getProperty("header.count").toInt()
private val propertyNamesIndex = settings.appSettings.getProperty("header.line").toInt()
private val identifierIndex = settings.appSettings.getProperty("identifier").toInt()
private val invalidPropertyNameCharacters = listOf('.', ':', '/', '+')
fun parseTable(key: String, inputMessage: Message): ParserResult {
fun parseTable(key: String, inputMessage: Message, metadata: HeaderMetadata): List<ResultMessage> {
return try {
val result: List<ResultMessage> = when (inputMessage.format) {
Formats.xls, Formats.xlsx -> excelMapper(key, inputMessage)
Formats.csv, Formats.tsv -> csvMapper(key, inputMessage)
when (inputMessage.format) {
Formats.xls, Formats.xlsx -> excelMapper(key, inputMessage, metadata)
Formats.csv, Formats.tsv -> csvMapper(key, inputMessage, metadata)
else -> throw InvalidInputException(key, "Cannot parse the table with format ${inputMessage.format}.")
}
val failureCount = result.count { value -> value.report.status == ReportStatus.failure }
ParserResult(
result,
ProcessReport(
"table-data-transform",
ReportStatus.success,
result.size,
result.size - failureCount,
failureCount
)
)
} catch (ex: CSVFieldNumDifferentException) {
listOf(
ResultMessage(key,
null,
Report(key,
ReportStatus.fatal,
"Invalid CSV Input: ${ex.localizedMessage}.",
Service.name)
))
} catch (ex: InvalidInputException) {
ParserResult(
ex.key, ErrorResult.get(), Report(ex.key, ReportStatus.failure, ex.message ?: "Unknown issue."),
ProcessReport(ReportStatus.failure, 1)
)
listOf(
ResultMessage(key,
null,
Report(ex.key,
ReportStatus.fatal,
"Invalid Input: ${ex.localizedMessage}. Could not process any lines.",
Service.name)
))
} catch (ex: IllegalArgumentException) { // Sheet index does not exist
ParserResult(
key, ErrorResult.get(), Report(key, ReportStatus.failure, ex.localizedMessage),
ProcessReport(ReportStatus.failure, 1)
)
listOf(ResultMessage(key, null,
Report(key,
ReportStatus.fatal,
"Invalid Sheet Index provided. The sheet index ${metadata.tableSheetIndex} does not exist.",
Service.name)
))
}
}
private fun csvMapper(key: String, value: Message): List<ResultMessage> {
private fun csvMapper(key: String, value: Message, metadata: HeaderMetadata): List<ResultMessage> {
val resultMessages = mutableListOf<ResultMessage>()
val indentifierSet = mutableSetOf<String>()
val identifierSet = mutableSetOf<String>()
val inputStream = sftpClient.open(File(value.path))
val reader =
csvReader {
this.quoteChar = '"'
this.delimiter = if (value.format == Formats.csv) ',' else '\t'
this.charset = Charsets.UTF_8.displayName()
// this.skipEmptyLine = true
}.readAll(inputStream)
csvReader {
this.quoteChar = '"'
this.delimiter = if (value.format == Formats.csv) ',' else '\t'
this.charset = Charsets.UTF_8.displayName()
// this.skipEmptyLine = true
}.readAll(inputStream)
var headerProperties = emptyList<String>()
var count = 0
for (line in reader) {
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
if (count <= metadata.tableHeaderCount) {
if (count == metadata.tableHeaderIndex) {
headerProperties = line
headerProperties.forEachIndexed { index, property ->
val trimmedProperty = property.trim()
if (trimmedProperty.isEmpty()) {
throw InvalidInputException(
key,
"Missing a property name on row $count in column ${index + 1}!"
key,
"Missing a property name on row $count in column ${index + 1}!"
)
}
if (trimmedProperty.any { value -> invalidPropertyNameCharacters.contains(value) }) {
throw InvalidInputException(
key,
"Invalid property name $trimmedProperty on row $count in column ${index + 1}! You may not use any of the following characters: + , . "
key,
"Invalid property name $trimmedProperty on row $count in column ${index + 1}! You may not use any of the following characters: + , . "
)
}
}
......@@ -116,56 +113,58 @@ class TableParser(settings: SettingsLoader) {
}
// the -1 ensures that users can start columns beginning at 1!
val identifier: String = try {
line[identifierIndex - 1].let { identifierValue ->
line[metadata.tableIdentifierIndex - 1].let { identifierValue ->
when (identifierValue) {
"" -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count has an empty identifier in column $identifierIndex."
"$count.${metadata.tableIdentifierIndex}",
"The row $count has an empty identifier in column ${metadata.tableIdentifierIndex}."
)
}
in indentifierSet -> {
in identifierSet -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count contains a duplicated identifier in column $identifierIndex with another row."
"$count.${metadata.tableIdentifierIndex}",
"The row $count contains a duplicated identifier in column ${metadata.tableIdentifierIndex} with another row."
)
}
else -> {
indentifierSet.add(identifierValue)
identifierSet.add(identifierValue)
identifierValue
}
}
}
} catch (ex: InvalidInputException) {
resultMessages.add(
ResultMessage(
ex.key,
ErrorResult.get(),
Report(
ex.key,
ReportStatus.failure,
ReportMessages.reportFailure(ex.localizedMessage)
ResultMessage(
ex.key,
null,
Report(
ex.key,
ReportStatus.fatal,
ex.localizedMessage,
Service.name
)
)
)
)
continue
}
val keyValueMap = json {
obj(
zip(headerProperties, line)
zip(headerProperties, line)
)
}
val report = Report(
identifier,
ReportStatus.success,
ReportMessages.reportSuccess(identifier, count)
identifier,
ReportStatus.success,
"",
Service.name
)
resultMessages.add(ResultMessage(identifier, keyValueMap, report))
}
return resultMessages
}
private fun excelMapper(key: String, value: Message): List<ResultMessage> {
private fun excelMapper(key: String, value: Message, metadata: HeaderMetadata): List<ResultMessage> {
val inputStream = sftpClient.open(File(value.path))
val workbook = WorkbookFactory.create(inputStream)
// only the XLS stream closes the input stream. The XLSX stream does not
......@@ -173,30 +172,30 @@ class TableParser(settings: SettingsLoader) {
val identifierSet = mutableSetOf<String>()
val propertiesList = mutableListOf<String>()
// sheet index is 0-based. This ensures that users can access sheet 1 with index 1!
val sheet = workbook.getSheetAt(sheetIndex - 1)
val sheet = workbook.getSheetAt(metadata.tableSheetIndex - 1)
var count = 0
return sheet.filterEmptyRows().map { row ->
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
if (count <= metadata.tableHeaderCount) {
if (count == metadata.tableHeaderIndex) {
propertiesList.addAll(row.map { cell ->
if (retrieveCellValue(cell).isNotEmpty()) {
if (retrieveCellValue(cell).any { char ->
invalidPropertyNameCharacters.contains(
char
)
}) {
invalidPropertyNameCharacters.contains(
char
)
}) {
throw InvalidInputException(
key,
"The property in cell ${cell.address} contains one or more invalid characters: $invalidPropertyNameCharacters."
key,
"The property in cell ${cell.address} contains one or more invalid characters: $invalidPropertyNameCharacters."
)
} else {
retrieveCellValue(cell)
}
} else {
throw InvalidInputException(
key,
"The header index is missing a value in cell ${cell.address}"
key,
"The header index is missing a value in cell ${cell.address}"
)
}
}.map { it.trim() })
......@@ -204,19 +203,19 @@ class TableParser(settings: SettingsLoader) {
null
} else {
val rowIdentifier: String = try {
row.getCell(identifierIndex - 1).let { cell ->
row.getCell(metadata.tableIdentifierIndex - 1).let { cell ->
if (cell != null) {
when (val cellValue = retrieveCellValue(cell)) {
"" -> {
throw InvalidInputException(
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} has an empty identifier in column $identifierIndex."
"CellAddress: $count:${metadata.tableIdentifierIndex}",
"The row ${row.rowNum} has an empty identifier in column ${metadata.tableIdentifierIndex}."
)
}
in identifierSet -> {
throw InvalidInputException(
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} contains a duplicated identifier in column $identifierIndex with another row."
"CellAddress: $count:${metadata.tableIdentifierIndex}",
"The row ${row.rowNum} contains a duplicated identifier in column ${metadata.tableIdentifierIndex} with another row."
)
}
else -> {
......@@ -226,35 +225,37 @@ class TableParser(settings: SettingsLoader) {
}
} else {
throw InvalidInputException(
"${row.rowNum}.$identifierIndex",
"No cell found in row ${row.rowNum} for column $identifierIndex."
"${row.rowNum}.${metadata.tableIdentifierIndex}",
"No cell found in row ${row.rowNum} for column ${metadata.tableIdentifierIndex}."
)
}
}
} catch (ex: InvalidInputException) {
return@map ResultMessage(
ex.key, ErrorResult.get(), Report(
ex.key, null, Report(
ex.key,
ReportStatus.failure,
ReportMessages.reportFailure(ex.localizedMessage)
)
ReportStatus.fatal,
ex.localizedMessage,
Service.name
)
)
}
val jsonObject = json {
obj(
zip(
propertiesList,
retrieveCells(row, propertiesList.size - 1)
)
zip(
propertiesList,
retrieveCells(row, propertiesList.size - 1)
)
)
}
ResultMessage(
rowIdentifier, jsonObject, Report(
rowIdentifier, jsonObject, Report(
rowIdentifier,
ReportStatus.success,
ReportMessages.reportSuccess(rowIdentifier, count)
)
"",
Service.name
)
)
}
// Empty rows create a null result. These are removed.
......
/*
* Table Data Import Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,