Commit 52b1748a authored by Jonas Waeber's avatar Jonas Waeber

Reworked implementation

parent 55bbe8b4
......@@ -2,12 +2,11 @@ plugins {
id 'application'
id 'distribution'
id 'org.jetbrains.kotlin.jvm' version '1.3.71'
id 'com.palantir.git-version' version '0.11.0'
id "com.gitlab.morality.grit" version "2.0.2"
id 'org.jlleitschuh.gradle.ktlint' version '9.2.1'
}
group 'org.memobase'
version = gitVersion()
mainClassName = 'org.memobase.App'
jar {
......@@ -38,8 +37,8 @@ dependencies {
implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jV}"
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation 'org.memobase:memobase-service-utilities:0.14.2'
implementation 'ch.memobase:memobase-kafka-utils:0.1.2'
implementation 'org.memobase:memobase-service-utilities:0.16.0'
// SFTP Client
// is needed because of a bug.
implementation 'com.hierynomus:sshj:0.27.0'
......
......@@ -8,8 +8,6 @@ image: "memoriav/memobase-2020/services/import-process/xml-data-transform"
tag: "latest"
deploymentName: xml-data-transform
## TODO: This needs to be solved differently. This way it is not possible to deploy a replica-set.
## somehow the id needs to be dependent on the pod name?
applicationId: xml-data-transform-app
kafkaConfigs: prod-kafka-bootstrap-servers
......
/*
* xml-data-transform
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.apache.kafka.common.header.Headers
import org.apache.kafka.streams.kstream.ValueTransformer
import org.apache.kafka.streams.kstream.ValueTransformerSupplier
import org.apache.kafka.streams.processor.ProcessorContext
class HeaderExtraction : ValueTransformer<Message, TransformMetadata> {
private val extractedHeaders = mutableMapOf<String, String>()
override fun init(context: ProcessorContext?) {
if (context != null) {
extractedHeaders["sessionId"] = extract("sessionId", context.headers())
extractedHeaders["recordSetId"] = extract("recordSetId", context.headers())
extractedHeaders["recordTag"] = extract("xmlRecordTag", context.headers())
extractedHeaders["idTag"] = extract("xmlIdentifierFieldName", context.headers())
}
}
private fun extract(tag: String, headers: Headers): String {
val headerValues = headers.headers(tag)
return headerValues.first().value().toString()
}
override fun transform(value: Message): TransformMetadata {
return TransformMetadata(
value.path,
extractedHeaders["recordTag"] as String,
extractedHeaders["idTag"] as String
)
}
override fun close() {
}
}
class HeaderExtractionSupplier : ValueTransformerSupplier<Message, TransformMetadata> {
override fun get(): ValueTransformer<Message, TransformMetadata> {
return HeaderExtraction()
}
}
\ No newline at end of file
......@@ -18,78 +18,112 @@
package org.memobase
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import com.beust.klaxon.Klaxon
import net.schmizz.sshj.sftp.RemoteFile
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.kafka.streams.kstream.ValueTransformerSupplier
import org.apache.logging.log4j.LogManager
import org.memobase.models.*
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
import org.memobase.utils.MissingIdentifierException
import org.memobase.utils.SerdeMessage
import org.memobase.utils.XsltException
import org.memobase.xml.XMLTransformer
import settings.HeaderExtractionTransformSupplier
import java.io.File
import java.io.InputStream
import java.io.StringReader
class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("XMLDataImport")
private val sftpClient: SftpClient = SftpClient(settings.sftpSettings)
private val xmlTransformer = XMLTransformer(settings.appSettings)
private val reportingTopic = settings.outputTopic + "-reporting"
private val xmlTransformer = XMLTransformer()
private val configJoiner = ConfigJoiner<Message, ByteArray>(
ImportService.Transform,
SerdeMessage(),
Serdes.ByteArray(),
xmlTransformer::xsltFunction
)
private val reportingTopic = settings.processReportTopic
fun build(): Topology {
val builder = StreamsBuilder()
val stream = builder
val configStream = builder.stream<ByteArray, ByteArray>("import-process-config")
val dataStream = builder
.stream<String, String>(settings.inputTopic)
.flatMapValues { _, value -> parseMessage(value) }
.filter { _, value -> value.format != Formats.xml }
.transformValues(HeaderExtractionSupplier())
.mapValues { value -> Pair(value, sftpClient.open(File(value.path))) }
.map { key, value ->
value.second.use {
xmlTransformer.applyXSLT(key, value.first, it)
}
val joinedStream = configJoiner.join(dataStream, configStream)
val saxHandlerStream = joinedStream
.mapValues { value -> Input(value.left, value.right) }
.transformValues(HeaderExtractionTransformSupplier<Input>())
.mapValues { value ->
Content(
value.first.message,
value.second,
value.first.xsltData,
sftpClient.open(File(value.first.message.path))
)
}
.map { key, value ->
value.inputStream.use {
try {
val output =
xmlTransformer.applyXSLT(key, value.headerMetadata, value.inputStream, value.xsltData)
KeyValue(
output.first,
Pair(output.second.output.toString(), output.second.getReport())
)
} catch (ex: XsltException) {
log.error(ex.message)
KeyValue(
key, Pair(
null, Report(
key,
ReportStatus.failure,
ex.localizedMessage
)
)
)
streamOutput(stream)
} catch (ex: MissingIdentifierException) {
log.error(ex.message)
KeyValue(
key, Pair(
null, Report(
key,
ReportStatus.failure,
ex.localizedMessage
)
)
)
}
}
}
streamOutput(saxHandlerStream)
return builder.build()
}
private fun streamOutput(stream: KStream<String, SAXContentHandler>) {
private fun streamOutput(stream: KStream<String, Pair<String?, Report>>) {
stream
.mapValues { value -> value.output.toString() }
.filter { _, value -> value != null }
.mapValues { value -> value.first }
.to(settings.outputTopic)
stream
.mapValues { value -> value.getReport().toJson() }
.mapValues { value -> value.second.toJson() }
.to(reportingTopic)
stream
.mapValues { value ->
val report = value.getReport()
if (report.status == ReportStatus.success) {
ProcessReport(
"xml-data-transform",
ReportStatus.success,
1,
1,
0
)
} else {
ProcessReport(
"xml-data-transform",
ReportStatus.failure,
1,
0,
1
)
}
}
.mapValues { value -> value.toJson() }
.to(settings.processReportTopic)
}
private fun parseMessage(value: String): List<Message> {
......
/*
* xml-data-transform
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.Klaxon
data class ProcessReport(
val id: String,
val status: String,
val total: Int,
val successes: Int,
val failures: Int
) {
fun toJson(): String {
return Klaxon().toJsonString(this)
}
}
/*
* XML Data Transform Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.json
object Formats {
const val invalid = "INVALID"
const val xml = "XML"
const val error = "ERROR"
}
object Extensions {
const val csv = "csv"
const val tsv = "tsv"
const val xlsx = "xlsx"
const val xls = "xls"
const val ods = "ods"
}
object ReportStatus {
const val success = "SUCCESS"
const val failure = "FAILURE"
}
object ErrorResult {
val result = json {
obj(Pair("message", Formats.error))
}
}
object ReportMessages {
fun processFailure(fileName: String, message: String): String {
return "Could not process file $fileName, because $message"
}
fun processSuccess(count: Int): String {
return "Transformed table data into $count records."
}
fun invalidFile(fileName: String, message: String): String {
return "Invalid Input Error: $message for file $fileName."
}
fun reportSuccess(identifier: String, count: Int): String {
return "Successfully transformed row $count into key-value map with identifier $identifier."
}
fun reportFailure(message: String): String {
return "Invalid Input Error: $message"
}
}
package org.memobase.models
import net.sf.saxon.s9api.XsltExecutable
import settings.HeaderMetadata
import java.io.InputStream
data class Content(
val message: Message,
val headerMetadata: HeaderMetadata,
val xsltData: ByteArray,
val inputStream: InputStream
)
\ No newline at end of file
/*
* xml-data-transform
* XML Data Transform Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
......@@ -16,10 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
package org.memobase.models
data class TransformMetadata(
val path: String,
val recordTag: String,
val identifierTag: String
)
\ No newline at end of file
object Formats {
const val xml = "XML"
}
package org.memobase.models
data class Input(
val message: Message,
val xsltData: ByteArray
)
......@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
package org.memobase.models
data class Message(
val path: String,
......
......@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
package org.memobase.models
import com.beust.klaxon.Klaxon
......
package org.memobase.models
object ReportStatus {
const val success = "SUCCESS"
const val failure = "FAILURE"
}
\ No newline at end of file
package org.memobase.utils
class MissingIdentifierException(key: String, field: String) :
Exception("Could not extract an identifier from resource $key in field $field.")
package org.memobase.utils
import com.beust.klaxon.Klaxon
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serializer
import org.memobase.models.Message
import java.io.ByteArrayInputStream
import java.nio.charset.Charset
class SerdeMessage : Serde<Message> {
private val klaxon = Klaxon()
override fun serializer(): Serializer<Message> {
return Serializer { _, data ->
klaxon.toJsonString(data).toByteArray()
}
}
override fun deserializer(): Deserializer<Message> {
return Deserializer { _, data ->
klaxon.parse<Message>(ByteArrayInputStream(data)) ?: error(
"Could not deserialize message: ${
data.toString(
Charset.defaultCharset()
)
}"
)
}
}
}
\ No newline at end of file
package org.memobase.utils
import net.sf.saxon.s9api.StaticError
class XsltException(errorList: List<StaticError>) :
Exception("Found error while parsing XSLT: " + errorList.joinToString(separator = "; ") { it.message })
\ No newline at end of file
......@@ -16,24 +16,46 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
package org.memobase.xml
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.models.Report
import org.memobase.models.ReportStatus
import org.xml.sax.Attributes
import org.xml.sax.ContentHandler
import org.xml.sax.Locator
import java.io.StringWriter
/**
* Class to transform a xml stream into a json representation. Expects a flat xml preprocessed with
* a xslt if necessary.
*
* Can only handle elements up to one level deep and ignores attributes.
*
* @param key The key of the kafka message.
* @param identifierFieldName The field name of the unique identifier of this record.
* @param recordTag The root tag of the xml structure.
*/
class SAXContentHandler(key: String, private val identifierFieldName: String, private val recordTag: String) :
ContentHandler {
private val log = LogManager.getLogger("SAXHandler")
/**
* The json representation of the xml stream after processing.
*/
val output = StringWriter()
/**
* The identifier is used as a message key for the outgoing message.
*/
var identifier: String = key
private var report: Report? = null
private val jsonResult = JsonObject()
/**
* @return A report on the status of the transformation.
*/
fun getReport(): Report {
return report.let {
it
......
......@@ -15,7 +15,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
package org.memobase.xml
import net.sf.saxon.s9api.Processor
import net.sf.saxon.s9api.SAXDestination
......@@ -23,45 +23,40 @@ import net.sf.saxon.s9api.StaticError
import net.sf.saxon.s9api.XsltExecutable
import org.apache.kafka.streams.KeyValue
import org.apache.logging.log4j.LogManager
import java.io.File
import java.io.FileInputStream
import org.memobase.utils.MissingIdentifierException
import org.memobase.utils.XsltException
import settings.HeaderMetadata
import java.io.ByteArrayInputStream
import java.io.InputStream
import java.util.Properties
import javax.xml.transform.stream.StreamSource
class XMLTransformer(appSettings: Properties) {
private val log = LogManager.getLogger("XMLTransformer")
private val xlstFilePath = appSettings.getProperty("xsltFilePath")
class XMLTransformer {
private val processor = Processor(false)
private val xslt = compileXslt()
private val transformer = xslt.load()
private fun compileXslt(): XsltExecutable {
fun xsltFunction(xsltData: ByteArray): ByteArray {
return xsltData
}
fun applyXSLT(key: String, headers: HeaderMetadata, data: InputStream, xsltFile: ByteArray): Pair<String, SAXContentHandler> {
val contentHandler = SAXContentHandler(key, headers.xmlIdentifierFieldName, headers.xmlRecordTag)
val errorList = mutableListOf<StaticError>()
val xsltCompiler = processor.newXsltCompiler()
xsltCompiler.setErrorList(errorList)
val source = StreamSource(FileInputStream(File(xlstFilePath)))
val source = StreamSource(ByteArrayInputStream(xsltFile))
val executable = xsltCompiler.compile(source)
if (errorList.isEmpty()) {
return executable
} else {
throw Exception(errorList.joinToString())
if (errorList.isNotEmpty()) {
throw XsltException(errorList)
}
}
fun applyXSLT(key: String, meta: TransformMetadata, data: InputStream): KeyValue<String, SAXContentHandler> {
val contentHandler = SAXContentHandler(key, meta.identifierTag, meta.recordTag)
val transformer = executable.load()
data.use { stream ->
transformer.setSource(StreamSource(stream))
transformer.destination = SAXDestination(contentHandler)
transformer.transform()
}
if (contentHandler.identifier.isEmpty()) {
throw Exception("No valid identifier found in record $key in field ${meta.identifierTag}.")
throw MissingIdentifierException(key, headers.xmlIdentifierFieldName)
} else {
return KeyValue(contentHandler.identifier, contentHandler)
return Pair(contentHandler.identifier, contentHandler)
}
}
}
\ No newline at end of file
......@@ -19,6 +19,9 @@
package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
......@@ -29,11 +32,12 @@ import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.models.Report
import org.memobase.models.ReportStatus
import org.memobase.testing.EmbeddedSftpServer
import java.io.File
import java.io.FileInputStream
import java.nio.charset.Charset
import java.nio.file.Paths
import java.util.stream.Stream
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
......@@ -55,6 +59,17 @@ class TestIntegration {
FileInputStream("$resourcePath/${params.count}/data.xml")
)
val headers = RecordHeaders()
headers.add(RecordHeader("sessionId", "test-session-id".toByteArray()))
headers.add(RecordHeader("recordSetId", "test-record-set-id".toByteArray()))
headers.add(RecordHeader("institutionId", "test-institution-id".toByteArray()))
headers.add(RecordHeader("isPublished", "false".toByteArray()))
headers.add(RecordHeader("xmlRecordTag", "record".toByteArray()))
headers.add(RecordHeader("xmlIdentifierFieldName", "id".toByteArray()))
headers.add(RecordHeader("tableSheetIndex", "1".toByteArray()))
headers.add(RecordHeader("tableHeaderCount", "1".toByteArray()))
headers.add(RecordHeader("tableHeaderIndex", "1".toByteArray()))
headers.add(RecordHeader("tableIdentifierIndex", "1".toByteArray()))
val service = Service("test${params.count}.yml")