Commit 2e4ba945 authored by Jonas Waeber's avatar Jonas Waeber

Merge branch 'deployment-transform'

# Conflicts:
#	build.gradle
#	helm-charts/values.yaml
#	src/main/kotlin/KafkaTopology.kt
#	src/test/kotlin/TestIntegration.kt
parents 83a3c279 0f771654
......@@ -2,12 +2,11 @@ plugins {
id 'application'
id 'distribution'
id 'org.jetbrains.kotlin.jvm' version '1.3.71'
id 'com.palantir.git-version' version '0.11.0'
id "com.gitlab.morality.grit" version "2.0.2"
id 'org.jlleitschuh.gradle.ktlint' version '9.2.1'
}
group 'org.memobase'
version = gitVersion()
mainClassName = 'org.memobase.App'
jar {
......@@ -38,9 +37,8 @@ dependencies {
implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jV}"
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation 'org.memobase:memobase-service-utilities:1.12.1'
implementation 'ch.memobase:import-process-effects-registry_2.12:0.2.1'
implementation 'ch.memobase:memobase-kafka-utils:0.1.2'
implementation 'org.memobase:memobase-service-utilities:0.16.0'
// SFTP Client
// is needed because of a bug.
implementation 'com.hierynomus:sshj:0.27.0'
......
############################################
## Values in this section are the same for #
## all jobs #
############################################
#image values
registry: "cr.gitlab.switch.ch"
image: "memoriav/memobase-2020/services/import-process/xml-data-transform"
tag: "latest"
deploymentName: xml-data-transform-deployment
applicationId: xml-data-transform-app
kafkaConfigs: prod-kafka-bootstrap-servers
sftpConfigs: internal-sftp-config
inputTopicName: import-process-data-transform
outputTopicName: import-process-mapper
reportingTopicName: import-process-reporting
\ No newline at end of file
apiVersion: v1
kind: ConfigMap
metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-config"
name: "{{ .Values.deploymentName }}-config"
namespace: memobase
data:
APPLICATION_ID: "{{ .Values.processId }}-{{ .Values.jobName }}"
XSLT_FILE_PATH: "/configs/mappings/{{ .Values.xsltFileName }}"
RECORD_TAG: "{{ .Values.recordTag }}"
IDENTIFIER_FIELD_NAME: "{{ .Values.identifierFieldName }}"
TOPIC_IN: "{{ .Values.processId }}-{{ .Values.lastJobName }}"
TOPIC_OUT: "{{ .Values.processId }}-{{ .Values.jobName }}"
TOPIC_PROCESS: "{{ .Values.processId }}-reporting"
\ No newline at end of file
APPLICATION_ID: {{ .Values.applicationId }}
TOPIC_IN: {{ .Values.inputTopicName }}
TOPIC_OUT: {{ .Values.outputTopicName }}
TOPIC_PROCESS: {{ .Values.reportingTopicName }}
\ No newline at end of file
apiVersion: batch/v1
kind: Job
apiVersion: apps/v1
kind: Deployment
metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}"
name: {{ .Values.deploymentName }}
namespace: memobase
labels:
institutionId: "{{ .Values.institutionId }}"
recordSetId: "{{ .Values.recordSetId }}"
jobType: "import-job"
jobType: "import-process-deployment"
spec:
selector:
matchLabels:
app: {{ .Values.deploymentName }}
replicas: 1
template:
metadata:
labels:
app: {{ .Values.deploymentName }}
tier: import-process
spec:
restartPolicy: Always
containers:
- name: "{{ .Values.processId }}-{{ .Values.jobName }}"
- name: "{{ .Values.deploymentName }}-container"
image: "{{ .Values.registry }}/{{ .Values.image }}:{{ .Values.tag }}"
imagePullPolicy: Always
envFrom:
- secretRef:
name: "{{ .Values.sftpConfigs }}"
- configMapRef:
name: "{{ .Values.kafkaConfigs }}"
- configMapRef:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-config"
volumeMounts:
- name: config-volume
mountPath: "/configs/mappings/{{ .Values.xsltFileName }}"
subPath: "{{ .Values.xsltFileName }}"
volumes:
- name: config-volume
configMap:
name: "{{ .Values.processId }}-{{ .Values.xsltConfigMapName }}"
restartPolicy: Never
backoffLimit: 0
name: "{{ .Values.deploymentName }}-config"
......@@ -7,26 +7,12 @@ registry: "cr.gitlab.switch.ch"
image: "memoriav/memobase-2020/services/import-process/xml-data-transform"
tag: "latest"
deploymentName: xml-data-transform
applicationId: xml-data-transform-app
kafkaConfigs: prod-kafka-bootstrap-servers
sftpConfigs: internal-sftp-config
lastJobName: text-file-validation
xsltConfigMapName: xslt-transform-config
############################################
## Values below should be defined via the #
## User Interface (Drupal) #
############################################
jobName: xml-data-transform
processId: p0001
institutionId: placeholder
recordSetId: placeholder
xsltFileName: transform.xslt
recordTag: root
identifierFieldName: placeholderValue
\ No newline at end of file
inputTopicName: import-process-data-transform
outputTopicName: import-process-mapper
reportingTopicName: import-process-reporting
\ No newline at end of file
......@@ -18,108 +18,117 @@
package org.memobase
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import com.beust.klaxon.Klaxon
import net.schmizz.sshj.sftp.RemoteFile
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.models.*
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
import org.memobase.utils.MissingIdentifierException
import org.memobase.utils.SerdeMessage
import org.memobase.utils.XsltException
import org.memobase.xml.XMLTransformer
import settings.HeaderExtractionTransformSupplier
import java.io.File
import java.io.InputStream
import java.io.StringReader
class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("XMLDataImport")
private val sftpClient: SftpClient = SftpClient(settings.sftpSettings)
private val xmlTransformer = XMLTransformer(settings.appSettings)
private val reportingTopic = settings.outputTopic + "-reporting"
private val xmlTransformer = XMLTransformer()
private val configJoiner = ConfigJoiner<Message, ByteArray>(
ImportService.Transform,
SerdeMessage(),
Serdes.ByteArray(),
xmlTransformer::xsltFunction
)
private val reportingTopic = settings.processReportTopic
fun prepare(): StreamsBuilder {
fun build(): Topology {
val builder = StreamsBuilder()
val errorFilter = builder
val configStream = builder.stream<String, String>("import-process-config")
.map { key, value -> KeyValue(key.toByteArray(), value.toByteArray()) }
val dataStream = builder
.stream<String, String>(settings.inputTopic)
.flatMapValues { _, value -> parseMessage(value) }
.branch(
Predicate { _, value -> value.format == Formats.error },
Predicate { _, _ -> true }
)
// report filtered error message from previous job.
errorFilter[0]
.mapValues { key, _ ->
Report(
key,
ReportStatus.failure,
ReportMessages.processFailure(key, "The input file is invalid.")
).toJson()
.filter { _, value ->
value.format == Formats.xml
}
.to("${settings.outputTopic}-reporting")
// filtered result simply sends ERROR along!
errorFilter[0]
.mapValues { _ -> "ERROR" }
.to(settings.outputTopic)
val joinedStream = configJoiner.join(dataStream, configStream)
// report full process as failure
errorFilter[0]
.mapValues { _ ->
ProcessReport(
"xml-data-transform",
ReportStatus.failure,
1,
0,
1
).toJson()
val saxHandlerStream = joinedStream
.mapValues { value ->
log.debug("Combine joined inputs.")
Input(value.left, value.right) }
.transformValues(HeaderExtractionTransformSupplier<Input>())
.mapValues { value ->
Content(
value.first.message,
value.second,
value.first.xsltData,
sftpClient.open(File(value.first.message.path))
)
}
.to(settings.processReportTopic)
val transformedValue = errorFilter[1]
.mapValues { value -> sftpClient.open(File(value.path)) }
.map { key, value ->
value.use {
xmlTransformer.applyXSLT(key, value)
value.inputStream.use {
try {
val output =
xmlTransformer.applyXSLT(key, value.headerMetadata, value.inputStream, value.xsltData)
KeyValue(
output.first,
Pair(output.second.output.toString(), output.second.getReport())
)
} catch (ex: XsltException) {
log.error(ex.message)
KeyValue(
key, Pair(
null, Report(
key,
ReportStatus.failure,
ex.localizedMessage
)
)
)
} catch (ex: MissingIdentifierException) {
log.error(ex.message)
KeyValue(
key, Pair(
null, Report(
key,
ReportStatus.failure,
ex.localizedMessage
)
)
)
}
}
}
streamOutput(transformedValue)
return builder
streamOutput(saxHandlerStream)
return builder.build()
}
private fun streamOutput(stream: KStream<String, SAXContentHandler>) {
private fun streamOutput(stream: KStream<String, Pair<String?, Report>>) {
stream
.mapValues { value -> value.output.toString() }
.filter { _, value -> value != null }
.mapValues { value -> value.first }
.to(settings.outputTopic)
stream
.mapValues { value -> value.getReport().toJson() }
.mapValues { value -> value.second.toJson() }
.to(reportingTopic)
stream
.mapValues { value ->
val report = value.getReport()
if (report.status == ReportStatus.success) {
ProcessReport(
"xml-data-transform",
ReportStatus.success,
1,
1,
0
)
} else {
ProcessReport(
"xml-data-transform",
ReportStatus.failure,
1,
0,
1
)
}
}
.mapValues { value -> value.toJson() }
.to(settings.processReportTopic)
}
private fun parseMessage(value: String): List<Message> {
......
......@@ -32,11 +32,7 @@ class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("TableDataService")
val settings = SettingsLoader(
listOf(
"xsltFilePath",
"identifierFieldName",
"recordTag"
),
listOf(),
file,
useStreamsConfig = true,
readSftpSettings = true
......
/*
* XML Data Transform Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.json
object Formats {
const val csv = "CSV"
const val tsv = "TSV"
const val xlsx = "XLSX"
const val xls = "XLS"
const val ods = "ODS"
const val invalid = "INVALID"
const val error = "ERROR"
}
object Extensions {
const val csv = "csv"
const val tsv = "tsv"
const val xlsx = "xlsx"
const val xls = "xls"
const val ods = "ods"
}
object ReportStatus {
const val success = "SUCCESS"
const val failure = "FAILURE"
}
object ErrorResult {
val result = json {
obj(Pair("message", Formats.error))
}
}
object ReportMessages {
fun processFailure(fileName: String, message: String): String {
return "Could not process file $fileName, because $message"
}
fun processSuccess(count: Int): String {
return "Transformed table data into $count records."
}
fun invalidFile(fileName: String, message: String): String {
return "Invalid Input Error: $message for file $fileName."
}
fun reportSuccess(identifier: String, count: Int): String {
return "Successfully transformed row $count into key-value map with identifier $identifier."
}
fun reportFailure(message: String): String {
return "Invalid Input Error: $message"
}
}
package org.memobase.models
import net.sf.saxon.s9api.XsltExecutable
import settings.HeaderMetadata
import java.io.InputStream
data class Content(
val message: Message,
val headerMetadata: HeaderMetadata,
val xsltData: ByteArray,
val inputStream: InputStream
)
\ No newline at end of file
/*
* xml-data-transform
* XML Data Transform Service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
......@@ -16,18 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
package org.memobase.models
import com.beust.klaxon.Klaxon
data class ProcessReport(
val id: String,
val status: String,
val total: Int,
val successes: Int,
val failures: Int
) {
fun toJson(): String {
return Klaxon().toJsonString(this)
}
object Formats {
const val xml = "XML"
}
package org.memobase.models
data class Input(
val message: Message,
val xsltData: ByteArray
)
......@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
package org.memobase.models
data class Message(
val path: String,
......
......@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
package org.memobase.models
import com.beust.klaxon.Klaxon
......
package org.memobase.models
object ReportStatus {
const val success = "SUCCESS"
const val failure = "FAILURE"
}
\ No newline at end of file
package org.memobase.utils
class MissingIdentifierException(key: String, field: String) :
Exception("Could not extract an identifier from resource $key in field $field.")
package org.memobase.utils
import com.beust.klaxon.Klaxon
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serializer
import org.memobase.models.Message
import java.io.ByteArrayInputStream
import java.nio.charset.Charset
class SerdeMessage : Serde<Message> {
private val klaxon = Klaxon()
override fun serializer(): Serializer<Message> {
return Serializer { _, data ->
klaxon.toJsonString(data).toByteArray()
}
}
override fun deserializer(): Deserializer<Message> {
return Deserializer { _, data ->
klaxon.parse<Message>(ByteArrayInputStream(data)) ?: error(
"Could not deserialize message: ${
data.toString(
Charset.defaultCharset()
)
}"
)
}
}
}
\ No newline at end of file
package org.memobase.utils
import net.sf.saxon.s9api.StaticError
class XsltException(errorList: List<StaticError>) :
Exception("Found error while parsing XSLT: " + errorList.joinToString(separator = "; ") { it.message })
\ No newline at end of file
......@@ -16,29 +16,51 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
package org.memobase.xml
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.models.Report
import org.memobase.models.ReportStatus
import org.xml.sax.Attributes
import org.xml.sax.ContentHandler
import org.xml.sax.Locator
import java.io.StringWriter
class SAXContentHandler(key: String, private val identifierFieldName: String, private val recordTag: String) :
/**
* Class to transform a xml stream into a json representation. Expects a flat xml preprocessed with
* a xslt if necessary.
*
* Can only handle elements up to one level deep and ignores attributes.
*
* @param key The key of the kafka message.
* @param identifierFieldName The field name of the unique identifier of this record.
* @param recordTag The root tag of the xml structure.
*/
class SAXContentHandler(private val key: String, private val identifierFieldName: String, private val recordTag: String) :
ContentHandler {
private val log = LogManager.getLogger("SAXHandler")
/**
* The json representation of the xml stream after processing.
*/
val output = StringWriter()
var identifier: String = key
/**
* The identifier is used as a message key for the outgoing message.
*/
var identifier: String? = null
private var report: Report? = null
private val jsonResult = JsonObject()
/**
* @return A report on the status of the transformation.
*/
fun getReport(): Report {
return report.let {
it
?: Report(
identifier,
identifier ?: key,
ReportStatus.failure,
"Unknown Failure: No report found."
)
......@@ -160,7 +182,7 @@ class SAXContentHandler(key: String, private val identifierFieldName: String, pr
override fun endDocument() {
output.write(jsonResult.toJsonString())
report = Report(
id = identifier,
id = identifier ?: key,
status = if (reportText.isEmpty()) ReportStatus.success else ReportStatus.failure,
message = if (reportText.isEmpty()) "Successfully transformed xml to json!" else reportText.trim()
)
......
......@@ -15,7 +15,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
package org.memobase.xml
import net.sf.saxon.s9api.Processor
import net.sf.saxon.s9api.SAXDestination
......@@ -23,53 +23,40 @@ import net.sf.saxon.s9api.StaticError
import net.sf.saxon.s9api.XsltExecutable
import org.apache.kafka.streams.KeyValue
import org.apache.logging.log4j.LogManager
import java.io.File
import java.io.FileInputStream
import org.memobase.utils.MissingIdentifierException
import org.memobase.utils.XsltException
import settings.HeaderMetadata
import java.io.ByteArrayInputStream
import java.io.InputStream