Commit ea619f11 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Refactor step name.

Step name can now be configured.
parent a49a7aae
...@@ -5,6 +5,7 @@ metadata: ...@@ -5,6 +5,7 @@ metadata:
namespace: memobase namespace: memobase
data: data:
APPLICATION_ID: {{ .Values.applicationId }} APPLICATION_ID: {{ .Values.applicationId }}
REPORTING_STEP_NAME: "{{ .Values.reportingStepName }}"
TOPIC_IN: {{ .Values.inputTopicName }} TOPIC_IN: {{ .Values.inputTopicName }}
TOPIC_OUT: {{ .Values.outputTopicName }} TOPIC_OUT: {{ .Values.outputTopicName }}
TOPIC_PROCESS: {{ .Values.reportingTopicName }} TOPIC_PROCESS: {{ .Values.reportingTopicName }}
\ No newline at end of file
...@@ -8,6 +8,8 @@ applicationId: placeholder ...@@ -8,6 +8,8 @@ applicationId: placeholder
kafkaConfigs: placeholder kafkaConfigs: placeholder
sftpConfigs: placeholder sftpConfigs: placeholder
reportingStepName: 02-table-data-transform
inputTopicName: placeholder inputTopicName: placeholder
outputTopicName: placeholder outputTopicName: placeholder
reportingTopicName: placeholder reportingTopicName: placeholder
\ No newline at end of file
...@@ -31,7 +31,8 @@ import org.memobase.models.Formats ...@@ -31,7 +31,8 @@ import org.memobase.models.Formats
import org.memobase.models.Message import org.memobase.models.Message
class KafkaTopology(private val settings: SettingsLoader) { class KafkaTopology(private val settings: SettingsLoader) {
private val parser = TableParser(settings) private val step = settings.appSettings.getProperty(Service.reportingStepNamePropName)
private val parser = TableParser(step, settings)
private val reportingTopic = settings.processReportTopic private val reportingTopic = settings.processReportTopic
private val klaxon = Klaxon() private val klaxon = Klaxon()
private val acceptedFormats = listOf(Formats.csv, Formats.xlsx, Formats.tsv, Formats.xls, Formats.ods) private val acceptedFormats = listOf(Formats.csv, Formats.xlsx, Formats.tsv, Formats.xls, Formats.ods)
...@@ -78,14 +79,14 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -78,14 +79,14 @@ class KafkaTopology(private val settings: SettingsLoader) {
readOnlyKey, readOnlyKey,
ReportStatus.fatal, ReportStatus.fatal,
"Could not parse message from data $data.", "Could not parse message from data $data.",
Service.name step
)) ))
} else { } else {
Pair(parsedMessage, Report( Pair(parsedMessage, Report(
readOnlyKey, readOnlyKey,
ReportStatus.success, ReportStatus.success,
"", "",
Service.name step
)) ))
} }
} catch (ex: KlaxonException) { } catch (ex: KlaxonException) {
...@@ -93,7 +94,7 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -93,7 +94,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
readOnlyKey, readOnlyKey,
ReportStatus.fatal, ReportStatus.fatal,
"JSON Parser Error: ${ex.localizedMessage}.", "JSON Parser Error: ${ex.localizedMessage}.",
Service.name step
)) ))
} }
} }
......
...@@ -25,11 +25,11 @@ class Service(file: String = "app.yml") { ...@@ -25,11 +25,11 @@ class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("TableDataService") private val log = LogManager.getLogger("TableDataService")
companion object { companion object {
const val name = "table-data-transform" const val reportingStepNamePropName = "reportingStepName"
} }
val settings = SettingsLoader( val settings = SettingsLoader(
listOf(), listOf(reportingStepNamePropName),
file, file,
useStreamsConfig = true, useStreamsConfig = true,
readSftpSettings = true readSftpSettings = true
......
...@@ -35,7 +35,7 @@ import org.memobase.models.Formats ...@@ -35,7 +35,7 @@ import org.memobase.models.Formats
import org.memobase.models.Message import org.memobase.models.Message
import org.memobase.models.ResultMessage import org.memobase.models.ResultMessage
class TableParser(settings: SettingsLoader) { class TableParser(private val step: String, settings: SettingsLoader) {
private val sftpClient: SftpClient = SftpClient(settings.sftpSettings) private val sftpClient: SftpClient = SftpClient(settings.sftpSettings)
private val invalidPropertyNameCharacters = listOf('.', ':', '/', '+') private val invalidPropertyNameCharacters = listOf('.', ':', '/', '+')
...@@ -48,29 +48,42 @@ class TableParser(settings: SettingsLoader) { ...@@ -48,29 +48,42 @@ class TableParser(settings: SettingsLoader) {
} }
} catch (ex: CSVFieldNumDifferentException) { } catch (ex: CSVFieldNumDifferentException) {
listOf( listOf(
ResultMessage(key, ResultMessage(
null, key,
Report(key, null,
ReportStatus.fatal, Report(
"Invalid CSV Input: ${ex.localizedMessage}.", key,
Service.name) ReportStatus.fatal,
)) "Invalid CSV Input: ${ex.localizedMessage}.",
step
)
)
)
} catch (ex: InvalidInputException) { } catch (ex: InvalidInputException) {
listOf( listOf(
ResultMessage(key, ResultMessage(
null, key,
Report(key, null,
ReportStatus.fatal, Report(
"Invalid Input: ${ex.localizedMessage}. Could not process any lines.", key,
Service.name) ReportStatus.fatal,
)) "Invalid Input: ${ex.localizedMessage}. Could not process any lines.",
step
)
)
)
} catch (ex: IllegalArgumentException) { // Sheet index does not exist } catch (ex: IllegalArgumentException) { // Sheet index does not exist
listOf(ResultMessage(key, null, listOf(
Report(key, ResultMessage(
ReportStatus.fatal, key, null,
"Invalid Sheet Index provided. The sheet index ${metadata.tableSheetIndex} does not exist.", Report(
Service.name) key,
)) ReportStatus.fatal,
"Invalid Sheet Index provided. The sheet index ${metadata.tableSheetIndex} does not exist.",
step
)
)
)
} }
} }
...@@ -80,12 +93,12 @@ class TableParser(settings: SettingsLoader) { ...@@ -80,12 +93,12 @@ class TableParser(settings: SettingsLoader) {
val inputStream = sftpClient.open(File(value.path)) val inputStream = sftpClient.open(File(value.path))
val reader = val reader =
csvReader { csvReader {
this.quoteChar = '"' this.quoteChar = '"'
this.delimiter = if (value.format == Formats.csv) ',' else '\t' this.delimiter = if (value.format == Formats.csv) ',' else '\t'
this.charset = Charsets.UTF_8.displayName() this.charset = Charsets.UTF_8.displayName()
// this.skipEmptyLine = true // this.skipEmptyLine = true
}.readAll(inputStream) }.readAll(inputStream)
var headerProperties = emptyList<String>() var headerProperties = emptyList<String>()
var count = 0 var count = 0
for (line in reader) { for (line in reader) {
...@@ -97,12 +110,12 @@ class TableParser(settings: SettingsLoader) { ...@@ -97,12 +110,12 @@ class TableParser(settings: SettingsLoader) {
val trimmedProperty = property.trim() val trimmedProperty = property.trim()
if (trimmedProperty.isEmpty()) { if (trimmedProperty.isEmpty()) {
throw InvalidInputException( throw InvalidInputException(
"Missing a property name on row $count in column ${index + 1}!" "Missing a property name on row $count in column ${index + 1}!"
) )
} }
if (trimmedProperty.any { value -> invalidPropertyNameCharacters.contains(value) }) { if (trimmedProperty.any { value -> invalidPropertyNameCharacters.contains(value) }) {
throw InvalidInputException( throw InvalidInputException(
"Invalid property name $trimmedProperty on row $count in column ${index + 1}! You may not use the any of the following characters: + , . " "Invalid property name $trimmedProperty on row $count in column ${index + 1}! You may not use the any of the following characters: + , . "
) )
} }
} }
...@@ -115,12 +128,12 @@ class TableParser(settings: SettingsLoader) { ...@@ -115,12 +128,12 @@ class TableParser(settings: SettingsLoader) {
when (identifierValue) { when (identifierValue) {
"" -> { "" -> {
throw InvalidInputException( throw InvalidInputException(
"The row $count has an empty identifier in column ${metadata.tableIdentifierIndex}." "The row $count has an empty identifier in column ${metadata.tableIdentifierIndex}."
) )
} }
in identifierSet -> { in identifierSet -> {
throw InvalidInputException( throw InvalidInputException(
"The row $count contains a duplicated identifier in column ${metadata.tableIdentifierIndex} with another row." "The row $count contains a duplicated identifier in column ${metadata.tableIdentifierIndex} with another row."
) )
} }
else -> { else -> {
...@@ -131,29 +144,29 @@ class TableParser(settings: SettingsLoader) { ...@@ -131,29 +144,29 @@ class TableParser(settings: SettingsLoader) {
} }
} catch (ex: InvalidInputException) { } catch (ex: InvalidInputException) {
resultMessages.add( resultMessages.add(
ResultMessage( ResultMessage(
key, key,
null, null,
Report( Report(
key, key,
ReportStatus.fatal, ReportStatus.fatal,
ex.localizedMessage, ex.localizedMessage,
Service.name step
)
) )
)
) )
continue continue
} }
val keyValueMap = json { val keyValueMap = json {
obj( obj(
zip(headerProperties, line) zip(headerProperties, line)
) )
} }
val report = Report( val report = Report(
identifier, identifier,
ReportStatus.success, ReportStatus.success,
"", "",
Service.name step
) )
resultMessages.add(ResultMessage(identifier, keyValueMap, report)) resultMessages.add(ResultMessage(identifier, keyValueMap, report))
} }
...@@ -177,19 +190,19 @@ class TableParser(settings: SettingsLoader) { ...@@ -177,19 +190,19 @@ class TableParser(settings: SettingsLoader) {
propertiesList.addAll(row.map { cell -> propertiesList.addAll(row.map { cell ->
if (retrieveCellValue(cell).isNotEmpty()) { if (retrieveCellValue(cell).isNotEmpty()) {
if (retrieveCellValue(cell).any { char -> if (retrieveCellValue(cell).any { char ->
invalidPropertyNameCharacters.contains( invalidPropertyNameCharacters.contains(
char char
) )
}) { }) {
throw InvalidInputException( throw InvalidInputException(
"The property in cell ${cell.address} contains one or more invalid characters: $invalidPropertyNameCharacters." "The property in cell ${cell.address} contains one or more invalid characters: $invalidPropertyNameCharacters."
) )
} else { } else {
retrieveCellValue(cell) retrieveCellValue(cell)
} }
} else { } else {
throw InvalidInputException( throw InvalidInputException(
"The header index is missing a value in cell ${cell.address}" "The header index is missing a value in cell ${cell.address}"
) )
} }
}.map { it.trim() }) }.map { it.trim() })
...@@ -202,12 +215,12 @@ class TableParser(settings: SettingsLoader) { ...@@ -202,12 +215,12 @@ class TableParser(settings: SettingsLoader) {
when (val cellValue = retrieveCellValue(cell)) { when (val cellValue = retrieveCellValue(cell)) {
"" -> { "" -> {
throw InvalidInputException( throw InvalidInputException(
"The row ${row.rowNum} has an empty identifier in column ${metadata.tableIdentifierIndex}." "The row ${row.rowNum} has an empty identifier in column ${metadata.tableIdentifierIndex}."
) )
} }
in identifierSet -> { in identifierSet -> {
throw InvalidInputException( throw InvalidInputException(
"The row ${row.rowNum} contains a duplicated identifier in column ${metadata.tableIdentifierIndex} with another row." "The row ${row.rowNum} contains a duplicated identifier in column ${metadata.tableIdentifierIndex} with another row."
) )
} }
else -> { else -> {
...@@ -217,36 +230,36 @@ class TableParser(settings: SettingsLoader) { ...@@ -217,36 +230,36 @@ class TableParser(settings: SettingsLoader) {
} }
} else { } else {
throw InvalidInputException( throw InvalidInputException(
"No cell found in row ${row.rowNum} for column ${metadata.tableIdentifierIndex}." "No cell found in row ${row.rowNum} for column ${metadata.tableIdentifierIndex}."
) )
} }
} }
} catch (ex: InvalidInputException) { } catch (ex: InvalidInputException) {
return@map ResultMessage( return@map ResultMessage(
key, null, Report( key, null, Report(
key, key,
ReportStatus.fatal, ReportStatus.fatal,
ex.localizedMessage, ex.localizedMessage,
Service.name step
) )
) )
} }
val jsonObject = json { val jsonObject = json {
obj( obj(
zip( zip(
propertiesList, propertiesList,
retrieveCells(row, propertiesList.size - 1) retrieveCells(row, propertiesList.size - 1)
) )
) )
} }
ResultMessage( ResultMessage(
rowIdentifier, jsonObject, Report( rowIdentifier, jsonObject, Report(
rowIdentifier, rowIdentifier,
ReportStatus.success, ReportStatus.success,
"", "",
Service.name step
) )
) )
} }
// Empty rows create a null result. These are removed. // Empty rows create a null result. These are removed.
......
...@@ -3,6 +3,8 @@ sftp: ...@@ -3,6 +3,8 @@ sftp:
port: ${SFTP_PORT:?system} port: ${SFTP_PORT:?system}
user: ${SFTP_USER:?system} user: ${SFTP_USER:?system}
password: ${SFTP_PASSWORD:?system} password: ${SFTP_PASSWORD:?system}
app:
reportingStepName: ${REPORTING_STEP_NAME:?system}
kafka: kafka:
streams: streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system} bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
...@@ -2,5 +2,5 @@ ...@@ -2,5 +2,5 @@
"id": "test-id-1", "id": "test-id-1",
"status": "SUCCESS", "status": "SUCCESS",
"message": "", "message": "",
"step": "table-data-transform" "step": "test"
} }
\ No newline at end of file
...@@ -2,5 +2,5 @@ ...@@ -2,5 +2,5 @@
"id": "test-id-2", "id": "test-id-2",
"status": "SUCCESS", "status": "SUCCESS",
"message": "", "message": "",
"step": "table-data-transform" "step": "test"
} }
\ No newline at end of file
...@@ -2,5 +2,5 @@ ...@@ -2,5 +2,5 @@
"id": "test-3.csv", "id": "test-3.csv",
"status": "FATAL", "status": "FATAL",
"message": "Invalid CSV Input: Fields num seems to be 6 on each row, but on 2th csv row, fields num is 5..", "message": "Invalid CSV Input: Fields num seems to be 6 on each row, but on 2th csv row, fields num is 5..",
"step": "table-data-transform" "step": "test"
} }
\ No newline at end of file
...@@ -2,5 +2,5 @@ ...@@ -2,5 +2,5 @@
"id": "Test-1", "id": "Test-1",
"status": "SUCCESS", "status": "SUCCESS",
"message": "", "message": "",
"step": "table-data-transform" "step": "test"
} }
\ No newline at end of file
...@@ -2,5 +2,5 @@ ...@@ -2,5 +2,5 @@
"id": "Test-2", "id": "Test-2",
"status": "SUCCESS", "status": "SUCCESS",
"message": "", "message": "",
"step": "table-data-transform" "step": "test"
} }
\ No newline at end of file
...@@ -4,11 +4,7 @@ sftp: ...@@ -4,11 +4,7 @@ sftp:
user: user user: user
password: password password: password
app: app:
sheet: 0 reportingStepName: test
header:
count: 3
line: 3
identifier: 1
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
...@@ -4,11 +4,7 @@ sftp: ...@@ -4,11 +4,7 @@ sftp:
user: user user: user
password: password password: password
app: app:
sheet: 0 reportingStepName: test
header:
count: 3
line: 3
identifier: 31
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
...@@ -4,11 +4,7 @@ sftp: ...@@ -4,11 +4,7 @@ sftp:
user: user user: user
password: password password: password
app: app:
sheet: 0 reportingStepName: test
header:
count: 3
line: 3
identifier: 1
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
...@@ -4,11 +4,7 @@ sftp: ...@@ -4,11 +4,7 @@ sftp:
user: user user: user
password: password password: password
app: app:
sheet: 1 reportingStepName: test
header:
count: 1
line: 1
identifier: 1
kafka: kafka:
streams: streams:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment