Commit b7aeb63e authored by Jonas Waeber's avatar Jonas Waeber
Browse files

implements sftp client

implements yaml settings loader
implements kafka producer
parent 1e41bfdd
......@@ -36,20 +36,18 @@ dependencies {
implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
implementation "org.apache.logging.log4j:log4j-core:${log4jV}"
implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jV}"
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
// implementation "org.swissbib:kafka-metadata-wrapper:2.4.0"
implementation "org.xbib:marc:2.1.0"
//implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.10.2'
// https://mvnrepository.com/artifact/org.marc4j/marc4j
compile group: 'org.marc4j', name: 'marc4j', version: '2.9.1'
//implementation "org.apache.kafka:kafka-streams:${kafkaV}"
implementation "org.apache.commons:commons-compress:1.19"
compile group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
testCompile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: kafkaV
//testCompile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: kafkaV
// SFTP Client
implementation 'com.hierynomus:sshj:0.27.0'
// YAML Parser
implementation 'org.snakeyaml:snakeyaml-engine:2.1'
compile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
compile "org.jetbrains.kotlin:kotlin-script-runtime:1.3.71"
......@@ -76,7 +74,7 @@ test {
sourceSets {
main.kotlin.srcDirs += 'src/main/kotlin'
main.resources.srcDirs = [ "src/main/resources" ]
main.resources.includes = [ "**/*.properties", "**/*.xml", "**/*.json" ]
main.resources.includes = [ "**/*.yml", "**/*.xml"]
test.kotlin.srcDirs += 'src/test'
}
......
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.apache.logging.log4j.LogManager
import java.io.File
import kotlin.system.exitProcess
class App {
companion object {
private val log = LogManager.getLogger("SftpReader")
@JvmStatic fun main(args: Array<String>) {
try {
val settings = SettingsLoader()
val producer = Producer(settings.kafkaProducerProperties, settings.topic)
val sftp = SftpClient(settings.sftpSettings)
val files = sftp.listFiles(settings.appSettings["directory"].orEmpty())
log.error(files)
for (file in files) {
if (File(file).extension == "csv") {
log.error(sftp.open(file).length())
}
}
producer.close()
sftp.close()
} catch (ex: Exception) {
ex.printStackTrace()
log.error("Stopping application due to error: " + ex.message)
exitProcess(1)
}
}
}
}
\ No newline at end of file
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.snakeyaml.engine.v2.env.EnvConfig
class CustomEnvConfig : EnvConfig
\ No newline at end of file
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
class MissingSettingException(setting: String) : Exception("Missing setting $setting in configuration file. Stopping application ...")
\ No newline at end of file
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.*
class Producer(props: Properties, private val topic: String) {
private val instance = KafkaProducer<String, String>(props)
private val reportingTopic = "$topic-reporting"
fun send(key: String, message: String) {
instance.send(ProducerRecord(topic, key, message))
}
fun sendResults(key: String, message: String) {
// TODO: Implement Report message!
instance.send(ProducerRecord(reportingTopic, key))
}
fun close() {
instance.flush()
instance.close()
}
}
/*
* sftp-reader
* Copyright (C) 2020 Memobase Project
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.logging.log4j.LogManager
import org.snakeyaml.engine.v2.api.Load
import org.snakeyaml.engine.v2.api.LoadSettings
import org.snakeyaml.engine.v2.exceptions.MissingEnvironmentVariableException
import java.io.File
import java.io.FileInputStream
import java.util.*
import kotlin.system.exitProcess
class SettingsLoader {
private val log = LogManager.getLogger("SettingsLoader")
private fun loadYaml(): Any {
val settings = LoadSettings.builder().setEnvConfig(Optional.of(CustomEnvConfig())).build()
val load = Load(settings)
try {
val file = File("/configs/app.yml")
return if (file.isFile) {
load.loadFromInputStream(FileInputStream(file))
}
else {
log.warn("Loading default properties in app.yml from classpath!")
load.loadFromInputStream(ClassLoader.getSystemResourceAsStream("app.yml"))
}
} catch (ex: MissingEnvironmentVariableException) {
log.error(ex.message)
exitProcess(1)
}
}
private val rawKafkaProperties: Map<String, String>
val topic: String
val appSettings: Map<String, String>
val sftpSettings: Map<String, String>
val kafkaProducerProperties = Properties()
init {
val rawYaml = loadYaml()
try {
val mappedYaml = rawYaml as Map<String, Any>
val kafkaOptions = mappedYaml["kafka"] as Map<String, Any>
val topics = kafkaOptions["topic"] as Map<String, String>
topic = topics["out"].orEmpty()
if (topic.isEmpty()) {
throw MissingSettingException("kafka.topic.out")
}
rawKafkaProperties = kafkaOptions["producer"] as Map<String, String>
appSettings = rawYaml["app"] as Map<String, String>
ensurePropertyIsSet("directory", appSettings)
sftpSettings = rawYaml["sftp"] as Map<String, String>
ensurePropertyIsSet("host", sftpSettings)
ensurePropertyIsSet("port", sftpSettings)
ensurePropertyIsSet("user", sftpSettings)
ensurePropertyIsSet("password", sftpSettings)
ensurePropertyIsSet("fingerprint", sftpSettings)
} catch (ex: ClassCastException) {
ex.printStackTrace()
log.error("The properties file has an invalid structure: $ex")
exitProcess(1)
} catch (ex: MissingSettingException) {
log.error(ex.message)
exitProcess(1)
}
initializeKafkaProperties()
}
private fun ensurePropertyIsSet(setting: String, settings: Map<String, String>) {
if (settings[setting].isNullOrEmpty()) {
throw MissingSettingException(setting)
}
}
private fun initializeKafkaProperties() {
setKafkaProperty(ProducerConfig.CLIENT_ID_CONFIG, abortIfMissing = true)
setKafkaProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, abortIfMissing = true)
setKafkaProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
setKafkaProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
setKafkaProperty(ProducerConfig.BATCH_SIZE_CONFIG, 16384)
setKafkaProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, 33445532)
setKafkaProperty(ProducerConfig.LINGER_MS_CONFIG, 1)
setKafkaProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
setKafkaProperty(ProducerConfig.ACKS_CONFIG)
setKafkaProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")
setKafkaProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)
setKafkaProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG)
setKafkaProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG)
setKafkaProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)
setKafkaProperty(ProducerConfig.SEND_BUFFER_CONFIG)
setKafkaProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)
setKafkaProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG)
setKafkaProperty(ProducerConfig.RETRIES_CONFIG)
setKafkaProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG)
setKafkaProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG)
setKafkaProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG)
setKafkaProperty(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)
setKafkaProperty(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)
setKafkaProperty(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG)
setKafkaProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG)
setKafkaProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
setKafkaProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG)
setKafkaProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG)
setKafkaProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG)
setKafkaProperty(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG)
setKafkaProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG)
}
private fun setKafkaProperty(propertyName: String, defaultValue: Any? = null, abortIfMissing: Boolean = false) {
val envProperty = propertyName.replace("\\.".toRegex(), "_").toUpperCase()
when {
System.getenv(envProperty) != null -> {
log.debug("Found value for property $propertyName in environment variable $envProperty.")
kafkaProducerProperties.setProperty(propertyName, System.getenv(envProperty))
}
rawKafkaProperties.containsKey(propertyName) -> {
log.debug("Found value for property $propertyName in app.yml file.")
kafkaProducerProperties.setProperty(propertyName, rawKafkaProperties[propertyName])
}
defaultValue != null -> {
log.debug("Using default value $defaultValue for $propertyName.")
kafkaProducerProperties[propertyName] = defaultValue
}
abortIfMissing -> {
log.error("Required producer property $propertyName was not set! Aborting...")
exitProcess(1)
}
else -> log.trace("No value for property $propertyName found")
}
}
}
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import net.schmizz.sshj.SSHClient
import net.schmizz.sshj.sftp.*
import net.schmizz.sshj.userauth.UserAuthException
import org.apache.logging.log4j.LogManager
import kotlin.system.exitProcess
class SftpClient(sftpSettings: Map<String, String>) {
private val log = LogManager.getLogger("SftpClient")
private val ssh = SSHClient()
private val instance: SFTPClient
init {
ssh.loadKnownHosts()
ssh.addHostKeyVerifier(sftpSettings["fingerprint"])
ssh.connect(sftpSettings["host"])
try {
ssh.authPassword(sftpSettings["user"], sftpSettings["password"])
instance = ssh.newSFTPClient()
} catch (ex: UserAuthException) {
log.error("Invalid user authentication supplied.")
exitProcess(1)
} catch (ex: Exception) {
ex.printStackTrace()
log.error(ex.localizedMessage)
exitProcess(1)
}
}
fun listFiles(path: String): List<String> {
return ls(path).filter { fileAttributes(it).mode.type == FileMode.Type.REGULAR }
}
fun open(path: String): RemoteFile {
return instance.open(path, setOf(OpenMode.READ))
}
private fun fileAttributes(path: String): FileAttributes {
return instance.lstat(path)
}
private fun ls(path: String): List<String> {
return instance.ls(path).map { it.path }
}
fun close() {
instance.close()
ssh.disconnect()
}
}
\ No newline at end of file
sftp:
host: sb-uingest1.swissbib.unibas.ch
port: "22"
user: mb_sftp
password: ${SFTP_PASSWORD:?nopass}
fingerprint: ${HOST_KEY_VERIFIER:?nofinger}
app:
directory: ./test_institution_1/test_record_set_1
kafka:
producer:
bootstrap.servers: localhost:9092
client.id: sftp-reader-p1-j1
topic:
out: process-1-job-1-sftp-reader
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment