Commit ce901fb7 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Refactor producer and sftp client as closable

This ensures that these resources are properly closed,
even with an exception.
parent eb5fff02
...@@ -28,22 +28,22 @@ class App { ...@@ -28,22 +28,22 @@ class App {
@JvmStatic fun main(args: Array<String>) { @JvmStatic fun main(args: Array<String>) {
try { try {
val settings = SettingsLoader() val settings = SettingsLoader()
val producer = Producer(settings.kafkaProducerProperties, settings.topic) Producer(settings.kafkaProducerProperties, settings.topic).use { producer ->
val sftp = SftpClient(settings.sftpSettings) SftpClient(settings.sftpSettings).use { sftp ->
val validator = FileValidation(sftp) val validator = FileValidation(sftp)
val files = sftp.listFiles(settings.appSettings["directory"].orEmpty()).map { File(it) } val files = sftp.listFiles(settings.appSettings["directory"].orEmpty()).map { File(it) }
for (file in files) { for (file in files) {
val report = validator.validate(file) val report = validator.validate(file)
when (report.status) { when (report.status) {
"valid" -> producer.sendMessage(file.name, file.path, report.format) "valid" -> producer.sendMessage(file.name, file.path, report.format)
else -> producer.sendMessage(file.name, "ERROR", report.format) else -> producer.sendMessage(file.name, "ERROR", report.format)
}
producer.sendReport(report)
}
} }
producer.sendReport(report)
} }
producer.close()
sftp.close()
} catch (ex: Exception) { } catch (ex: Exception) {
ex.printStackTrace() ex.printStackTrace()
log.error("Stopping application due to error: " + ex.message) log.error("Stopping application due to error: " + ex.message)
......
...@@ -19,11 +19,12 @@ package org.memobase ...@@ -19,11 +19,12 @@ package org.memobase
import com.beust.klaxon.JsonObject import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon import com.beust.klaxon.Klaxon
import java.io.Closeable
import java.util.Properties import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.ProducerRecord
class Producer(props: Properties, private val topic: String) { class Producer(props: Properties, private val topic: String) : Closeable {
private val instance = KafkaProducer<String, String>(props) private val instance = KafkaProducer<String, String>(props)
private val reportingTopic = "$topic-reporting" private val reportingTopic = "$topic-reporting"
...@@ -36,7 +37,7 @@ class Producer(props: Properties, private val topic: String) { ...@@ -36,7 +37,7 @@ class Producer(props: Properties, private val topic: String) {
instance.send(ProducerRecord(reportingTopic, report.id, Klaxon().toJsonString(report))) instance.send(ProducerRecord(reportingTopic, report.id, Klaxon().toJsonString(report)))
} }
fun close() { override fun close() {
instance.flush() instance.flush()
instance.close() instance.close()
} }
......
...@@ -18,7 +18,10 @@ ...@@ -18,7 +18,10 @@
package org.memobase package org.memobase
import java.io.Closeable
import java.io.File import java.io.File
import java.net.ConnectException
import java.net.UnknownHostException
import kotlin.system.exitProcess import kotlin.system.exitProcess
import net.schmizz.sshj.SSHClient import net.schmizz.sshj.SSHClient
import net.schmizz.sshj.sftp.FileAttributes import net.schmizz.sshj.sftp.FileAttributes
...@@ -29,23 +32,29 @@ import net.schmizz.sshj.sftp.SFTPClient ...@@ -29,23 +32,29 @@ import net.schmizz.sshj.sftp.SFTPClient
import net.schmizz.sshj.userauth.UserAuthException import net.schmizz.sshj.userauth.UserAuthException
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
class SftpClient(sftpSettings: Map<String, String>) { class SftpClient(sftpSettings: Map<String, String>) : Closeable {
private val log = LogManager.getLogger("SftpClient") private val log = LogManager.getLogger("SftpClient")
private val ssh = SSHClient() private val ssh = SSHClient()
private val instance: SFTPClient private val instance: SFTPClient
init { init {
ssh.loadKnownHosts()
ssh.addHostKeyVerifier(sftpSettings["fingerprint"])
ssh.connect(sftpSettings["host"])
try { try {
ssh.loadKnownHosts()
ssh.addHostKeyVerifier(sftpSettings["fingerprint"])
ssh.connect(sftpSettings["host"])
ssh.authPassword(sftpSettings["user"], sftpSettings["password"]) ssh.authPassword(sftpSettings["user"], sftpSettings["password"])
instance = ssh.newSFTPClient() instance = ssh.newSFTPClient()
} catch (ex: UserAuthException) { } catch (ex: UserAuthException) {
log.error("SFTP User Authentication Error: Invalid user authentication supplied.") log.error("SFTP User Authentication Error: Invalid user authentication supplied.")
exitProcess(1) exitProcess(1)
} catch (ex: ConnectException) {
log.error("SFTP Connection Exception: ${ex.message}.")
exitProcess(1)
} catch (ex: UnknownHostException) {
log.error("SFTP Host Not Found: ${ex.message}.")
exitProcess(1)
} catch (ex: Exception) { } catch (ex: Exception) {
ex.printStackTrace() ex.printStackTrace()
log.error(ex.localizedMessage) log.error("SSH Exception: ${ex.localizedMessage}")
exitProcess(1) exitProcess(1)
} }
} }
...@@ -66,7 +75,7 @@ class SftpClient(sftpSettings: Map<String, String>) { ...@@ -66,7 +75,7 @@ class SftpClient(sftpSettings: Map<String, String>) {
return instance.ls(path).map { it.path } return instance.ls(path).map { it.path }
} }
fun close() { override fun close() {
instance.close() instance.close()
ssh.disconnect() ssh.disconnect()
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment