Commit 0a36b411 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

add tests

parent 9f007aac
Pipeline #8474 failed with stages
in 1 minute and 50 seconds
......@@ -18,11 +18,11 @@ jar {
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
jcenter()
maven { url 'https://gitlab.com/api/v4/projects/11507450/packages/maven' }
maven { url 'https://jitpack.io' }
maven {
url "https://dl.bintray.com/jonas-waeber/memobase"
}
}
ext {
......@@ -41,16 +41,14 @@ dependencies {
//implementation "org.apache.kafka:kafka-streams:${kafkaV}"
compile group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
implementation 'org.memobase:memobase-service-utilities:1.1.4'
// SFTP Client
implementation 'com.hierynomus:sshj:0.27.0'
// YAML Parser
implementation 'org.snakeyaml:snakeyaml-engine:2.1'
// CSV Reader
implementation("com.github.doyaaaaaken:kotlin-csv-jvm:0.7.3")
// JSON Parser
implementation 'com.beust:klaxon:5.2'
// COMPRESSION UTILS
implementation "org.apache.commons:commons-compress:1.19"
// CSV Reader
implementation("com.github.doyaaaaaken:kotlin-csv-jvm:0.7.3")
// KOTLIN IMPORTS
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
......@@ -59,23 +57,7 @@ dependencies {
// JUNIT
testCompile("org.junit.jupiter:junit-jupiter:5.4.2")
// MOCK SFTP
implementation 'com.github.stefanbirkner:fake-sftp-server-rule:2.0.1'
// MOCK KAFKA
testImplementation "org.apache.kafka:kafka-clients:$kafkaV:test"
testImplementation "org.apache.kafka:kafka_2.11:$kafkaV"
testImplementation "org.apache.kafka:kafka_2.11:$kafkaV:test"
// MOCK KAFKA STREAMS
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
//testCompile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: kafkaV
// https://mvnrepository.com/artifact/com.github.marschall/memoryfilesystem
testCompile group: 'com.github.marschall', name: 'memoryfilesystem', version: '2.1.0'
// https://mvnrepository.com/artifact/org.apache.sshd/sshd-core
testCompile group: 'org.apache.sshd', name: 'sshd-core', version: '2.4.0'
// https://mvnrepository.com/artifact/org.apache.sshd/sshd-sftp
testCompile group: 'org.apache.sshd', name: 'sshd-sftp', version: '2.4.0'
testImplementation 'org.assertj:assertj-core:3.15.0'
}
compileKotlin {
......
......@@ -21,14 +21,20 @@ package org.memobase
import java.io.File
import kotlin.system.exitProcess
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
class App {
companion object {
private val log = LogManager.getLogger("SftpReader")
@JvmStatic fun main(args: Array<String>) {
try {
val settings = SettingsLoader()
Producer(settings.kafkaProducerProperties, settings.topic).use { producer ->
val settings = SettingsLoader(
listOf("directory"),
useProducerConfig = true,
readSftpSettings = true
)
Producer(settings.kafkaProducerSettings, settings.outputTopic).use { producer ->
SftpClient(settings.sftpSettings).use { sftp ->
val validator = FileValidation(sftp)
......@@ -38,7 +44,8 @@ class App {
val report = validator.validate(file)
when (report.status) {
"valid" -> producer.sendMessage(file.name, file.path, report.format)
else -> producer.sendMessage(file.name, "ERROR", report.format)
else ->
producer.sendMessage(file.name, "ERROR", report.format)
}
producer.sendReport(report)
}
......
......@@ -22,8 +22,9 @@ import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import com.github.doyaaaaaken.kotlincsv.util.MalformedCSVException
import java.io.File
import org.apache.logging.log4j.LogManager
import org.memobase.sftp.SftpClient
class FileValidation(val sftp: SftpClient) {
class FileValidation(private val sftp: SftpClient) {
private val log = LogManager.getLogger("FileValidation")
private val supportedExtensions = mapOf(
......
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
class MissingSettingException(source: String, setting: String) :
Exception(
when (source) {
"env" -> "Internal Configuration Error: $setting!"
"missing" -> "Internal Configuration Error: Missing setting $setting in configuration file."
else -> "User Configuration Error: A value for setting $setting is required."
}
)
......@@ -23,15 +23,19 @@ import java.io.Closeable
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.logging.log4j.LogManager
class Producer(props: Properties, private val topic: String) : Closeable {
private val log = LogManager.getLogger("MessageProducer")
private val instance = KafkaProducer<String, String>(props)
private val reportingTopic = "$topic-reporting"
private var count = 0
fun sendMessage(key: String, message: String, format: String) {
log.info("Sending message $message to $topic.")
instance.send(ProducerRecord(topic, key, JsonObject(mapOf(Pair("path", message), Pair("format", format))).toJsonString()))
count += 1
}
......
/*
* sftp-reader
* Copyright (C) 2020 Memobase Project
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import java.io.File
import java.io.FileInputStream
import java.util.Optional
import java.util.Properties
import kotlin.system.exitProcess
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.logging.log4j.LogManager
import org.snakeyaml.engine.v2.api.Load
import org.snakeyaml.engine.v2.api.LoadSettings
import org.snakeyaml.engine.v2.exceptions.MissingEnvironmentVariableException
class SettingsLoader {
private val log = LogManager.getLogger("SettingsLoader")
private fun loadYaml(): Any {
val settings = LoadSettings.builder().setEnvConfig(Optional.of(CustomEnvConfig())).build()
val load = Load(settings)
try {
val file = File("/configs/app.yml")
return if (file.isFile) {
load.loadFromInputStream(FileInputStream(file))
} else {
log.warn("Loading default properties in app.yml from classpath!")
load.loadFromInputStream(ClassLoader.getSystemResourceAsStream("app.yml"))
}
} catch (ex: MissingEnvironmentVariableException) {
throw MissingSettingException("env", "${ex.localizedMessage}")
}
}
private val rawKafkaProperties: Map<String, String>
val topic: String
val appSettings = Properties()
val sftpSettings = Properties()
val kafkaProducerProperties = Properties()
private val mappedYaml: Map<String, Any>
init {
try {
val rawYaml = loadYaml()
mappedYaml = rawYaml as Map<String, Any>
val kafkaOptions = mappedYaml["kafka"] as Map<String, Any>
val topics = kafkaOptions["topic"] as Map<String, String>
topic = topics["out"].orEmpty()
if (topic.isEmpty()) {
throw MissingSettingException("missing", "kafka.topic.out")
}
rawKafkaProperties = kafkaOptions["producer"] as Map<String, String>
appSettings.setProperty("directory", addSetting("app", "directory"))
sftpSettings.setProperty("host", addSetting("sftp", "host"))
sftpSettings.setProperty("port", addSetting("sftp", "port"))
sftpSettings.setProperty("user", addSetting("sftp", "user"))
sftpSettings.setProperty("password", addSetting("sftp", "password"))
} catch (ex: ClassCastException) {
ex.printStackTrace()
log.error("The properties file has an invalid structure: $ex")
exitProcess(1)
} catch (ex: MissingSettingException) {
log.error(ex.message)
exitProcess(1)
}
initializeKafkaProperties()
}
private fun addSetting(base: String, setting: String): String {
val settings = mappedYaml[base] as Map<String, Any>
if (settings.containsKey(setting)) {
return when (val value = settings[setting]) {
is String -> if (value.isNotEmpty()) value else throw MissingSettingException("missing", "$base.$setting")
is Int -> value.toString()
else -> throw MissingSettingException("missing", "$base.$setting")
}
} else {
throw MissingSettingException("missing", "$base.$setting")
}
}
private fun initializeKafkaProperties() {
setKafkaProperty(ProducerConfig.CLIENT_ID_CONFIG, abortIfMissing = true)
setKafkaProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, abortIfMissing = true)
setKafkaProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
setKafkaProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
setKafkaProperty(ProducerConfig.BATCH_SIZE_CONFIG, 16384)
setKafkaProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, 33445532)
setKafkaProperty(ProducerConfig.LINGER_MS_CONFIG, 1)
setKafkaProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
setKafkaProperty(ProducerConfig.ACKS_CONFIG)
setKafkaProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")
setKafkaProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)
setKafkaProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG)
setKafkaProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG)
setKafkaProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)
setKafkaProperty(ProducerConfig.SEND_BUFFER_CONFIG)
setKafkaProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)
setKafkaProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG)
setKafkaProperty(ProducerConfig.RETRIES_CONFIG)
setKafkaProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG)
setKafkaProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG)
setKafkaProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG)
setKafkaProperty(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)
setKafkaProperty(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)
setKafkaProperty(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG)
setKafkaProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG)
setKafkaProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
setKafkaProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG)
setKafkaProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG)
setKafkaProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG)
setKafkaProperty(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG)
setKafkaProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG)
}
private fun setKafkaProperty(propertyName: String, defaultValue: Any? = null, abortIfMissing: Boolean = false) {
val envProperty = propertyName.replace("\\.".toRegex(), "_").toUpperCase()
when {
System.getenv(envProperty) != null -> {
log.debug("Found value for property $propertyName in environment variable $envProperty.")
kafkaProducerProperties.setProperty(propertyName, System.getenv(envProperty))
}
rawKafkaProperties.containsKey(propertyName) -> {
log.debug("Found value for property $propertyName in app.yml file.")
kafkaProducerProperties.setProperty(propertyName, rawKafkaProperties[propertyName])
}
defaultValue != null -> {
log.debug("Using default value $defaultValue for $propertyName.")
kafkaProducerProperties[propertyName] = defaultValue
}
abortIfMissing -> {
log.error("Required producer property $propertyName was not set! Aborting...")
throw MissingSettingException("missing", "kafka.producer.$propertyName")
}
else -> log.trace("No value for property $propertyName found")
}
}
}
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import java.io.Closeable
import java.io.File
import java.net.ConnectException
import java.net.UnknownHostException
import java.util.Properties
import kotlin.system.exitProcess
import net.schmizz.sshj.SSHClient
import net.schmizz.sshj.sftp.FileAttributes
import net.schmizz.sshj.sftp.FileMode
import net.schmizz.sshj.sftp.OpenMode
import net.schmizz.sshj.sftp.RemoteFile
import net.schmizz.sshj.sftp.SFTPClient
import net.schmizz.sshj.transport.verification.PromiscuousVerifier
import net.schmizz.sshj.userauth.UserAuthException
import org.apache.logging.log4j.LogManager
class SftpClient(sftpSettings: Properties) : Closeable {
private val log = LogManager.getLogger("SftpClient")
private val ssh = SSHClient()
private val instance: SFTPClient
init {
try {
ssh.addHostKeyVerifier(PromiscuousVerifier())
ssh.connect(sftpSettings.getProperty("host"), sftpSettings.getProperty("port").toInt())
ssh.authPassword(sftpSettings.getProperty("user"), sftpSettings.getProperty("password"))
instance = ssh.newSFTPClient()
} catch (ex: UserAuthException) {
log.error("SFTP User Authentication Error: Invalid user authentication supplied.")
exitProcess(1)
} catch (ex: ConnectException) {
log.error("SFTP Connection Exception: ${ex.message}.")
exitProcess(1)
} catch (ex: UnknownHostException) {
log.error("SFTP Host Not Found: ${ex.message}.")
exitProcess(1)
} catch (ex: Exception) {
ex.printStackTrace()
log.error("SSH Exception: ${ex.localizedMessage}")
exitProcess(1)
}
}
fun listFiles(path: String): List<String> {
return ls(path).filter { fileAttributes(it).mode.type == FileMode.Type.REGULAR }
}
fun open(file: File): RemoteFile {
return instance.open(file.path, setOf(OpenMode.READ))
}
private fun fileAttributes(path: String): FileAttributes {
return instance.lstat(path)
}
private fun ls(path: String): List<String> {
return instance.ls(path).map { it.path }
}
override fun close() {
instance.close()
ssh.disconnect()
}
}
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.junit.jupiter.api.extension.BeforeAllCallback
import org.junit.jupiter.api.extension.ExtensionContext
class EmbeddedKafkaExtension : BeforeAllCallback {
lateinit var kafka: KafkaEmbedded
private val testTopic: String = "test"
private val port: Int = 12345
override fun beforeAll(context: ExtensionContext?) {
kafka = KafkaEmbedded(port, testTopic)
val wrapper = KafkaWrapper(kafka)
if (context != null) {
context.getStore(ExtensionContext.Namespace.create(EmbeddedKafkaExtension::class.java))
.put("redis", wrapper)
Runtime.getRuntime().addShutdownHook(Thread(Runnable {
wrapper.close()
}))
}
}
class KafkaWrapper(private val embedded: KafkaEmbedded) : ExtensionContext.Store.CloseableResource {
override fun close() {
embedded.close()
}
}
}
/*
* record-parser
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.github.marschall.memoryfilesystem.MemoryFileSystemBuilder.newLinux
import java.io.Closeable
import java.io.IOException
import java.io.InputStream
import java.nio.charset.Charset
import java.nio.file.FileStore
import java.nio.file.FileSystem
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.PathMatcher
import java.nio.file.WatchService
import java.nio.file.attribute.UserPrincipalLookupService
import java.nio.file.spi.FileSystemProvider
import org.apache.sshd.server.SshServer
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory
class EmbeddedSftpServer(port: Int, user: String, password: String) : Closeable {
private val credentials = mapOf(Pair(user, password))
private val fileSystem: FileSystem = newLinux().build("EmbeddedSftpServerFileSystem@" + hashCode())
private val server: SshServer = SshServer.setUpDefaultServer()
init {
server.port = port
server.keyPairProvider = SimpleGeneratorHostKeyProvider()
server.setPasswordAuthenticator { authUser, authPassword, _ -> authenticate(authUser, authPassword) }
server.subsystemFactories = listOf(SftpSubsystemFactory())
/* When a channel is closed SshServer calls close() on the file system.
* In order to use the file system for multiple channels/sessions we
* have to use a file system wrapper whose close() does nothing.
*/
server.setFileSystemFactory { DoNotClose(fileSystem) }
server.start()
}
/**
* Put a text file on the SFTP folder. The file is available by the
* specified path.
* @param path the path to the file.
* @param content the files content.
* @param encoding the encoding of the file.
* @throws IOException if the file cannot be written.
*/
@Throws(IOException::class)
fun putFile(
path: String,
content: String,
encoding: Charset = Charset.defaultCharset()
) {
val contentAsBytes = content.toByteArray(encoding)
putFile(path, contentAsBytes)
}
/**
* Put a file on the SFTP folder. The file is available by the specified
* path.
* @param path the path to the file.
* @param content the files content.
* @throws IOException if the file cannot be written.
*/
@Throws(IOException::class)
fun putFile(
path: String,
content: ByteArray
) {
val pathAsObject = fileSystem.getPath(path)
ensureDirectoryOfPathExists(pathAsObject)
Files.write(pathAsObject, content)
}
/**
* Put a file on the SFTP folder. The file is available by the specified
* path. The file content is read from an `InputStream`.
* @param path the path to the file.
* @param inputStream an `InputStream` that provides the file's content.
* @throws IOException if the file cannot be written or the input stream
* cannot be read.
*/
@Throws(IOException::class)
fun putFile(
path: String,
inputStream: InputStream
) {
val pathAsObject = fileSystem.getPath(path)
ensureDirectoryOfPathExists(pathAsObject)
Files.copy(inputStream, pathAsObject)
}
/**
* Create a directory on the SFTP server.
* @param path the directory's path.
* @throws IOException if the directory cannot be created.
*/
@Throws(IOException::class)
fun createDirectory(
path: String
) {
val pathAsObject = fileSystem.getPath(path)
Files.createDirectories(pathAsObject)
}
/**
* Create multiple directories on the SFTP server.
* @param paths the directories' paths.
* @throws IOException if at least one directory cannot be created.
*/
@Throws(IOException::class)
fun createDirectories(
vararg paths: String
) {
for (path in paths) createDirectory(path)
}
/**
* Get a text file from the SFTP server.
* @param path the path to the file.
* @return the content of the text file.
* @throws IOException if the file cannot be read.
* @throws IllegalStateException if not called from within a test.
*/
@Throws(IOException::class)
fun getFileContent(
path: String,
encoding: Charset = Charset.defaultCharset()
): String {
return getFileContent(path).toString(encoding)
}
/**
* Get a file from the SFTP server.
* @param path the path to the file.