Commit f558eac0 authored by Jonas Waeber
Browse files

Update service utility library

Update code style
Add missing license headers
Optimize message parsing
parent 3b91d260
...@@ -38,7 +38,7 @@ dependencies { ...@@ -38,7 +38,7 @@ dependencies {
implementation "org.apache.kafka:kafka-streams:${kafkaV}" implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation 'ch.memobase:memobase-kafka-utils:0.1.2' implementation 'ch.memobase:memobase-kafka-utils:0.1.2'
implementation 'org.memobase:memobase-service-utilities:1.12.2' implementation 'org.memobase:memobase-service-utilities:2.0.5'
// SFTP Client // SFTP Client
// is needed because of a bug. // is needed because of a bug.
implementation 'com.hierynomus:sshj:0.27.0' implementation 'com.hierynomus:sshj:0.27.0'
......
...@@ -24,7 +24,8 @@ import org.apache.logging.log4j.LogManager ...@@ -24,7 +24,8 @@ import org.apache.logging.log4j.LogManager
class App { class App {
companion object { companion object {
private val log = LogManager.getLogger("XmlDataApp") private val log = LogManager.getLogger("XmlDataApp")
@JvmStatic fun main(args: Array<String>) { @JvmStatic
fun main(args: Array<String>) {
try { try {
Service().run() Service().run()
} catch (ex: Exception) { } catch (ex: Exception) {
......
...@@ -21,7 +21,12 @@ package org.memobase ...@@ -21,7 +21,12 @@ package org.memobase
import ch.memobase.kafka.utils.ConfigJoiner import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues import ch.memobase.kafka.utils.models.JoinedValues
import com.beust.klaxon.Klaxon import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.HeaderExtractionTransformSupplier
import ch.memobase.settings.SettingsLoader
import ch.memobase.sftp.SftpClient
import java.io.File
import net.sf.saxon.s9api.SaxonApiException import net.sf.saxon.s9api.SaxonApiException
import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.KeyValue
...@@ -30,16 +35,14 @@ import org.apache.kafka.streams.Topology ...@@ -30,16 +35,14 @@ import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.models.* import org.memobase.models.Content
import org.memobase.settings.SettingsLoader import org.memobase.models.Formats
import org.memobase.sftp.SftpClient import org.memobase.models.Input
import org.memobase.models.Message
import org.memobase.utils.MissingIdentifierException import org.memobase.utils.MissingIdentifierException
import org.memobase.utils.SerdeMessage import org.memobase.utils.SerdeMessage
import org.memobase.utils.XsltException import org.memobase.utils.XsltException
import org.memobase.xml.XMLTransformer import org.memobase.xml.XMLTransformer
import settings.HeaderExtractionTransformSupplier
import java.io.File
import java.io.StringReader
class KafkaTopology(private val settings: SettingsLoader) { class KafkaTopology(private val settings: SettingsLoader) {
...@@ -81,8 +84,9 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -81,8 +84,9 @@ class KafkaTopology(private val settings: SettingsLoader) {
.mapValues { key, value -> .mapValues { key, value ->
Report( Report(
key, key,
ReportStatus.failure, ReportStatus.fatal,
"Could not find a matching xslt configuration for record set ${value.second.recordSetId}." "Could not find a matching xslt configuration for record set ${value.second.recordSetId}.",
Service.name
).toJson() ).toJson()
} }
.to(reportingTopic) .to(reportingTopic)
...@@ -115,8 +119,9 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -115,8 +119,9 @@ class KafkaTopology(private val settings: SettingsLoader) {
key, Pair( key, Pair(
null, Report( null, Report(
key, key,
ReportStatus.failure, ReportStatus.fatal,
ex.localizedMessage ex.localizedMessage,
Service.name
) )
) )
) )
...@@ -126,8 +131,9 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -126,8 +131,9 @@ class KafkaTopology(private val settings: SettingsLoader) {
key, Pair( key, Pair(
null, Report( null, Report(
key, key,
ReportStatus.failure, ReportStatus.fatal,
ex.localizedMessage ex.localizedMessage,
Service.name
) )
) )
) )
...@@ -137,8 +143,9 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -137,8 +143,9 @@ class KafkaTopology(private val settings: SettingsLoader) {
key, Pair( key, Pair(
null, Report( null, Report(
key, key,
ReportStatus.failure, ReportStatus.fatal,
ex.localizedMessage ex.localizedMessage,
Service.name
) )
) )
) )
...@@ -161,9 +168,6 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -161,9 +168,6 @@ class KafkaTopology(private val settings: SettingsLoader) {
} }
private fun parseMessage(value: String): List<Message> { private fun parseMessage(value: String): List<Message> {
return Klaxon().parse<Message>(StringReader(value)).let { return Message.fromJson(value)
if (it == null) emptyList()
else listOf(it)
}
} }
} }
...@@ -18,13 +18,17 @@ ...@@ -18,13 +18,17 @@
package org.memobase package org.memobase
import ch.memobase.settings.SettingsLoader
import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
class Service(file: String = "app.yml") { class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("XmlDataService") private val log = LogManager.getLogger("XmlDataService")
companion object {
const val name = "xml-data-transform"
}
val settings = SettingsLoader( val settings = SettingsLoader(
listOf(), listOf(),
file, file,
......
/*
* xml-data-transform
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.models package org.memobase.models
import net.sf.saxon.s9api.XsltExecutable import ch.memobase.settings.HeaderMetadata
import settings.HeaderMetadata
import java.io.InputStream import java.io.InputStream
data class Content( data class Content(
......
/* /*
* XML Data Transform Service * xml-data-transform
* Copyright (C) 2020 Memoriav * Copyright (C) 2020 Memoriav
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.memobase.models package org.memobase.models
object Formats { object Formats {
......
/*
* xml-data-transform
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.models package org.memobase.models
data class Input( data class Input(
......
...@@ -15,10 +15,34 @@ ...@@ -15,10 +15,34 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.memobase.models package org.memobase.models
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.logging.log4j.LogManager
data class Message( data class Message(
val path: String, val path: String,
val format: String val format: String
) ) {
\ No newline at end of file companion object {
private val log = LogManager.getLogger("MessageParser")
private val klaxon = Klaxon()
fun fromJson(data: String): List<Message> {
return try {
klaxon.parse<Message>(data).let {
if (it != null)
listOf(it)
else {
log.error("Parsed message is empty: $data.")
emptyList()
}
}
} catch (ex: KlaxonException) {
log.error(ex.localizedMessage)
emptyList()
}
}
}
}
\ No newline at end of file
/*
* xml-data-transform
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.models
import com.beust.klaxon.Klaxon
import java.time.LocalDateTime
/**
 * A processing report that can be serialized to JSON.
 *
 * Equality and hashing are based on [id], [status] and [message] only;
 * [step] and [timestamp] are deliberately left out, so two reports created at
 * different moments still compare equal when they describe the same outcome.
 *
 * @property id Identifier the report refers to.
 * @property status Status string of the report.
 * @property message Human-readable description of the outcome.
 * @property step Name of the step that produced the report; defaults to "xml-data-transform".
 * @property timestamp Creation time; defaults to `LocalDateTime.now().toString()` at construction.
 */
data class Report(
    val id: String,
    val status: String,
    val message: String,
    val step: String = "xml-data-transform",
    val timestamp: String = LocalDateTime.now().toString()
) {
    /**
     * Compares the identifying fields directly.
     *
     * Comparing hash codes instead would treat distinct reports as equal
     * whenever their hashes collide (for example ids "Aa" and "BB").
     */
    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is Report) return false
        return id == other.id &&
            status == other.status &&
            message == other.message
    }

    /** Hashes exactly the fields used by [equals]. */
    override fun hashCode(): Int {
        var result = id.hashCode()
        result = 31 * result + status.hashCode()
        result = 31 * result + message.hashCode()
        return result
    }

    /** Serializes this report to a JSON string using Klaxon. */
    fun toJson(): String {
        return Klaxon().toJsonString(this)
    }
}
package org.memobase.models
/**
 * String constants for the `status` value of a report.
 */
object ReportStatus {
    /** Status value for a successful outcome. */
    const val success: String = "SUCCESS"

    /** Status value for a failed outcome. */
    const val failure: String = "FAILURE"
}
\ No newline at end of file
/*
* xml-data-transform
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.utils package org.memobase.utils
class MissingIdentifierException(key: String, field: String) : class MissingIdentifierException(key: String, field: String) :
......
/*
* xml-data-transform
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.utils package org.memobase.utils
import com.beust.klaxon.Klaxon import com.beust.klaxon.Klaxon
import java.io.ByteArrayInputStream
import java.nio.charset.Charset
import org.apache.kafka.common.serialization.Deserializer import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serializer import org.apache.kafka.common.serialization.Serializer
import org.memobase.models.Message import org.memobase.models.Message
import java.io.ByteArrayInputStream
import java.nio.charset.Charset
class SerdeMessage : Serde<Message> { class SerdeMessage : Serde<Message> {
......
/*
* xml-data-transform
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.utils package org.memobase.utils
import net.sf.saxon.s9api.StaticError import net.sf.saxon.s9api.StaticError
......
...@@ -18,14 +18,15 @@ ...@@ -18,14 +18,15 @@
package org.memobase.xml package org.memobase.xml
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import com.beust.klaxon.JsonObject import com.beust.klaxon.JsonObject
import java.io.StringWriter
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.models.Report import org.memobase.Service
import org.memobase.models.ReportStatus
import org.xml.sax.Attributes import org.xml.sax.Attributes
import org.xml.sax.ContentHandler import org.xml.sax.ContentHandler
import org.xml.sax.Locator import org.xml.sax.Locator
import java.io.StringWriter
/** /**
* Class to transform a xml stream into a json representation. Expects a flat xml preprocessed with * Class to transform a xml stream into a json representation. Expects a flat xml preprocessed with
...@@ -37,7 +38,11 @@ import java.io.StringWriter ...@@ -37,7 +38,11 @@ import java.io.StringWriter
* @param identifierFieldName The field name of the unique identifier of this record. * @param identifierFieldName The field name of the unique identifier of this record.
* @param recordTag The root tag of the xml structure. * @param recordTag The root tag of the xml structure.
*/ */
class SAXContentHandler(private val key: String, private val identifierFieldName: String, private val recordTag: String) : class SAXContentHandler(
private val key: String,
private val identifierFieldName: String,
private val recordTag: String
) :
ContentHandler { ContentHandler {
private val log = LogManager.getLogger("SAXHandler") private val log = LogManager.getLogger("SAXHandler")
...@@ -61,8 +66,9 @@ class SAXContentHandler(private val key: String, private val identifierFieldName ...@@ -61,8 +66,9 @@ class SAXContentHandler(private val key: String, private val identifierFieldName
it it
?: Report( ?: Report(
identifier ?: key, identifier ?: key,
ReportStatus.failure, ReportStatus.fatal,
"Unknown Failure: No report found." "Unknown Failure: No report found.",
Service.name
) )
} }
} }
...@@ -183,8 +189,9 @@ class SAXContentHandler(private val key: String, private val identifierFieldName ...@@ -183,8 +189,9 @@ class SAXContentHandler(private val key: String, private val identifierFieldName
output.write(jsonResult.toJsonString()) output.write(jsonResult.toJsonString())
report = Report( report = Report(
id = identifier ?: key, id = identifier ?: key,
status = if (reportText.isEmpty()) ReportStatus.success else ReportStatus.failure, status = if (reportText.isEmpty()) ReportStatus.success else ReportStatus.fatal,
message = if (reportText.isEmpty()) "Successfully transformed xml to json!" else reportText.trim() message = if (reportText.isEmpty()) "Successfully transformed xml to json!" else reportText.trim(),
step = Service.name
) )
} }
} }
\ No newline at end of file
...@@ -17,18 +17,15 @@ ...@@ -17,18 +17,15 @@
*/ */
package org.memobase.xml package org.memobase.xml
import ch.memobase.settings.HeaderMetadata
import java.io.ByteArrayInputStream
import java.io.InputStream
import javax.xml.transform.stream.StreamSource
import net.sf.saxon.s9api.Processor import net.sf.saxon.s9api.Processor
import net.sf.saxon.s9api.SAXDestination import net.sf.saxon.s9api.SAXDestination
import net.sf.saxon.s9api.StaticError import net.sf.saxon.s9api.StaticError
import net.sf.saxon.s9api.XsltExecutable
import org.apache.kafka.streams.KeyValue
import org.apache.logging.log4j.LogManager
import org.memobase.utils.MissingIdentifierException import org.memobase.utils.MissingIdentifierException
import org.memobase.utils.XsltException import org.memobase.utils.XsltException
import settings.HeaderMetadata
import java.io.ByteArrayInputStream
import java.io.InputStream
import javax.xml.transform.stream.StreamSource
class XMLTransformer { class XMLTransformer {
private val processor = Processor(false) private val processor = Processor(false)
...@@ -37,7 +34,12 @@ class XMLTransformer { ...@@ -37,7 +34,12 @@ class XMLTransformer {
return xsltData return xsltData
} }
fun applyXSLT(key: String, headers: HeaderMetadata, data: InputStream, xsltFile: ByteArray): Pair<String, SAXContentHandler> { fun applyXSLT(
key: String,
headers: HeaderMetadata,
data: InputStream,
xsltFile: ByteArray
): Pair<String, SAXContentHandler> {
val contentHandler = SAXContentHandler(key, headers.xmlIdentifierFieldName, headers.xmlRecordTag) val contentHandler = SAXContentHandler(key, headers.xmlIdentifierFieldName, headers.xmlRecordTag)
val errorList = mutableListOf<StaticError>() val errorList = mutableListOf<StaticError>()
val xsltCompiler = processor.newXsltCompiler() val xsltCompiler = processor.newXsltCompiler()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment