Commit 55bbe8b4 authored by Jonas Waeber's avatar Jonas Waeber

Refactor xml-transform to deployment

parent 7e21ebb6
Pipeline #14209 failed with stages
in 2 minutes and 46 seconds
......@@ -39,7 +39,7 @@ dependencies {
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation 'org.memobase:memobase-service-utilities:1.11.0'
implementation 'org.memobase:memobase-service-utilities:0.14.2'
// SFTP Client
// is needed because of a bug.
implementation 'com.hierynomus:sshj:0.27.0'
......
apiVersion: v1
kind: ConfigMap
metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-config"
name: "{{ .Values.deploymentName }}-config"
namespace: memobase
data:
APPLICATION_ID: "{{ .Values.processId }}-{{ .Values.jobName }}"
XSLT_FILE_PATH: "/configs/mappings/{{ .Values.xsltFileName }}"
RECORD_TAG: "{{ .Values.recordTag }}"
IDENTIFIER_FIELD_NAME: "{{ .Values.identifierFieldName }}"
TOPIC_IN: "{{ .Values.processId }}-{{ .Values.lastJobName }}"
TOPIC_OUT: "{{ .Values.processId }}-{{ .Values.jobName }}"
TOPIC_PROCESS: "{{ .Values.processId }}-reporting"
\ No newline at end of file
APPLICATION_ID: {{ .Values.applicationId }}
TOPIC_IN: {{ .Values.inputTopicName }}
TOPIC_OUT: {{ .Values.outputTopicName }}
TOPIC_PROCESS: {{ .Values.reportingTopicName }}
\ No newline at end of file
apiVersion: batch/v1
kind: Job
apiVersion: apps/v1
kind: Deployment
metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}"
name: {{ .Values.deploymentName }}
namespace: memobase
labels:
institutionId: "{{ .Values.institutionId }}"
recordSetId: "{{ .Values.recordSetId }}"
jobType: "import-job"
jobType: "import-process-deployment"
spec:
selector:
matchLabels:
app: {{ .Values.deploymentName }}
replicas: 1
template:
metadata:
labels:
app: {{ .Values.deploymentName }}
tier: import-process
spec:
restartPolicy: Always
containers:
- name: "{{ .Values.processId }}-{{ .Values.jobName }}"
- name: "{{ .Values.deploymentName }}-container"
image: "{{ .Values.registry }}/{{ .Values.image }}:{{ .Values.tag }}"
imagePullPolicy: Always
envFrom:
- secretRef:
name: "{{ .Values.sftpConfigs }}"
- configMapRef:
name: "{{ .Values.kafkaConfigs }}"
- configMapRef:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-config"
volumeMounts:
- name: config-volume
mountPath: "/configs/mappings/{{ .Values.xsltFileName }}"
subPath: "{{ .Values.xsltFileName }}"
volumes:
- name: config-volume
configMap:
name: "{{ .Values.processId }}-{{ .Values.xsltConfigMapName }}"
restartPolicy: Never
backoffLimit: 0
name: "{{ .Values.deploymentName }}-config"
......@@ -7,25 +7,14 @@ registry: "cr.gitlab.switch.ch"
image: "memoriav/memobase-2020/services/import-process/xml-data-transform"
tag: "latest"
deploymentName: xml-data-transform
## TODO: This needs to be solved differently. As it stands, it is not possible to deploy a replica set.
## Somehow the id needs to depend on the pod name?
applicationId: xml-data-transform-app
kafkaConfigs: prod-kafka-bootstrap-servers
sftpConfigs: internal-sftp-config
lastJobName: text-file-validation
############################################
## Values below should be defined via the #
## User Interface (Drupal) #
############################################
jobName: xml-data-transform
processId: p0001
institutionId: placeholder
recordSetId: placeholder
xsltFileName: transform.xslt
xsltConfigMapName: xslt-transform-config
recordTag: root
identifierFieldName: placeholderValue
\ No newline at end of file
inputTopicName: import-process-data-transform
outputTopicName: import-process-mapper
reportingTopicName: import-process-reporting
\ No newline at end of file
/*
* xml-data-transform
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.apache.kafka.common.header.Headers
import org.apache.kafka.streams.kstream.ValueTransformer
import org.apache.kafka.streams.kstream.ValueTransformerSupplier
import org.apache.kafka.streams.processor.ProcessorContext
/**
 * Builds the [TransformMetadata] of a record from the Kafka headers of that record.
 *
 * The record tag is read from the `xmlRecordTag` header and the identifier tag from the
 * `xmlIdentifierFieldName` header. The headers are looked up inside [transform], i.e. while
 * a record is being processed: [init] only runs when the transformer is set up, when there
 * is no current record to take headers from, and header values can differ between records.
 */
class HeaderExtraction : ValueTransformer<Message, TransformMetadata> {
    // Handed over by Kafka Streams in init(); used to reach the headers of the current record.
    private var context: ProcessorContext? = null

    /** Stores the processor context so that [transform] can read per-record headers. */
    override fun init(context: ProcessorContext?) {
        this.context = context
    }

    /**
     * Returns the value of the first header named [tag] as text.
     *
     * NOTE(review): header values are assumed to be UTF-8 encoded strings — confirm against
     * the service that produces these headers.
     *
     * @throws NoSuchElementException if no header named [tag] exists or it has no value.
     */
    private fun extract(tag: String, headers: Headers): String {
        val bytes = headers.headers(tag).firstOrNull()?.value()
            ?: throw NoSuchElementException("Required header '$tag' is missing or has no value.")
        // ByteArray.toString() would only return the array identity ("[B@..."), so decode explicitly.
        return String(bytes, Charsets.UTF_8)
    }

    /**
     * Combines the path of [value] with the record and identifier tags found in the
     * headers of the record currently being processed.
     *
     * @throws IllegalStateException if [init] has not been called with a context or the
     * context offers no headers for the current record.
     * @throws NoSuchElementException if one of the required headers is missing.
     */
    override fun transform(value: Message): TransformMetadata {
        val currentContext = checkNotNull(context) {
            "HeaderExtraction.transform was called before init provided a processor context."
        }
        val headers: Headers = checkNotNull(currentContext.headers()) {
            "No headers are available for the current record."
        }
        return TransformMetadata(
            value.path,
            extract("xmlRecordTag", headers),
            extract("xmlIdentifierFieldName", headers)
        )
    }

    /** Nothing to release. */
    override fun close() {
    }
}
/**
 * Supplier that hands out a new [HeaderExtraction] instance on every [get] call.
 */
class HeaderExtractionSupplier : ValueTransformerSupplier<Message, TransformMetadata> {
    override fun get(): ValueTransformer<Message, TransformMetadata> = HeaderExtraction()
}
\ No newline at end of file
......@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.kafka.streams.kstream.ValueTransformerSupplier
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
import java.io.File
......@@ -40,52 +41,19 @@ class KafkaTopology(private val settings: SettingsLoader) {
fun build(): Topology {
val builder = StreamsBuilder()
val errorFilter = builder
val stream = builder
.stream<String, String>(settings.inputTopic)
.flatMapValues { _, value -> parseMessage(value) }
.branch(
Predicate { _, value -> value.format == Formats.error },
Predicate { _, _ -> true }
)
// report filtered error message from previous job.
errorFilter[0]
.mapValues { key, _ ->
Report(
key,
ReportStatus.failure,
ReportMessages.processFailure(key, "The input file is invalid.")
).toJson()
}
.to("${settings.outputTopic}-reporting")
// filtered result simply sends ERROR along!
errorFilter[0]
.mapValues { _ -> "ERROR" }
.to(settings.outputTopic)
// report full process as failure
errorFilter[0]
.mapValues { _ ->
ProcessReport(
"xml-data-transform",
ReportStatus.failure,
1,
0,
1
).toJson()
}
.to(settings.processReportTopic)
val transformedValue = errorFilter[1]
.mapValues { value -> sftpClient.open(File(value.path)) }
.filter { _, value -> value.format != Formats.xml }
.transformValues(HeaderExtractionSupplier())
.mapValues { value -> Pair(value, sftpClient.open(File(value.path))) }
.map { key, value ->
value.use {
xmlTransformer.applyXSLT(key, value.RemoteFileInputStream())
value.second.use {
xmlTransformer.applyXSLT(key, value.first, it)
}
}
streamOutput(transformedValue)
streamOutput(stream)
return builder.build()
}
......
......@@ -26,11 +26,7 @@ class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("TableDataService")
val settings = SettingsLoader(
listOf(
"xsltFilePath",
"identifierFieldName",
"recordTag"
),
listOf(),
file,
useStreamsConfig = true,
readSftpSettings = true
......
/*
* xml-data-transform
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
/**
 * Per-record settings handed to the XML transformation.
 *
 * @property path path of the input file, as carried by the incoming message.
 * @property recordTag record tag passed on to the SAX content handler.
 * @property identifierTag name of the field the record identifier is read from; it is
 * also reported in the error raised when no identifier is found.
 */
data class TransformMetadata(
val path: String,
val recordTag: String,
val identifierTag: String
)
\ No newline at end of file
......@@ -21,12 +21,8 @@ package org.memobase
import com.beust.klaxon.json
object Formats {
const val csv = "CSV"
const val tsv = "TSV"
const val xlsx = "XLSX"
const val xls = "XLS"
const val ods = "ODS"
const val invalid = "INVALID"
const val xml = "XML"
const val error = "ERROR"
}
......
......@@ -33,14 +33,6 @@ class XMLTransformer(appSettings: Properties) {
private val log = LogManager.getLogger("XMLTransformer")
private val xlstFilePath = appSettings.getProperty("xsltFilePath")
private val identifierFieldName = appSettings.getProperty("identifierFieldName")
private val recordTag = appSettings.getProperty("recordTag")
init {
if (identifierFieldName == "placeholderValue") {
log.error("Requires a value for identifier field name, but found default value.")
}
}
private val processor = Processor(false)
private val xslt = compileXslt()
......@@ -59,15 +51,15 @@ class XMLTransformer(appSettings: Properties) {
}
}
fun applyXSLT(key: String, data: InputStream): KeyValue<String, SAXContentHandler> {
val contentHandler = SAXContentHandler(key, identifierFieldName, recordTag)
fun applyXSLT(key: String, meta: TransformMetadata, data: InputStream): KeyValue<String, SAXContentHandler> {
val contentHandler = SAXContentHandler(key, meta.identifierTag, meta.recordTag)
data.use { stream ->
transformer.setSource(StreamSource(stream))
transformer.destination = SAXDestination(contentHandler)
transformer.transform()
}
if (contentHandler.identifier.isEmpty()) {
throw Exception("No valid identifier found in record $key in field $identifierFieldName.")
throw Exception("No valid identifier found in record $key in field ${meta.identifierTag}.")
} else {
return KeyValue(contentHandler.identifier, contentHandler)
}
......
......@@ -3,10 +3,6 @@ sftp:
port: ${SFTP_PORT:?system}
user: ${SFTP_USER:?system}
password: ${SFTP_PASSWORD:?system}
app:
xsltFilePath: ${XSLT_FILE_PATH:?user}
identifierFieldName: ${IDENTIFIER_FIELD_NAME:?user}
recordTag: ${RECORD_TAG:?user}
kafka:
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
......@@ -108,7 +108,7 @@
<xsl:template match="rights[@typeLabel='Access']/ns2:rights">
<xsl:if test="matches(.,'onsite')">
<xsl:element name="accessPhsyical">
<xsl:element name="accessPhysical">
<xsl:text>onsite</xsl:text>
</xsl:element>
</xsl:if>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment