Commit 41d7f0b2 authored by Jonas Waeber's avatar Jonas Waeber

Fix sftp client issues

parent 9d8eb98f
Pipeline #12329 passed with stages
in 4 minutes and 40 seconds
......@@ -79,7 +79,11 @@ class KafkaTopology(private val settings: SettingsLoader) {
val transformedValue = errorFilter[1]
.mapValues { value -> sftpClient.open(File(value.path)) }
.map { key, value -> xmlTransformer.applyXSLT(key, value) }
.map { key, value ->
value.use {
xmlTransformer.applyXSLT(key, value.RemoteFileInputStream())
}
}
streamOutput(transformedValue)
......
......@@ -17,7 +17,6 @@
*/
package org.memobase
import net.schmizz.sshj.sftp.RemoteFile
import net.sf.saxon.s9api.Processor
import net.sf.saxon.s9api.SAXDestination
import net.sf.saxon.s9api.StaticError
......@@ -60,15 +59,13 @@ class XMLTransformer(appSettings: Properties) {
}
}
fun applyXSLT(key: String, data: RemoteFile): KeyValue<String, SAXContentHandler> {
fun applyXSLT(key: String, data: InputStream): KeyValue<String, SAXContentHandler> {
val contentHandler = SAXContentHandler(key, identifierFieldName, recordTag)
data.use { file ->
file.RemoteFileInputStream().use {
transformer.setSource(StreamSource(it))
data.use { stream ->
transformer.setSource(StreamSource(stream))
transformer.destination = SAXDestination(contentHandler)
transformer.transform()
}
}
if (contentHandler.identifier.isEmpty()) {
throw Exception("No valid identifier found in record $key in field $identifierFieldName.")
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment