In order to mitigate against the brute force attacks against Gitlab accounts, we are moving to all edu-ID Logins. We would like to remind you to link your account with your edu-id. Login will be possible only by edu-ID after November 30, 2021. Here you can find the instructions for linking your account.

If you don't have a SWITCH edu-ID, you can create one with this guide here

kind regards

Commit 724d6519 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

expand validation

- update kafka message format
- now supports CSV & TSV
- style fixes
parent 3011fe46
Pipeline #7253 failed with stages
in 2 minutes and 35 seconds
...@@ -33,13 +33,12 @@ class App { ...@@ -33,13 +33,12 @@ class App {
val validator = FileValidation(sftp) val validator = FileValidation(sftp)
val files = sftp.listFiles(settings.appSettings["directory"].orEmpty()).map { File(it) } val files = sftp.listFiles(settings.appSettings["directory"].orEmpty()).map { File(it) }
log.error(files)
for (file in files) { for (file in files) {
val report = validator.validate(file) val report = validator.validate(file)
when (report.status) { when (report.status) {
"valid" -> producer.sendMessage(file.name, file.path) "valid" -> producer.sendMessage(file.name, file.path, report.format)
else -> producer.sendMessage(file.name, "ERROR") else -> producer.sendMessage(file.name, "ERROR", report.format)
} }
producer.sendReport(report) producer.sendReport(report)
} }
......
...@@ -20,27 +20,41 @@ package org.memobase ...@@ -20,27 +20,41 @@ package org.memobase
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import com.github.doyaaaaaken.kotlincsv.util.MalformedCSVException import com.github.doyaaaaaken.kotlincsv.util.MalformedCSVException
import org.apache.logging.log4j.LogManager
import java.io.File import java.io.File
import org.apache.logging.log4j.LogManager
class FileValidation(val sftp: SftpClient) { class FileValidation(val sftp: SftpClient) {
private val log = LogManager.getLogger("FileValidation") private val log = LogManager.getLogger("FileValidation")
private val supportedExtensions = mapOf(
Pair("csv", "CSV"),
Pair("tsv", "TSV")
)
fun validate(file: File): Report { fun validate(file: File): Report {
val remoteFile = sftp.open(file) when (val format = validateExtension(file)) {
val stream = remoteFile.RemoteFileInputStream() "CSV", "TSV" -> {
try { val remoteFile = sftp.open(file)
csvReader().readAll(stream) val stream = remoteFile.RemoteFileInputStream()
} catch (ex: MalformedCSVException) { try {
remoteFile.close() csvReader {
return Report(id = file.name, status = "invalid", path = file.path, error = ex.localizedMessage) charset = "UTF-8"
delimiter = if (format == "CSV") ',' else '\t'
quoteChar = '"'
escapeChar = '\\'
}.readAll(stream)
} catch (ex: MalformedCSVException) {
remoteFile.close()
return Report(id = file.name, status = "invalid", path = file.path, format = format, error = ex.localizedMessage)
}
remoteFile.close()
return Report(id = file.name, status = "valid", path = file.path, format = format, error = "")
}
else -> return Report(id = file.name, status = "invalid", path = file.path, format = format, error = "Not a valid file extension: ${file.name}.")
} }
remoteFile.close()
return Report(id = file.name, status = "valid", path = file.path, error = "")
} }
private fun validateExtension(file: File): String {
return supportedExtensions.getOrDefault(file.extension, "INVALID")
} }
\ No newline at end of file }
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
*/ */
package org.memobase package org.memobase
import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon import com.beust.klaxon.Klaxon
import java.util.Properties import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.KafkaProducer
...@@ -27,8 +28,8 @@ class Producer(props: Properties, private val topic: String) { ...@@ -27,8 +28,8 @@ class Producer(props: Properties, private val topic: String) {
private val instance = KafkaProducer<String, String>(props) private val instance = KafkaProducer<String, String>(props)
private val reportingTopic = "$topic-reporting" private val reportingTopic = "$topic-reporting"
fun sendMessage(key: String, message: String) { fun sendMessage(key: String, message: String, format: String) {
instance.send(ProducerRecord(topic, key, message)) instance.send(ProducerRecord(topic, key, JsonObject(mapOf(Pair("path", message), Pair("format", format))).toJsonString()))
} }
fun sendReport(report: Report) { fun sendReport(report: Report) {
......
...@@ -22,5 +22,6 @@ data class Report( ...@@ -22,5 +22,6 @@ data class Report(
val id: String, val id: String,
val status: String, val status: String,
val path: String, val path: String,
val format: String,
val error: String val error: String
) )
\ No newline at end of file
...@@ -18,12 +18,16 @@ ...@@ -18,12 +18,16 @@
package org.memobase package org.memobase
import java.io.File
import kotlin.system.exitProcess
import net.schmizz.sshj.SSHClient import net.schmizz.sshj.SSHClient
import net.schmizz.sshj.sftp.* import net.schmizz.sshj.sftp.FileAttributes
import net.schmizz.sshj.sftp.FileMode
import net.schmizz.sshj.sftp.OpenMode
import net.schmizz.sshj.sftp.RemoteFile
import net.schmizz.sshj.sftp.SFTPClient
import net.schmizz.sshj.userauth.UserAuthException import net.schmizz.sshj.userauth.UserAuthException
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import java.io.File
import kotlin.system.exitProcess
class SftpClient(sftpSettings: Map<String, String>) { class SftpClient(sftpSettings: Map<String, String>) {
private val log = LogManager.getLogger("SftpClient") private val log = LogManager.getLogger("SftpClient")
...@@ -46,9 +50,6 @@ class SftpClient(sftpSettings: Map<String, String>) { ...@@ -46,9 +50,6 @@ class SftpClient(sftpSettings: Map<String, String>) {
} }
} }
fun listFiles(path: String): List<String> { fun listFiles(path: String): List<String> {
return ls(path).filter { fileAttributes(it).mode.type == FileMode.Type.REGULAR } return ls(path).filter { fileAttributes(it).mode.type == FileMode.Type.REGULAR }
} }
...@@ -57,8 +58,6 @@ class SftpClient(sftpSettings: Map<String, String>) { ...@@ -57,8 +58,6 @@ class SftpClient(sftpSettings: Map<String, String>) {
return instance.open(file.path, setOf(OpenMode.READ)) return instance.open(file.path, setOf(OpenMode.READ))
} }
private fun fileAttributes(path: String): FileAttributes { private fun fileAttributes(path: String): FileAttributes {
return instance.lstat(path) return instance.lstat(path)
} }
...@@ -67,10 +66,8 @@ class SftpClient(sftpSettings: Map<String, String>) { ...@@ -67,10 +66,8 @@ class SftpClient(sftpSettings: Map<String, String>) {
return instance.ls(path).map { it.path } return instance.ls(path).map { it.path }
} }
fun close() { fun close() {
instance.close() instance.close()
ssh.disconnect() ssh.disconnect()
} }
}
}
\ No newline at end of file
...@@ -11,4 +11,4 @@ kafka: ...@@ -11,4 +11,4 @@ kafka:
bootstrap.servers: localhost:9092 bootstrap.servers: localhost:9092
client.id: sftp-reader-p1-j1 client.id: sftp-reader-p1-j1
topic: topic:
out: process-1-job-1-sftp-reader out: process-1-job-1-sftp-validated-files
\ No newline at end of file \ No newline at end of file
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
</Console> </Console>
</Appenders> </Appenders>
<Loggers> <Loggers>
<Root level="info"> <Root level="debug">
<AppenderRef ref="STDOUT"/> <AppenderRef ref="STDOUT"/>
</Root> </Root>
</Loggers> </Loggers>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment