Commit 56b12b32 authored by Jonas Waeber's avatar Jonas Waeber

refactor table data parser

Load external library for most things and fixes some errors.
parent 51a1db93
Pipeline #8538 passed with stages
in 5 minutes and 13 seconds
......@@ -2,4 +2,3 @@
[Confluence Doku](https://memobase.atlassian.net/wiki/spaces/TBAS/pages/48693312/Service+Table-Data+Formatter)
......@@ -21,8 +21,9 @@ targetCompatibility = 1.8
repositories {
jcenter()
maven { url 'https://gitlab.com/api/v4/projects/11507450/packages/maven' }
maven { url 'https://jitpack.io' }
maven {
url "https://dl.bintray.com/jonas-waeber/memobase"
}
}
ext {
......@@ -43,13 +44,10 @@ dependencies {
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
//testCompile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: kafkaV
implementation 'org.memobase:memobase-service-utilities:1.2.0'
// SFTP Client
implementation 'com.hierynomus:sshj:0.27.0'
// YAML Parser
implementation 'org.snakeyaml:snakeyaml-engine:2.1'
// CSV Reader
implementation("com.github.doyaaaaaken:kotlin-csv-jvm:0.7.3")
// XSLX / XSL Reader
......@@ -71,18 +69,6 @@ dependencies {
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
testCompile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: kafkaV
testImplementation "org.apache.kafka:kafka-clients:$kafkaV:test"
testImplementation "org.apache.kafka:kafka_2.11:$kafkaV"
testImplementation "org.apache.kafka:kafka_2.11:$kafkaV:test"
// https://mvnrepository.com/artifact/com.github.marschall/memoryfilesystem
testCompile group: 'com.github.marschall', name: 'memoryfilesystem', version: '2.1.0'
// https://mvnrepository.com/artifact/org.apache.sshd/sshd-core
testCompile group: 'org.apache.sshd', name: 'sshd-core', version: '2.4.0'
// https://mvnrepository.com/artifact/org.apache.sshd/sshd-sftp
testCompile group: 'org.apache.sshd', name: 'sshd-sftp', version: '2.4.0'
}
compileKotlin {
......
......@@ -20,6 +20,7 @@ package org.memobase
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import kotlin.system.exitProcess
class App {
......@@ -27,7 +28,15 @@ class App {
private val log = LogManager.getLogger("TableDataTransform")
@JvmStatic fun main(args: Array<String>) {
try {
val settings = SettingsLoader("app.yml")
val settings = SettingsLoader(
listOf(
"sheet",
"header.count",
"header.line",
"identifier"
),
readSftpSettings = true
)
val topology = KafkaTopology(settings).build()
val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
......
/*
* record-parser
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.apache.logging.log4j.Logger
import java.util.*
class KafkaSettings(private val map: Map<String, String>, private val log: Logger) {
val settings = Properties()
fun setKafkaProperty(propertyName: String, defaultValue: Any? = null, abortIfMissing: Boolean = false) {
val envProperty = propertyName.replace("\\.".toRegex(), "_").toUpperCase()
when {
System.getenv(envProperty) != null -> {
log.debug("Found value for property $propertyName in environment variable $envProperty.")
settings.setProperty(propertyName, System.getenv(envProperty))
}
map.containsKey(propertyName) -> {
log.debug("Found value for property $propertyName in app.yml file.")
settings.setProperty(propertyName, map[propertyName])
}
defaultValue != null -> {
log.debug("Using default value $defaultValue for $propertyName.")
settings[propertyName] = defaultValue
}
abortIfMissing -> {
log.error("Required producer property $propertyName was not set! Aborting...")
throw MissingSettingException("missing", "kafka.producer.$propertyName")
}
else -> log.trace("No value for property $propertyName found")
}
}
}
\ No newline at end of file
......@@ -28,6 +28,8 @@ import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
import java.io.File
import java.io.StringReader
......@@ -35,11 +37,13 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("KafkaTopology")
private val sftpClient: SftpClient = SftpClient(settings.sftpSettings)
private val sheetIndex = settings.appSettings.getProperty("sheet.index").toInt()
private val sheetIndex = settings.appSettings.getProperty("sheet").toInt()
private val headerCount = settings.appSettings.getProperty("header.count").toInt()
private val propertyNamesIndex = settings.appSettings.getProperty("header.line").toInt()
private val identifierIndex = settings.appSettings.getProperty("identifier").toInt()
private val reportingTopic = settings.outputTopic + "-reporting"
fun build(): Topology {
val builder = StreamsBuilder()
......@@ -51,20 +55,23 @@ class KafkaTopology(private val settings: SettingsLoader) {
)
// report filtered error message from previous job.
// TODO: what to send to main topic?
branchedSource[1]
.map { key, _ ->
reportToJson(
.mapValues { key, _ ->
Klaxon().toJsonString(
Report(
key,
"Failure",
"Ignored message as file validator reported error.",
0
"FAILURE",
"Ignored message due to previous error."
)
)
}
.to("${settings.outputTopic}-reporting")
// filtered result simply sends ERROR along!
branchedSource[1]
.mapValues { _ -> "ERROR" }
.to(settings.outputTopic)
// work branch
val formatBranches = branchedSource[0]
.mapValues { _, value -> parseJsonObject(value) }
......@@ -76,28 +83,28 @@ class KafkaTopology(private val settings: SettingsLoader) {
// CSV Branch
buildHelper(formatBranches[0]
.flatMapValues { _, value -> csvMapper(value) })
// Excel Branch
buildHelper(formatBranches[1]
.flatMapValues { value -> excelMapper(value) })
// OpenDocument Spreadsheet Branch
buildHelper(formatBranches[2]
.flatMapValues { value -> odsMapper(value) })
.mapValues { key, value -> errorWrapper(key, value) })
return builder.build()
}
private fun buildHelper(stream: KStream<String, Pair<Pair<String, JsonObject>, Report>>) {
stream
.map { _, value -> KeyValue(value.first.first, value.first.second.toJsonString()) }
private fun buildHelper(stream: KStream<String, Pair<List<Pair<Pair<String, JsonObject>, Report>>, Report>>) {
val records = stream
.flatMapValues { _, value -> value.first }
records
.map { _, value -> KeyValue(value.first.first, value.first.second) }
.mapValues { value -> value.toJsonString() }
.to(settings.outputTopic)
records
.map { _, value -> KeyValue(value.second.id, value.second) }
.mapValues { value -> Klaxon().toJsonString(value) }
.to(reportingTopic)
stream
.map { _, value -> reportToJson(value.second) }
.to("${settings.outputTopic}-reporting")
.to(settings.processReportTopic)
}
private fun filter(value: String): Boolean {
......@@ -105,10 +112,18 @@ class KafkaTopology(private val settings: SettingsLoader) {
}
private fun parseJsonObject(value: String): JsonObject {
// TODO: try catch errors
return Klaxon().parseJsonObject(StringReader(value))
}
private fun errorWrapper(key: String, value: JsonObject): Pair<List<Pair<Pair<String, JsonObject>, Report>>, Report> {
return try {
val result = csvMapper(value)
Pair(result, Report(key, "SUCCESS", "Transformed table data into ${result.count()} records."))
} catch (ex: InvalidInputException) {
Pair(emptyList(), Report(key, "FAILED", ex.localizedMessage))
}
}
private fun csvMapper(value: JsonObject): List<Pair<Pair<String, JsonObject>, Report>> {
val resultMessages = mutableListOf<Pair<Pair<String, JsonObject>, Report>>()
val mutableSet = mutableSetOf<String>()
......@@ -131,17 +146,18 @@ class KafkaTopology(private val settings: SettingsLoader) {
headerProperties = line
headerProperties.forEachIndexed { index, property ->
if (property.isEmpty()) {
throw InvalidInputException("Missing a property name on row $count in column $index!")
throw InvalidInputException("Missing a property name on row $count in column ${index + 1}!")
}
if (property.contains(Regex("[+,.]"))) {
throw InvalidInputException("Invalid property name $property on row $count in column ${index + 1}! You may not use the any of the following characters: + , . ")
}
// TODO: Any validations on the field names themselves?
// Like no special characters? might be a good idea for processing
}
}
continue
}
// the +1 ensures, that users can start columns beginning at 1!
val identifier = line[identifierIndex + 1]
// the -1 ensures, that users can start columns beginning at 1!
val identifier = line[identifierIndex - 1]
when {
identifier.isEmpty() -> {
throw InvalidInputException("The unique identifier in column $identifierIndex in row $count is empty!")
......@@ -153,20 +169,16 @@ class KafkaTopology(private val settings: SettingsLoader) {
mutableSet.add(identifier)
}
}
val keyValueMap = json {
obj(
headerProperties.zip(line)
)
}
val result = Pair(identifier, keyValueMap)
val report = Report(
identifier,
"SUCCESS",
"Successfully created record with identifier $identifier with format CSV!",
1
"Successfully created record with identifier $identifier from row $count!"
)
resultMessages.add(Pair(result, report))
}
......
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
class MissingSettingException(source: String, setting: String) :
Exception(
when (source) {
"env" -> "Internal Configuration Error: $setting!"
"missing" -> "Internal Configuration Error: Missing setting $setting in configuration file."
else -> "User Configuration Error: A value for setting $setting is required."
}
)
......@@ -18,14 +18,10 @@
package org.memobase
import java.time.Instant
data class Report(
val id: String,
val status: String,
val message: String,
val recordCount: Int,
val timeStamp: String = Instant.now().toString()
val message: String
) {
override fun equals(other: Any?): Boolean {
......@@ -40,7 +36,6 @@ data class Report(
var result = id.hashCode()
result = 31 * result + status.hashCode()
result = 31 * result + message.hashCode()
result = 31 * result + recordCount.hashCode()
return result
}
}
This diff is collapsed.
/*
* Table Data Import Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import net.schmizz.sshj.SSHClient
import net.schmizz.sshj.sftp.SFTPClient
import net.schmizz.sshj.sftp.FileAttributes
import net.schmizz.sshj.sftp.FileMode
import net.schmizz.sshj.sftp.RemoteFile
import net.schmizz.sshj.sftp.OpenMode
import net.schmizz.sshj.transport.verification.PromiscuousVerifier
import net.schmizz.sshj.userauth.UserAuthException
import org.apache.logging.log4j.LogManager
import java.io.Closeable
import java.io.File
import java.net.ConnectException
import java.net.UnknownHostException
import java.util.Properties
import kotlin.system.exitProcess
class SftpClient(sftpSettings: Properties) : Closeable {
private val log = LogManager.getLogger("SftpClient")
private val ssh = SSHClient()
private val instance: SFTPClient
init {
try {
ssh.addHostKeyVerifier(PromiscuousVerifier())
ssh.connect(sftpSettings.getProperty("host"), sftpSettings.getProperty("port").toInt())
ssh.authPassword(sftpSettings.getProperty("user"), sftpSettings.getProperty("password"))
instance = ssh.newSFTPClient()
} catch (ex: UserAuthException) {
log.error("SFTP User Authentication Error: Invalid user authentication supplied.")
exitProcess(1)
} catch (ex: ConnectException) {
log.error("SFTP Connection Exception: ${ex.message}.")
exitProcess(1)
} catch (ex: UnknownHostException) {
log.error("SFTP Host Not Found: ${ex.message}.")
exitProcess(1)
} catch (ex: Exception) {
ex.printStackTrace()
log.error("SSH Exception: ${ex.localizedMessage}")
exitProcess(1)
}
}
fun listFiles(path: String): List<String> {
return ls(path).filter { fileAttributes(it).mode.type == FileMode.Type.REGULAR }
}
fun open(file: File): RemoteFile {
return instance.open(file.path, setOf(OpenMode.READ))
}
private fun fileAttributes(path: String): FileAttributes {
return instance.lstat(path)
}
private fun ls(path: String): List<String> {
return instance.ls(path).map { it.path }
}
override fun close() {
instance.close()
ssh.disconnect()
}
}
\ No newline at end of file
id: ${JOB_ID:?system}
sftp:
host: ${SFTP_HOST:?system}
port: ${SFTP_PORT:?system}
......@@ -10,12 +11,10 @@ app:
line: ${HEADER_LINE_INDEX:?user}
identifier: ${IDENTIFIER_INDEX:?user}
kafka:
producer:
bootstrap.servers: localhost:9092
client.id: ${CLIENT_ID:?system}
stream:
bootstrap.servers: localhost:9092
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
application.id: ${APPLICATION_ID:?system}
topic:
in: ${TOPIC_IN:?system}
out: ${TOPIC_OUT:?system}
\ No newline at end of file
out: ${TOPIC_OUT:?system}
process: ${TOPIC_PROCESS_REPORT:?system}
\ No newline at end of file
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.junit.jupiter.api.extension.BeforeAllCallback
import org.junit.jupiter.api.extension.ExtensionContext
class EmbeddedKafkaExtension : BeforeAllCallback {
lateinit var kafka: KafkaEmbedded
private val testTopic: String = "test"
private val port: Int = 12345
override fun beforeAll(context: ExtensionContext?) {
kafka = KafkaEmbedded(port, testTopic)
val wrapper = KafkaWrapper(kafka)
if (context != null) {
context.getStore(ExtensionContext.Namespace.create(EmbeddedKafkaExtension::class.java))
.put("kafka", wrapper)
Runtime.getRuntime().addShutdownHook(Thread(Runnable {
wrapper.close()
}))
}
}
class KafkaWrapper(private val embedded: KafkaEmbedded) : ExtensionContext.Store.CloseableResource {
override fun close() {
embedded.close()
}
}
}
/*
* record-parser
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.github.marschall.memoryfilesystem.MemoryFileSystemBuilder.newLinux
import org.apache.sshd.server.SshServer
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory
import java.io.Closeable
import java.io.IOException
import java.io.InputStream
import java.nio.charset.Charset
import java.nio.file.FileSystem
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.PathMatcher
import java.nio.file.WatchService
import java.nio.file.FileStore
import java.nio.file.attribute.UserPrincipalLookupService
import java.nio.file.spi.FileSystemProvider
class EmbeddedSftpServer(port: Int, user: String, password: String) : Closeable {
private val credentials = mapOf(Pair(user, password))
private val fileSystem: FileSystem = newLinux().build("EmbeddedSftpServerFileSystem@" + hashCode())
private val server: SshServer = SshServer.setUpDefaultServer()
init {
server.port = port
server.keyPairProvider = SimpleGeneratorHostKeyProvider()
server.setPasswordAuthenticator { authUser, authPassword, _ -> authenticate(authUser, authPassword) }
server.subsystemFactories = listOf(SftpSubsystemFactory())
/* When a channel is closed SshServer calls close() on the file system.
* In order to use the file system for multiple channels/sessions we
* have to use a file system wrapper whose close() does nothing.
*/
server.setFileSystemFactory { DoNotClose(fileSystem) }
server.start()
}
/**
* Put a text file on the SFTP folder. The file is available by the
* specified path.
* @param path the path to the file.
* @param content the files content.
* @param encoding the encoding of the file.
* @throws IOException if the file cannot be written.
*/
@Throws(IOException::class)
fun putFile(
path: String,
content: String,
encoding: Charset = Charset.defaultCharset()
) {
val contentAsBytes = content.toByteArray(encoding)
putFile(path, contentAsBytes)
}
/**
* Put a file on the SFTP folder. The file is available by the specified
* path.
* @param path the path to the file.
* @param content the files content.
* @throws IOException if the file cannot be written.
*/
@Throws(IOException::class)
fun putFile(
path: String,
content: ByteArray
) {
val pathAsObject = fileSystem.getPath(path)
ensureDirectoryOfPathExists(pathAsObject)
Files.write(pathAsObject, content)
}
/**
* Put a file on the SFTP folder. The file is available by the specified
* path. The file content is read from an `InputStream`.
* @param path the path to the file.
* @param inputStream an `InputStream` that provides the file's content.
* @throws IOException if the file cannot be written or the input stream
* cannot be read.
*/
@Throws(IOException::class)
fun putFile(
path: String,
inputStream: InputStream
) {
val pathAsObject = fileSystem.getPath(path)
ensureDirectoryOfPathExists(pathAsObject)
Files.copy(inputStream, pathAsObject)
}
/**
* Create a directory on the SFTP server.
* @param path the directory's path.
* @throws IOException if the directory cannot be created.
*/
@Throws(IOException::class)
fun createDirectory(
path: String
) {
val pathAsObject = fileSystem.getPath(path)
Files.createDirectories(pathAsObject)
}
/**
* Create multiple directories on the SFTP server.
* @param paths the directories' paths.
* @throws IOException if at least one directory cannot be created.
*/
@Throws(IOException::class)
fun createDirectories(
vararg paths: String
) {
for (path in paths) createDirectory(path)
}
/**
* Get a text file from the SFTP server.
* @param path the path to the file.
* @return the content of the text file.
* @throws IOException if the file cannot be read.
* @throws IllegalStateException if not called from within a test.
*/
@Throws(IOException::class)
fun getFileContent(
path: String,
encoding: Charset = Charset.defaultCharset()
): String {
return getFileContent(path).toString(encoding)
}
/**
* Get a file from the SFTP server.
* @param path the path to the file.
* @return the content of the file.
* @throws IOException if the file cannot be read.
* @throws IllegalStateException if not called from within a test.
*/
@Throws(IOException::class)
fun getFileContent(
path: String
): ByteArray {
val pathAsObject = fileSystem.getPath(path)
return Files.readAllBytes(pathAsObject)
}
/**
* Checks the existence of a file. returns `true` iff the file exists
* and it is not a directory.
* @param path the path to the file.
* @return `true` iff the file exists and it is not a directory.
* @throws IllegalStateException if not called from within a test.