Commit 9d8eb98f authored by Jonas Waeber's avatar Jonas Waeber

Fix sftp client

parent 01d2a55f
Pipeline #12327 failed with stages
in 1 minute and 22 seconds
......@@ -39,7 +39,10 @@ dependencies {
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation 'org.memobase:memobase-service-utilities:1.7.1'
implementation 'org.memobase:memobase-service-utilities:1.11.0'
// SFTP Client
// is needed because of a bug.
implementation 'com.hierynomus:sshj:0.27.0'
// https://mvnrepository.com/artifact/net.sf.saxon/Saxon-HE
compile group: 'net.sf.saxon', name: 'Saxon-HE', version: '9.9.1-7'
......
......@@ -19,6 +19,7 @@
package org.memobase
import com.beust.klaxon.Klaxon
import net.schmizz.sshj.sftp.RemoteFile
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
......@@ -78,7 +79,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
val transformedValue = errorFilter[1]
.mapValues { value -> sftpClient.open(File(value.path)) }
.map { key, value -> transformXml(key, value) }
.map { key, value -> xmlTransformer.applyXSLT(key, value) }
streamOutput(transformedValue)
......@@ -125,10 +126,4 @@ class KafkaTopology(private val settings: SettingsLoader) {
else listOf(it)
}
}
private fun transformXml(key: String, data: InputStream): KeyValue<String, SAXContentHandler> {
return xmlTransformer.applyXSLT(key, data)
}
}
......@@ -17,6 +17,7 @@
*/
package org.memobase
import net.schmizz.sshj.sftp.RemoteFile
import net.sf.saxon.s9api.Processor
import net.sf.saxon.s9api.SAXDestination
import net.sf.saxon.s9api.StaticError
......@@ -59,12 +60,14 @@ class XMLTransformer(appSettings: Properties) {
}
}
fun applyXSLT(key: String, data: InputStream): KeyValue<String, SAXContentHandler> {
fun applyXSLT(key: String, data: RemoteFile): KeyValue<String, SAXContentHandler> {
val contentHandler = SAXContentHandler(key, identifierFieldName, recordTag)
data.use {
transformer.setSource(StreamSource(it))
transformer.destination = SAXDestination(contentHandler)
transformer.transform()
data.use { file ->
file.RemoteFileInputStream().use {
transformer.setSource(StreamSource(it))
transformer.destination = SAXDestination(contentHandler)
transformer.transform()
}
}
if (contentHandler.identifier.isEmpty()) {
throw Exception("No valid identifier found in record $key in field $identifierFieldName.")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment