Commit a9c86993 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Update dependency & implement shutdown hook

parent 27726a51
Pipeline #13543 passed with stages
in 6 minutes and 14 seconds
......@@ -32,9 +32,6 @@ ext {
}
dependencies {
// https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client
//compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.1.0'
// Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
implementation "org.apache.logging.log4j:log4j-core:${log4jV}"
......@@ -44,7 +41,8 @@ dependencies {
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation 'org.memobase:memobase-service-utilities:1.4.0'
implementation 'org.memobase:memobase-service-utilities:1.12.1'
implementation 'ch.memobase:import-process-effects-registry_2.12:0.2.1'
// CSV Reader
implementation("com.github.doyaaaaaken:kotlin-csv-jvm:0.7.3")
// XSLX / XSL Reader
......@@ -55,8 +53,6 @@ dependencies {
// JSON Parser
implementation 'com.beust:klaxon:5.2'
// Compression
//implementation "org.apache.commons:commons-compress:1.19"
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
implementation "org.jetbrains.kotlin:kotlin-script-runtime:1.3.71"
......
package org.memobase
import org.apache.poi.ss.usermodel.CellType
import org.apache.poi.ss.usermodel.Row
import org.apache.poi.ss.usermodel.Sheet
fun Sheet.filterEmptyRows(): Iterable<Row> {
return this.filter { row ->
row.any { cell ->
// filter all rows with no string, boolean or numeric cell
when (cell.cellType) {
CellType._NONE -> false
CellType.NUMERIC -> true
CellType.STRING -> true
CellType.FORMULA -> false
CellType.BLANK -> false
CellType.BOOLEAN -> true
CellType.ERROR -> false
else -> false
}
}
}
}
\ No newline at end of file
......@@ -41,7 +41,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val klaxon = Klaxon()
fun build(): Topology {
fun prepare(): StreamsBuilder {
val builder = StreamsBuilder()
val branchedSource = builder
......@@ -57,7 +57,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
processValidInput(branchedSource[1]
.mapValues { key, value -> parser.parseTable(key, value) })
return builder.build()
return builder
}
private fun processValidInput(stream: KStream<String, ParserResult>) {
......
......@@ -18,9 +18,15 @@
package org.memobase
import ch.memobase.Effect
import ch.memobase.EffectsRegistry
import ch.memobase.ShutdownMessage
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import scala.Some
import scala.runtime.BoxedUnit
import kotlin.system.exitProcess
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("TableDataService")
......@@ -37,10 +43,16 @@ class Service(file: String = "app.yml") {
readSftpSettings = true
)
val topology = KafkaTopology(settings).build()
private val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
private val appId = settings.kafkaStreamsSettings.getProperty("application.id")
val builder = KafkaTopology(settings).prepare()
private val registry = EffectsRegistry()
private val shutdownEffect = Effect("shutdown", this::exit, Some("Shutting down application"))
fun run() {
registry.register(ShutdownMessage(appId.replace("-normalization-service", ""), "normalization-service", "termination"), shutdownEffect)
registry.run(builder, "import-process-admin")
val stream = KafkaStreams(builder.build(), settings.kafkaStreamsSettings)
stream.use {
it.start()
while (stream.state().isRunning) {
......@@ -49,4 +61,8 @@ class Service(file: String = "app.yml") {
}
}
}
private fun exit(): BoxedUnit {
exitProcess(0)
}
}
......@@ -21,7 +21,6 @@ package org.memobase
import com.beust.klaxon.json
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import java.io.File
import org.apache.poi.ss.usermodel.CellType
import org.apache.poi.ss.usermodel.WorkbookFactory
import org.memobase.UtilityFunctions.retrieveCellValue
import org.memobase.UtilityFunctions.retrieveCells
......@@ -57,223 +56,208 @@ class TableParser(settings: SettingsLoader) {
val failureCount = result.count { value -> value.report.status == ReportStatus.failure }
ParserResult(
result,
ProcessReport(
"table-data-transform",
ReportStatus.success,
result.size,
result.size - failureCount,
failureCount
)
result,
ProcessReport(
"table-data-transform",
ReportStatus.success,
result.size,
result.size - failureCount,
failureCount
)
)
} catch (ex: InvalidInputException) {
ParserResult(
ex.key, ErrorResult.get(), Report(ex.key, ReportStatus.failure, ex.message ?: "Unknown issue."),
ProcessReport(ReportStatus.failure, 1)
ex.key, ErrorResult.get(), Report(ex.key, ReportStatus.failure, ex.message ?: "Unknown issue."),
ProcessReport(ReportStatus.failure, 1)
)
} catch (ex: IllegalArgumentException) { // Sheet index does not exist
ParserResult(
key, ErrorResult.get(), Report(key, ReportStatus.failure, ex.localizedMessage),
ProcessReport(ReportStatus.failure, 1)
key, ErrorResult.get(), Report(key, ReportStatus.failure, ex.localizedMessage),
ProcessReport(ReportStatus.failure, 1)
)
}
}
private fun csvMapper(key: String, value: Message): List<ResultMessage> {
val resultMessages = mutableListOf<ResultMessage>()
val mutableSet = mutableSetOf<String>()
sftpClient.open(File(value.path)).use { inputStream ->
val reader =
csvReader {
this.quoteChar = '"'
this.delimiter = if (value.format == Formats.csv) ',' else '\t'
this.charset = Charsets.UTF_8.displayName()
// this.skipEmptyLine = true
}.readAll(inputStream)
var headerProperties = emptyList<String>()
var count = 0
for (line in reader) {
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
headerProperties = line
headerProperties.forEachIndexed { index, property ->
val trimmedProperty = property.trim()
if (trimmedProperty.isEmpty()) {
throw InvalidInputException(
key,
"Missing a property name on row $count in column ${index + 1}!"
)
}
if (trimmedProperty.any { value -> invalidPropertyNameCharacters.contains(value) }) {
throw InvalidInputException(
key,
"Invalid property name $trimmedProperty on row $count in column ${index + 1}! You may not use the any of the following characters: + , . "
)
}
val indentifierSet = mutableSetOf<String>()
val inputStream = sftpClient.open(File(value.path))
val reader =
csvReader {
this.quoteChar = '"'
this.delimiter = if (value.format == Formats.csv) ',' else '\t'
this.charset = Charsets.UTF_8.displayName()
// this.skipEmptyLine = true
}.readAll(inputStream)
var headerProperties = emptyList<String>()
var count = 0
for (line in reader) {
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
headerProperties = line
headerProperties.forEachIndexed { index, property ->
val trimmedProperty = property.trim()
if (trimmedProperty.isEmpty()) {
throw InvalidInputException(
key,
"Missing a property name on row $count in column ${index + 1}!"
)
}
if (trimmedProperty.any { value -> invalidPropertyNameCharacters.contains(value) }) {
throw InvalidInputException(
key,
"Invalid property name $trimmedProperty on row $count in column ${index + 1}! You may not use the any of the following characters: + , . "
)
}
}
continue
}
// the -1 ensures, that users can start columns beginning at 1!
val identifier: String = try {
line[identifierIndex - 1].let { value ->
when (value) {
"" -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count has an empty identifier in column $identifierIndex."
)
}
in mutableSet -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count contains a duplicated identifier in column $identifierIndex with another row."
)
}
else -> {
mutableSet.add(value)
value
}
continue
}
// the -1 ensures, that users can start columns beginning at 1!
val identifier: String = try {
line[identifierIndex - 1].let { identifierValue ->
when (identifierValue) {
"" -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count has an empty identifier in column $identifierIndex."
)
}
}
} catch (ex: InvalidInputException) {
resultMessages.add(
ResultMessage(
ex.key,
ErrorResult.get(),
Report(
ex.key,
ReportStatus.failure,
ReportMessages.reportFailure(ex.localizedMessage)
)
in indentifierSet -> {
throw InvalidInputException(
"$count.$identifierIndex",
"The row $count contains a duplicated identifier in column $identifierIndex with another row."
)
)
continue
}
else -> {
indentifierSet.add(identifierValue)
identifierValue
}
}
}
val keyValueMap = json {
obj(
zip(headerProperties, line)
} catch (ex: InvalidInputException) {
resultMessages.add(
ResultMessage(
ex.key,
ErrorResult.get(),
Report(
ex.key,
ReportStatus.failure,
ReportMessages.reportFailure(ex.localizedMessage)
)
)
}
val report = Report(
identifier,
ReportStatus.success,
ReportMessages.reportSuccess(identifier, count)
)
resultMessages.add(ResultMessage(identifier, keyValueMap, report))
continue
}
val keyValueMap = json {
obj(
zip(headerProperties, line)
)
}
val report = Report(
identifier,
ReportStatus.success,
ReportMessages.reportSuccess(identifier, count)
)
resultMessages.add(ResultMessage(identifier, keyValueMap, report))
}
return resultMessages
}
private fun excelMapper(key: String, value: Message): List<ResultMessage> {
return sftpClient.open(File(value.path)).use { inputStream ->
WorkbookFactory.create(inputStream).use { workbook ->
val identifierSet = mutableSetOf<String>()
val propertiesList = mutableListOf<String>()
// sheet index is 0-based. This ensures that users can access sheet 1 with index 1!
val sheet = workbook.getSheetAt(sheetIndex - 1)
var count = 0
sheet.filter { row ->
row.any { cell ->
// filter all rows with no string, boolean or numeric cell
when (cell.cellType) {
CellType._NONE -> false
CellType.NUMERIC -> true
CellType.STRING -> true
CellType.FORMULA -> false
CellType.BLANK -> false
CellType.BOOLEAN -> true
CellType.ERROR -> false
else -> false
val inputStream = sftpClient.open(File(value.path))
val workbook = WorkbookFactory.create(inputStream)
// only XSL stream closes the input stream. The XSLX stream does not
inputStream.close()
val identifierSet = mutableSetOf<String>()
val propertiesList = mutableListOf<String>()
// sheet index is 0-based. This ensures that users can access sheet 1 with index 1!
val sheet = workbook.getSheetAt(sheetIndex - 1)
var count = 0
return sheet.filterEmptyRows().map { row ->
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
propertiesList.addAll(row.map { cell ->
if (retrieveCellValue(cell).isNotEmpty()) {
if (retrieveCellValue(cell).any { char ->
invalidPropertyNameCharacters.contains(
char
)
}) {
throw InvalidInputException(
key,
"The property in cell ${cell.address} contains one or more invalid characters: $invalidPropertyNameCharacters."
)
} else {
retrieveCellValue(cell)
}
} else {
throw InvalidInputException(
key,
"The header index is missing a value in cell ${cell.address}"
)
}
}
}.map { row ->
count += 1
if (count <= headerCount) {
if (count == propertyNamesIndex) {
propertiesList.addAll(row.map { cell ->
if (retrieveCellValue(cell).isNotEmpty()) {
if (retrieveCellValue(cell).any { char ->
invalidPropertyNameCharacters.contains(
char
)
}) {
throw InvalidInputException(
key,
"The property in cell ${cell.address} contains one or more invalid characters: $invalidPropertyNameCharacters."
)
} else {
retrieveCellValue(cell)
}
} else {
}.map { it.trim() })
}
null
} else {
val rowIdentifier: String = try {
row.getCell(identifierIndex - 1).let { cell ->
if (cell != null) {
when (val cellValue = retrieveCellValue(cell)) {
"" -> {
throw InvalidInputException(
key,
"The header index is missing a value in cell ${cell.address}"
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} has an empty identifier in column $identifierIndex."
)
}
}.map { it.trim() })
}
null
} else {
val rowIdentifier: String = try {
row.getCell(identifierIndex - 1).let { cell ->
if (cell != null) {
when (val cellValue = retrieveCellValue(cell)) {
"" -> {
throw InvalidInputException(
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} has an empty identifier in column $identifierIndex."
)
}
in identifierSet -> {
throw InvalidInputException(
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} contains a duplicated identifier in column $identifierIndex with another row."
)
}
else -> {
identifierSet.add(cellValue)
cellValue
}
}
} else {
in identifierSet -> {
throw InvalidInputException(
"${row.rowNum}.$identifierIndex",
"No cell found in row ${row.rowNum} for column $identifierIndex."
"CellAddress: $count:$identifierIndex",
"The row ${row.rowNum} contains a duplicated identifier in column $identifierIndex with another row."
)
}
else -> {
identifierSet.add(cellValue)
cellValue
}
}
} catch (ex: InvalidInputException) {
return@map ResultMessage(
ex.key, ErrorResult.get(), Report(
ex.key,
ReportStatus.failure,
ReportMessages.reportFailure(ex.localizedMessage)
)
)
}
val jsonObject = json {
obj(
zip(
propertiesList,
retrieveCells(row, propertiesList.size - 1)
)
} else {
throw InvalidInputException(
"${row.rowNum}.$identifierIndex",
"No cell found in row ${row.rowNum} for column $identifierIndex."
)
}
ResultMessage(
rowIdentifier, jsonObject, Report(
rowIdentifier,
ReportStatus.success,
ReportMessages.reportSuccess(rowIdentifier, count)
}
} catch (ex: InvalidInputException) {
return@map ResultMessage(
ex.key, ErrorResult.get(), Report(
ex.key,
ReportStatus.failure,
ReportMessages.reportFailure(ex.localizedMessage)
)
)
}
val jsonObject = json {
obj(
zip(
propertiesList,
retrieveCells(row, propertiesList.size - 1)
)
}
// Empty rows create a null result. These are removed.
}.filterNotNull()
)
}
ResultMessage(
rowIdentifier, jsonObject, Report(
rowIdentifier,
ReportStatus.success,
ReportMessages.reportSuccess(rowIdentifier, count)
)
)
}
}
// Empty rows create a null result. These are removed.
}.filterNotNull()
}
}
......@@ -22,6 +22,7 @@ import org.apache.poi.ss.usermodel.Cell
import org.apache.poi.ss.usermodel.CellType
import org.apache.poi.ss.usermodel.Row
object UtilityFunctions {
/**
......
......@@ -74,7 +74,7 @@ class TestIntegration {
@MethodSource("testParams")
fun `test kafka integrations`(params: TestParams) {
val service = Service(params.settingsFileName)
val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings)
val testDriver = TopologyTestDriver(service.builder.build(), service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment