Commit c2e9d098 authored by Jonas Waeber
Browse files

Change chart & dependencies

Update sftp client
parent ef57b049
Pipeline #9868 passed with stages
in 5 minutes and 25 seconds
...@@ -41,10 +41,7 @@ dependencies { ...@@ -41,10 +41,7 @@ dependencies {
//implementation "org.apache.kafka:kafka-streams:${kafkaV}" //implementation "org.apache.kafka:kafka-streams:${kafkaV}"
compile group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV compile group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
implementation 'org.memobase:memobase-service-utilities:1.2.1' implementation 'org.memobase:memobase-service-utilities:1.4.0'
// SFTP Client
implementation 'com.hierynomus:sshj:0.27.0'
// JSON Parser // JSON Parser
implementation 'com.beust:klaxon:5.2' implementation 'com.beust:klaxon:5.2'
// CSV Reader // CSV Reader
......
apiVersion: v1 apiVersion: v1
kind: ConfigMap kind: ConfigMap
metadata: metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-app-config" name: "{{ .Values.processId }}-{{ .Values.jobName }}-config"
namespace: memobase namespace: memobase
data: data:
APP_DIRECTORY: "{{ .Values.appDirectory }}" APP_DIRECTORY: "{{ .Values.appDirectory }}"
......
...@@ -19,6 +19,6 @@ spec: ...@@ -19,6 +19,6 @@ spec:
- configMapRef: - configMapRef:
name: "{{ .Values.kafkaConfigs }}" name: "{{ .Values.kafkaConfigs }}"
- configMapRef: - configMapRef:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-app-config" name: "{{ .Values.processId }}-{{ .Values.jobName }}-config"
restartPolicy: Never restartPolicy: Never
backoffLimit: 0 backoffLimit: 0
\ No newline at end of file
...@@ -23,6 +23,6 @@ recordSetId: placeholder ...@@ -23,6 +23,6 @@ recordSetId: placeholder
## Needs to be set to the directory on the sftp server. ## Needs to be set to the directory on the sftp server.
## this is a relative path built like this: ## this is a relative path built like this:
## "./{INSTITUTION_ID}/{RECORD_SET_ID}" ## "./{RECORD_SET_ID}"
## The exact structure will be defined in task MEMO-196 ## The exact structure will be defined in task MEMO-196
appDirectory: placeholderValue appDirectory: placeholderValue
\ No newline at end of file
...@@ -21,10 +21,13 @@ package org.memobase ...@@ -21,10 +21,13 @@ package org.memobase
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import com.github.doyaaaaaken.kotlincsv.util.MalformedCSVException import com.github.doyaaaaaken.kotlincsv.util.MalformedCSVException
import java.io.File import java.io.File
import java.io.InputStream
import org.apache.logging.log4j.LogManager
import org.apache.poi.ss.usermodel.WorkbookFactory import org.apache.poi.ss.usermodel.WorkbookFactory
import org.memobase.sftp.SftpClient
class FileValidation(private val sftp: SftpClient) { class FileValidation {
private val log = LogManager.getLogger("TextFileValidationService")
private val supportedExtensions = mapOf( private val supportedExtensions = mapOf(
Pair(Extensions.csv, Formats.csv), Pair(Extensions.csv, Formats.csv),
Pair(Extensions.tsv, Formats.tsv), Pair(Extensions.tsv, Formats.tsv),
...@@ -32,11 +35,15 @@ class FileValidation(private val sftp: SftpClient) { ...@@ -32,11 +35,15 @@ class FileValidation(private val sftp: SftpClient) {
Pair(Extensions.xls, Formats.xls) Pair(Extensions.xls, Formats.xls)
) )
fun validate(file: File): Pair<Message, Report> { fun validateExtension(file: File): String {
return when (val format = validateExtension(file)) { return supportedExtensions.getOrDefault(file.extension, Formats.invalid)
}
fun validate(inputStream: InputStream, format: String, file: File): Pair<Message, Report> {
log.info("Validate file with format $format.")
return when (format) {
Formats.csv, Formats.tsv -> { Formats.csv, Formats.tsv -> {
sftp.open(file).use { inputStream.use { stream ->
val stream = it.RemoteFileInputStream()
try { try {
csvReader { csvReader {
charset = "UTF-8" charset = "UTF-8"
...@@ -65,9 +72,8 @@ class FileValidation(private val sftp: SftpClient) { ...@@ -65,9 +72,8 @@ class FileValidation(private val sftp: SftpClient) {
} }
} }
Formats.xlsx, Formats.xls -> { Formats.xlsx, Formats.xls -> {
sftp.open(file).use { inputStream.use { stream ->
try { try {
val stream = it.RemoteFileInputStream()
WorkbookFactory.create(stream).close() WorkbookFactory.create(stream).close()
} catch (ex: Exception) { } catch (ex: Exception) {
return@use Pair( return@use Pair(
...@@ -99,8 +105,4 @@ class FileValidation(private val sftp: SftpClient) { ...@@ -99,8 +105,4 @@ class FileValidation(private val sftp: SftpClient) {
) )
} }
} }
private fun validateExtension(file: File): String {
return supportedExtensions.getOrDefault(file.extension, Formats.invalid)
}
} }
...@@ -43,12 +43,13 @@ class Service(fileName: String = "app.yml") { ...@@ -43,12 +43,13 @@ class Service(fileName: String = "app.yml") {
producer.use { producer -> producer.use { producer ->
sftpClient.use { sftp -> sftpClient.use { sftp ->
log.info("Connected to SFTP & Kafka.") log.info("Connected to SFTP & Kafka.")
val validator = FileValidation(sftp) val validator = FileValidation()
val files = sftp.listFiles(settings.appSettings.getProperty("directory")).map { File(it) } val files = sftp.listFiles(settings.appSettings.getProperty("directory")).map { File(it) }
val reports = mutableListOf<Report>() val reports = mutableListOf<Report>()
for (file in files) { for (file in files) {
log.info("Validate file ${file.absolutePath}.") log.info("Validate file ${file.absolutePath}.")
val validationResult = validator.validate(file) val format = validator.validateExtension(file)
val validationResult = validator.validate(sftp.open(file), format, file)
producer.sendMessage(validationResult.second.id, validationResult.first) producer.sendMessage(validationResult.second.id, validationResult.first)
producer.sendReport(validationResult.second) producer.sendReport(validationResult.second)
reports.add(validationResult.second) reports.add(validationResult.second)
...@@ -57,13 +58,13 @@ class Service(fileName: String = "app.yml") { ...@@ -57,13 +58,13 @@ class Service(fileName: String = "app.yml") {
if (failures > 0) { if (failures > 0) {
log.warn("Validation ended with $failures failures!") log.warn("Validation ended with $failures failures!")
producer.sendJobReport( producer.sendJobReport(
Report(settings.id, status = ReportStatus.failure, message = ReportMessages.processFailure(failures, reports.size)), Report("text-file-validation", status = ReportStatus.failure, message = ReportMessages.processFailure(failures, reports.size)),
settings.processReportTopic settings.processReportTopic
) )
} else { } else {
log.info("Validation was successful!") log.info("Validation was successful!")
producer.sendJobReport( producer.sendJobReport(
Report(settings.id, status = ReportStatus.success, message = ReportMessages.processSuccess(reports.size)), Report("text-file-validation", status = ReportStatus.success, message = ReportMessages.processSuccess(reports.size)),
settings.processReportTopic settings.processReportTopic
) )
} }
......
...@@ -19,11 +19,10 @@ package org.memobase ...@@ -19,11 +19,10 @@ package org.memobase
import com.beust.klaxon.Klaxon import com.beust.klaxon.Klaxon
import java.io.FileInputStream import java.io.FileInputStream
import java.nio.file.Paths
import java.time.Duration import java.time.Duration
import java.util.Properties import java.util.Properties
import java.util.stream.Stream import java.util.stream.Stream
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.consumer.KafkaConsumer
...@@ -34,8 +33,8 @@ import org.junit.jupiter.api.TestInstance ...@@ -34,8 +33,8 @@ import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource import org.junit.jupiter.params.provider.MethodSource
import org.memobase.extensions.EmbeddedKafkaExtension import org.memobase.testing.EmbeddedKafkaExtension
import org.memobase.extensions.EmbeddedSftpServer import org.memobase.testing.EmbeddedSftpServer
@ExtendWith(EmbeddedKafkaExtension::class) @ExtendWith(EmbeddedKafkaExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS) @TestInstance(TestInstance.Lifecycle.PER_CLASS)
...@@ -44,34 +43,18 @@ class Tests { ...@@ -44,34 +43,18 @@ class Tests {
private val sftpServer = EmbeddedSftpServer(22000, "user", "password") private val sftpServer = EmbeddedSftpServer(22000, "user", "password")
private val adminClient =
AdminClient.create(mapOf(Pair(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")))
init { init {
sftpServer.createDirectories( val files = listOf(
"/memobase/test_institution_1/test_record_set_1/", Pair("/memobase/test_record_set_1", "brandt.csv"),
"/memobase/test_institution_2/test_record_set_2/" Pair("/memobase/test_record_set_2", "bauGAZH_metadaten.csv"),
) Pair("/memobase/test_record_set_3", "invalid.csv"),
sftpServer.putFile( Pair("/memobase/test_record_set_4", "file.txt"),
"/memobase/test_institution_1/test_record_set_1/brandt.csv", Pair("/memobase/test_record_set_5", "20190906_Brandt_Metadaten.xlsx")
FileInputStream("src/test/resources/data/brandt.csv")
)
sftpServer.putFile(
"/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv",
FileInputStream("src/test/resources/data/bauGAZH_metadaten.csv")
)
sftpServer.putFile(
"/memobase/test_institution_3/test_record_set_3/invalid.csv",
FileInputStream("src/test/resources/data/invalid.csv")
)
sftpServer.putFile(
"/memobase/test_institution_4/test_record_set_4/file.txt",
FileInputStream("src/test/resources/data/file.txt")
)
sftpServer.putFile(
"/memobase/test_institution_5/test_record_set_5/file.xlsx",
FileInputStream("src/test/resources/data/20190906_Brandt_Metadaten.xlsx")
) )
for (pair in files) {
sftpServer.putFile(Paths.get(pair.first, pair.second).toString(), FileInputStream(Paths.get("src/test/resources/data", pair.second).toFile()))
}
} }
private val consumer: KafkaConsumer<String, String> private val consumer: KafkaConsumer<String, String>
...@@ -90,7 +73,7 @@ class Tests { ...@@ -90,7 +73,7 @@ class Tests {
@ParameterizedTest @ParameterizedTest
@MethodSource("directoryReaderTests") @MethodSource("directoryReaderTests")
fun `read valid csv from sftp`(params: TestParams) { fun `read files from sftp`(params: TestParams) {
val service = Service(params.configFile) val service = Service(params.configFile)
service.run() service.run()
...@@ -129,16 +112,16 @@ class Tests { ...@@ -129,16 +112,16 @@ class Tests {
TestParams( TestParams(
"test1.yml", "test1.yml",
expectedKey = "brandt.csv", expectedKey = "brandt.csv",
expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_institution_1/test_record_set_1/brandt.csv\"}", expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_record_set_1/brandt.csv\"}",
expectedReportValue = Klaxon().toJsonString( expectedReportValue = Klaxon().toJsonString(
Report( Report(
id = "brandt.csv", id = "brandt.csv",
status = "SUCCESS", status = "SUCCESS",
message = "Validated file at path /memobase/test_institution_1/test_record_set_1/brandt.csv with format CSV." message = "Validated file at path /memobase/test_record_set_1/brandt.csv with format CSV."
) )
), ),
expectedProcessReport = Report( expectedProcessReport = Report(
id = "jobXYZ", id = "text-file-validation",
status = "SUCCESS", status = "SUCCESS",
message = "Successfully validated 1 files." message = "Successfully validated 1 files."
) )
...@@ -146,16 +129,16 @@ class Tests { ...@@ -146,16 +129,16 @@ class Tests {
TestParams( TestParams(
"test2.yml", "test2.yml",
expectedKey = "bauGAZH_metadaten.csv", expectedKey = "bauGAZH_metadaten.csv",
expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv\"}", expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_record_set_2/bauGAZH_metadaten.csv\"}",
expectedReportValue = Klaxon().toJsonString( expectedReportValue = Klaxon().toJsonString(
Report( Report(
id = "bauGAZH_metadaten.csv", id = "bauGAZH_metadaten.csv",
status = "SUCCESS", status = "SUCCESS",
message = "Validated file at path /memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv with format CSV." message = "Validated file at path /memobase/test_record_set_2/bauGAZH_metadaten.csv with format CSV."
) )
), ),
expectedProcessReport = Report( expectedProcessReport = Report(
id = "jobXYZ", id = "text-file-validation",
status = "SUCCESS", status = "SUCCESS",
message = "Successfully validated 1 files." message = "Successfully validated 1 files."
) )
...@@ -163,16 +146,16 @@ class Tests { ...@@ -163,16 +146,16 @@ class Tests {
TestParams( TestParams(
"test3.yml", "test3.yml",
expectedKey = "invalid.csv", expectedKey = "invalid.csv",
expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/memobase/test_institution_3/test_record_set_3/invalid.csv\"}", expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/memobase/test_record_set_3/invalid.csv\"}",
expectedReportValue = Klaxon().toJsonString( expectedReportValue = Klaxon().toJsonString(
Report( Report(
id = "invalid.csv", id = "invalid.csv",
status = "FAILURE", status = "FAILURE",
message = "CSV ERROR: Fields num seems to be 5 on each row, but on 2th csv row, fields num is 7. for file /memobase/test_institution_3/test_record_set_3/invalid.csv." message = "CSV ERROR: Fields num seems to be 4 on each row, but on 2th csv row, fields num is 6. for file /memobase/test_record_set_3/invalid.csv."
) )
), ),
expectedProcessReport = Report( expectedProcessReport = Report(
id = "jobXYZ", id = "text-file-validation",
status = "FAILURE", status = "FAILURE",
message = "Failed to validate 1 of 1 files." message = "Failed to validate 1 of 1 files."
) )
...@@ -180,7 +163,7 @@ class Tests { ...@@ -180,7 +163,7 @@ class Tests {
TestParams( TestParams(
"test4.yml", "test4.yml",
expectedKey = "file.txt", expectedKey = "file.txt",
expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/memobase/test_institution_4/test_record_set_4/file.txt\"}", expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/memobase/test_record_set_4/file.txt\"}",
expectedReportValue = Klaxon().toJsonString( expectedReportValue = Klaxon().toJsonString(
Report( Report(
id = "file.txt", id = "file.txt",
...@@ -189,24 +172,24 @@ class Tests { ...@@ -189,24 +172,24 @@ class Tests {
) )
), ),
expectedProcessReport = Report( expectedProcessReport = Report(
id = "jobXYZ", id = "text-file-validation",
status = "FAILURE", status = "FAILURE",
message = "Failed to validate 1 of 1 files." message = "Failed to validate 1 of 1 files."
) )
), ),
TestParams( TestParams(
"test5.yml", "test5.yml",
expectedKey = "file.xlsx", expectedKey = "20190906_Brandt_Metadaten.xlsx",
expectedValue = "{\"format\" : \"XLSX\", \"path\" : \"/memobase/test_institution_5/test_record_set_5/file.xlsx\"}", expectedValue = "{\"format\" : \"XLSX\", \"path\" : \"/memobase/test_record_set_5/20190906_Brandt_Metadaten.xlsx\"}",
expectedReportValue = Klaxon().toJsonString( expectedReportValue = Klaxon().toJsonString(
Report( Report(
id = "file.xlsx", id = "20190906_Brandt_Metadaten.xlsx",
status = "SUCCESS", status = "SUCCESS",
message = "Validated file at path /memobase/test_institution_5/test_record_set_5/file.xlsx with format XLSX." message = "Validated file at path /memobase/test_record_set_5/20190906_Brandt_Metadaten.xlsx with format XLSX."
) )
), ),
expectedProcessReport = Report( expectedProcessReport = Report(
id = "jobXYZ", id = "text-file-validation",
status = "SUCCESS", status = "SUCCESS",
message = "Successfully validated 1 files." message = "Successfully validated 1 files."
) )
......
,saudia,asdjicx,asjdij, test,test,test,test
,asjd,si,,,, test,test,test,test,test,tes
\ No newline at end of file test,test,test,test,test
test
...@@ -5,7 +5,7 @@ sftp: ...@@ -5,7 +5,7 @@ sftp:
user: user user: user
password: password password: password
app: app:
directory: /memobase/test_institution_1/test_record_set_1 directory: /memobase/test_record_set_1
kafka: kafka:
producer: producer:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
...@@ -5,7 +5,7 @@ sftp: ...@@ -5,7 +5,7 @@ sftp:
user: user user: user
password: password password: password
app: app:
directory: /memobase/test_institution_2/test_record_set_2 directory: /memobase/test_record_set_2
kafka: kafka:
producer: producer:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
...@@ -5,7 +5,7 @@ sftp: ...@@ -5,7 +5,7 @@ sftp:
user: user user: user
password: password password: password
app: app:
directory: /memobase/test_institution_3/test_record_set_3 directory: /memobase/test_record_set_3
kafka: kafka:
producer: producer:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
...@@ -5,7 +5,7 @@ sftp: ...@@ -5,7 +5,7 @@ sftp:
user: user user: user
password: password password: password
app: app:
directory: /memobase/test_institution_4/test_record_set_4 directory: /memobase/test_record_set_4
kafka: kafka:
producer: producer:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
...@@ -5,7 +5,7 @@ sftp: ...@@ -5,7 +5,7 @@ sftp:
user: user user: user
password: password password: password
app: app:
directory: /memobase/test_institution_5/test_record_set_5 directory: /memobase/test_record_set_5
kafka: kafka:
producer: producer:
bootstrap.servers: localhost:12345 bootstrap.servers: localhost:12345
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment