Commit 2d11b7c7 authored by Thomas Bernhart's avatar Thomas Bernhart
Browse files

Initialize SftpClient only once at service startup

parent 4f9aaddd
Pipeline #16733 passed with stages
in 5 minutes and 16 seconds
...@@ -28,11 +28,10 @@ import org.fcrepo.client.FcrepoOperationFailedException ...@@ -28,11 +28,10 @@ import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraTransactionClient import org.memobase.fedora.FedoraTransactionClient
import org.memobase.fedora.RdfContentTypes import org.memobase.fedora.RdfContentTypes
import org.memobase.settings.SftpSettings
import org.memobase.sftp.SftpClient import org.memobase.sftp.SftpClient
class Ingester( class Ingester(
private val sftpSettings: SftpSettings, private val sftpClient: SftpClient,
private val fedoraClient: FedoraClient, private val fedoraClient: FedoraClient,
private val externalBaseUrl: String private val externalBaseUrl: String
) { ) {
...@@ -83,8 +82,6 @@ class Ingester( ...@@ -83,8 +82,6 @@ class Ingester(
} }
private fun ingestBinaries(sftpLocators: List<Pair<String, String?>>, rdfHandler: RdfHandler, transaction: FedoraTransactionClient) { private fun ingestBinaries(sftpLocators: List<Pair<String, String?>>, rdfHandler: RdfHandler, transaction: FedoraTransactionClient) {
SftpClient(sftpSettings).use { sftpClient ->
log.info("Connected to sFTP server.")
sftpLocators.forEach { sftpLocators.forEach {
val digitalInstantiationUrl = it.first val digitalInstantiationUrl = it.first
it.second.let { path -> it.second.let { path ->
...@@ -99,5 +96,4 @@ class Ingester( ...@@ -99,5 +96,4 @@ class Ingester(
} }
} }
} }
}
} }
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package org.memobase package org.memobase
import java.io.Closeable
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger import org.apache.logging.log4j.Logger
...@@ -27,8 +29,7 @@ import org.memobase.exceptions.SftpClientException ...@@ -27,8 +29,7 @@ import org.memobase.exceptions.SftpClientException
import org.memobase.fedora.FedoraClient import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraClientImpl import org.memobase.fedora.FedoraClientImpl
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
import java.io.Closeable import org.memobase.sftp.SftpClient
import java.util.*
class Service(fileName: String = "app.yml") : Closeable { class Service(fileName: String = "app.yml") : Closeable {
...@@ -68,6 +69,7 @@ class Service(fileName: String = "app.yml") : Closeable { ...@@ -68,6 +69,7 @@ class Service(fileName: String = "app.yml") : Closeable {
private val isSimple = (settings.appSettings.getProperty("isSimple") ?: "false").toBoolean() private val isSimple = (settings.appSettings.getProperty("isSimple") ?: "false").toBoolean()
private var consumer: Consumer private var consumer: Consumer
private var producer: Producer private var producer: Producer
private var sftpClient: SftpClient? = null
init { init {
val consumerSettings = settings.kafkaConsumerSettings val consumerSettings = settings.kafkaConsumerSettings
...@@ -75,7 +77,11 @@ class Service(fileName: String = "app.yml") : Closeable { ...@@ -75,7 +77,11 @@ class Service(fileName: String = "app.yml") : Closeable {
consumerSettings.setProperty("max.poll.interval.ms", CONSUMER_MAX_INTERVAL_MS) consumerSettings.setProperty("max.poll.interval.ms", CONSUMER_MAX_INTERVAL_MS)
consumer = Consumer(consumerSettings, settings.inputTopic) consumer = Consumer(consumerSettings, settings.inputTopic)
producer = Producer(settings.kafkaProducerSettings, settings.processReportTopic) producer = Producer(settings.kafkaProducerSettings, settings.processReportTopic)
log.info("Connected to Kafka.") log.info("Connected to Kafka cluster.")
if (!isSimple) {
sftpClient = SftpClient(settings.sftpSettings)
log.info("Connected to sFTP server.")
}
} }
fun run() { fun run() {
...@@ -84,7 +90,7 @@ class Service(fileName: String = "app.yml") : Closeable { ...@@ -84,7 +90,7 @@ class Service(fileName: String = "app.yml") : Closeable {
} }
} }
private fun processRecords() { fun processRecords() {
for (record in consumer.fetchRecords()) { for (record in consumer.fetchRecords()) {
val ingestReport = if (isSimple) val ingestReport = if (isSimple)
processSingleEntity(record) processSingleEntity(record)
...@@ -121,14 +127,16 @@ class Service(fileName: String = "app.yml") : Closeable { ...@@ -121,14 +127,16 @@ class Service(fileName: String = "app.yml") : Closeable {
} }
private fun processRecord(record: ConsumerRecord<String, String>): Report { private fun processRecord(record: ConsumerRecord<String, String>): Report {
val ingester = Ingester( val ingester = sftpClient?.let {
settings.sftpSettings, Ingester(
it,
fedoraClient, fedoraClient,
settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl") settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl")
) )
}
return try { return try {
ingester.ingest(record.key(), record.value()) ingester?.ingest(record.key(), record.value())
Report( Report(
id = record.key(), id = record.key(),
status = ReportStatus.success, status = ReportStatus.success,
...@@ -168,5 +176,6 @@ class Service(fileName: String = "app.yml") : Closeable { ...@@ -168,5 +176,6 @@ class Service(fileName: String = "app.yml") : Closeable {
override fun close() { override fun close() {
consumer.close() consumer.close()
producer.close() producer.close()
sftpClient?.close()
} }
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment