Commit a73981d0 authored by Jonas Waeber's avatar Jonas Waeber

Update service utility dependency

Refactor main entry point.
Fix CSV error report.
Improve success report message.
Add job report.
Update default config.
Update tests.
parent ad7ce175
Pipeline #8706 passed with stages
in 5 minutes and 52 seconds
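
The entry point no longer drives the SFTP listing and validation loop itself; App.main now only builds a Service and runs it, with all the wiring moved into the new Service.kt. A condensed sketch of the new flow, simplified from the diff below (error handling elided):

    class App {
        companion object {
            @JvmStatic
            fun main(args: Array<String>) {
                // Settings, Kafka producer and SFTP client are now created inside Service.
                val service = Service()
                service.run()
            }
        }
    }
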
@@ -41,7 +41,7 @@ dependencies {
//implementation "org.apache.kafka:kafka-streams:${kafkaV}"
compile group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
implementation 'org.memobase:memobase-service-utilities:1.1.4'
implementation 'org.memobase:memobase-service-utilities:1.2.0'
// SFTP Client
implementation 'com.hierynomus:sshj:0.27.0'
......
@@ -18,35 +18,16 @@
package org.memobase
import java.io.File
import kotlin.system.exitProcess
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
class App {
companion object {
private val log = LogManager.getLogger("SftpReader")
@JvmStatic fun main(args: Array<String>) {
try {
val settings = SettingsLoader(
listOf("directory"),
useProducerConfig = true,
readSftpSettings = true
)
Producer(settings.kafkaProducerSettings, settings.outputTopic).use { producer ->
SftpClient(settings.sftpSettings).use { sftp ->
val validator = FileValidation(sftp)
val files = sftp.listFiles(settings.appSettings.getProperty("directory")).map { File(it) }
for (file in files) {
val validationResult = validator.validate(file)
producer.sendMessage(validationResult.second.id, validationResult.first)
producer.sendReport(validationResult.second)
}
}
}
val service = Service()
service.run()
} catch (ex: Exception) {
ex.printStackTrace()
log.error("Stopping application due to error: " + ex.message)
......
@@ -43,11 +43,11 @@ class FileValidation(private val sftp: SftpClient) {
}.readAll(stream)
} catch (ex: MalformedCSVException) {
return Pair(
Message(format, file.path),
Message("ERROR", file.path),
Report(
id = file.name,
status = "FAILURE",
message = ex.localizedMessage
message = "$format ERROR: " + ex.localizedMessage
))
}
return Pair(
@@ -55,7 +55,7 @@ class FileValidation(private val sftp: SftpClient) {
Report(
id = file.name,
status = "SUCCESS",
message = "Validated file at path ${file.path}."))
message = "Validated file at path ${file.path} with format $format."))
}
}
else -> return Pair(
......
@@ -35,6 +35,10 @@ class Producer(props: Properties, private val topic: String) : Closeable {
instance.send(ProducerRecord(reportingTopic, report.id, Klaxon().toJsonString(report)))
}
fun sendJobReport(report: Report, topic: String) {
instance.send(ProducerRecord(topic, report.id, Klaxon().toJsonString(report)))
}
override fun close() {
instance.close()
}
......
/*
* text-file-validation
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import java.io.File
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
class Service(fileName: String = "app.yml") {
val settings =
SettingsLoader(
listOf("directory"),
fileName,
useProducerConfig = true,
readSftpSettings = true
)
private val producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
private val sftpClient = SftpClient(settings.sftpSettings)
fun run() {
producer.use { producer ->
sftpClient.use { sftp ->
val validator = FileValidation(sftp)
val files = sftp.listFiles(settings.appSettings.getProperty("directory")).map { File(it) }
val reports = mutableListOf<Report>()
for (file in files) {
val validationResult = validator.validate(file)
producer.sendMessage(validationResult.second.id, validationResult.first)
producer.sendReport(validationResult.second)
reports.add(validationResult.second)
}
val failures = reports.count { report -> report.status == "FAILURE" }
if (failures > 0) {
producer.sendJobReport(
Report(settings.id, status = "FAILURE", message = "Failed to validate $failures of ${reports.size} files."),
settings.processReportTopic
)
} else {
producer.sendJobReport(
Report(settings.id, status = "SUCCESS", message = "Successfully validated ${reports.size} files."),
settings.processReportTopic
)
}
}
}
}
}
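
The Message and Report types serialized above are not part of this diff. Judging from the constructor calls here and the JSON expected by the tests further down, they are presumably plain Klaxon-serializable data classes along these lines (assumed shapes, not the actual source):

    import com.beust.klaxon.Klaxon

    // Assumed field names, inferred from the Message(format, path) and
    // Report(id, status, message) usages and from the expected JSON strings
    // in the tests, e.g. {"format" : "ERROR", "path" : "..."}.
    data class Message(val format: String, val path: String)
    data class Report(val id: String, val status: String, val message: String)

    fun main() {
        // Mirrors the job report that Service.run() sends on success;
        // the exact field order of the output is up to Klaxon.
        val report = Report(id = "jobXYZ", status = "SUCCESS", message = "Successfully validated 1 files.")
        println(Klaxon().toJsonString(report))
    }
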
id: ${JOB_ID:?env}
sftp:
host: ${SFTP_HOST:?env}
port: ${SFTP_PORT:?env}
@@ -7,8 +8,8 @@ app:
directory: ${APP_DIRECTORY:?env}
kafka:
producer:
bootstrap.servers: ${PROD_KAFKA_SERVERS:?env}
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?env}
client.id: ${CLIENT_ID:?env}
value.serializer: "org.apache.kafka.common.serialization.StringSerializer"
topic:
out: ${CLIENT_ID:?env}
\ No newline at end of file
out: ${TOPIC_IN:?env}
process: ${TOPIC_PROCESS:?env}
\ No newline at end of file
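
The ${NAME:?env} placeholders in this default config are presumably resolved by SettingsLoader from environment variables, failing fast when a variable is unset; the renamed KAFKA_BOOTSTRAP_SERVERS and the new TOPIC_PROCESS variable therefore need to be set wherever the service is deployed. An illustrative stand-in for that substitution, my own sketch and not the memobase-service-utilities implementation:

    // Replaces ${NAME:?env} tokens with the value of the environment variable NAME,
    // throwing if the variable is missing (assumed placeholder semantics).
    fun resolveEnvPlaceholders(raw: String): String =
        Regex("""\$\{(\w+):\?env}""").replace(raw) { match ->
            val name = match.groupValues[1]
            System.getenv(name) ?: error("Missing required environment variable: $name")
        }

    fun main() {
        // Example: export TOPIC_PROCESS=p1-reporting before running.
        println(resolveEnvPlaceholders("process: \${TOPIC_PROCESS:?env}"))
    }
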
@@ -22,5 +22,6 @@ data class TestParams(
val configFile: String,
val expectedKey: String,
val expectedValue: String,
val expectedReportValue: String
val expectedReportValue: String,
val expectedProcessReport: Report
)
@@ -18,7 +18,6 @@
package org.memobase
import com.beust.klaxon.Klaxon
import java.io.File
import java.io.FileInputStream
import java.time.Duration
import java.util.Properties
@@ -37,8 +36,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.extensions.EmbeddedKafkaExtension
import org.memobase.extensions.EmbeddedSftpServer
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
@ExtendWith(EmbeddedKafkaExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@@ -57,6 +54,8 @@ class Tests {
)
sftpServer.putFile("/memobase/test_institution_1/test_record_set_1/brandt.csv", FileInputStream("src/test/resources/data/brandt.csv"))
sftpServer.putFile("/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv", FileInputStream("src/test/resources/data/bauGAZH_metadaten.csv"))
sftpServer.putFile("/memobase/test_institution_3/test_record_set_3/invalid.csv", FileInputStream("src/test/resources/data/invalid.csv"))
sftpServer.putFile("/memobase/test_institution_4/test_record_set_4/file.txt", FileInputStream("src/test/resources/data/file.txt"))
}
private val consumer: KafkaConsumer<String, String>
@@ -75,55 +74,54 @@ class Tests {
@ParameterizedTest
@MethodSource("directoryReaderTests")
fun `read valid csv from sftp`(params: TestParams) {
val settings = SettingsLoader(
listOf("directory"),
fileName = params.configFile,
useProducerConfig = true,
readSftpSettings = true
)
Producer(settings.kafkaProducerSettings, settings.outputTopic).use { producer ->
SftpClient(settings.sftpSettings).use { sftp ->
val validator = FileValidation(sftp)
val service = Service(params.configFile)
service.run()
val files = sftp.listFiles(settings.appSettings.getProperty("directory")).map { File(it) }
for (file in files) {
val validationResult = validator.validate(file)
producer.sendMessage(validationResult.second.id, validationResult.first)
producer.sendReport(validationResult.second)
}
}
}
val topic = service.settings.outputTopic
val reportingTopic = service.settings.outputTopic + "-reporting"
val processReportingTopic = service.settings.processReportTopic
consumer.assign(listOf(TopicPartition(settings.outputTopic, 0)))
consumer.assign(listOf(TopicPartition(topic, 0)))
var result = consumer.poll(Duration.ofMillis(10))
while (result.count() == 0) {
result = consumer.poll(Duration.ofMillis(10))
}
consumer.assign(listOf(TopicPartition(settings.outputTopic + "-reporting", 0)))
consumer.assign(listOf(TopicPartition(reportingTopic, 0)))
var resultReport = consumer.poll(Duration.ofMillis(10))
while (resultReport.count() == 0) {
resultReport = consumer.poll(Duration.ofMillis(10))
}
assertThat(result.records(settings.outputTopic))
consumer.assign(listOf(TopicPartition(processReportingTopic, 0)))
var processReportResult = consumer.poll(Duration.ofMillis(10))
while (processReportResult.count() == 0) {
processReportResult = consumer.poll(Duration.ofMillis(10))
}
assertThat(result.records(topic))
.describedAs("Record Results")
.hasSize(1)
.first()
.hasFieldOrPropertyWithValue("key", params.expectedKey)
.hasFieldOrPropertyWithValue("value", params.expectedValue)
assertThat(resultReport.records(settings.outputTopic + "-reporting"))
assertThat(resultReport.records(reportingTopic))
.describedAs("Report Results")
.hasSize(1)
.first()
.hasFieldOrPropertyWithValue("key", params.expectedKey)
.hasFieldOrPropertyWithValue("value", params.expectedReportValue)
assertThat(processReportResult.records(processReportingTopic))
.describedAs("Report Process Results")
.hasSize(1)
.first()
.hasFieldOrPropertyWithValue("key", params.expectedProcessReport.id)
.hasFieldOrPropertyWithValue("value", Klaxon().toJsonString(params.expectedProcessReport))
// Clean up inside the test because the topics are not accessible outside of this function.
adminClient.deleteTopics(listOf(settings.outputTopic, settings.outputTopic + "-reporting"))
adminClient.deleteTopics(listOf(topic, reportingTopic, processReportingTopic))
}
private fun directoryReaderTests() = Stream.of(
@@ -134,7 +132,12 @@ class Tests {
expectedReportValue = Klaxon().toJsonString(Report(
id = "brandt.csv",
status = "SUCCESS",
message = "Validated file at path /memobase/test_institution_1/test_record_set_1/brandt.csv."))
message = "Validated file at path /memobase/test_institution_1/test_record_set_1/brandt.csv with format CSV.")),
expectedProcessReport = Report(
id = "jobXYZ",
status = "SUCCESS",
message = "Successfully validated 1 files."
)
),
TestParams(
"test2.yml",
@@ -143,7 +146,40 @@ class Tests {
expectedReportValue = Klaxon().toJsonString(Report(
id = "bauGAZH_metadaten.csv",
status = "SUCCESS",
message = "Validated file at path /memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv."))
message = "Validated file at path /memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv with format CSV.")),
expectedProcessReport = Report(
id = "jobXYZ",
status = "SUCCESS",
message = "Successfully validated 1 files."
)
),
TestParams(
"test3.yml",
expectedKey = "invalid.csv",
expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/memobase/test_institution_3/test_record_set_3/invalid.csv\"}",
expectedReportValue = Klaxon().toJsonString(Report(
id = "invalid.csv",
status = "FAILURE",
message = "CSV ERROR: Fields num seems to be 5 on each row, but on 2th csv row, fields num is 7.")),
expectedProcessReport = Report(
id = "jobXYZ",
status = "FAILURE",
message = "Failed to validate 1 of 1 files."
)
),
TestParams(
"test4.yml",
expectedKey = "file.txt",
expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/memobase/test_institution_4/test_record_set_4/file.txt\"}",
expectedReportValue = Klaxon().toJsonString(Report(
id = "file.txt",
status = "FAILURE",
message = "File Extension Error: Not a valid file extension: file.txt.")),
expectedProcessReport = Report(
id = "jobXYZ",
status = "FAILURE",
message = "Failed to validate 1 of 1 files."
)
)
)
}
,saudia,asdjicx,asjdij,
,asjd,si,,,,
\ No newline at end of file
id: jobXYZ
sftp:
host: localhost
port: 22000
@@ -9,6 +10,6 @@ kafka:
producer:
bootstrap.servers: localhost:12345
client.id: sftp-reader-p1-j1
value.serializer: "org.apache.kafka.common.serialization.StringSerializer"
topic:
out: sftp-reader-p1-j1
\ No newline at end of file
out: sftp-reader-p1-j1
process: p1-reporting
\ No newline at end of file
id: jobXYZ
sftp:
host: localhost
port: 22000
@@ -9,6 +10,6 @@ kafka:
producer:
bootstrap.servers: localhost:12345
client.id: sftp-reader-p1-j1
value.serializer: "org.apache.kafka.common.serialization.StringSerializer"
topic:
out: sftp-reader-p1-j1
\ No newline at end of file
out: sftp-reader-p1-j1
process: p1-reporting
\ No newline at end of file
id: jobXYZ
sftp:
host: localhost
port: 22000
user: user
password: password
app:
directory: /memobase/test_institution_3/test_record_set_3
kafka:
producer:
bootstrap.servers: localhost:12345
client.id: sftp-reader-p1-j1
topic:
out: sftp-reader-p1-j1
process: p1-reporting
\ No newline at end of file
id: jobXYZ
sftp:
host: localhost
port: 22000
user: user
password: password
app:
directory: /memobase/test_institution_4/test_record_set_4
kafka:
producer:
bootstrap.servers: localhost:12345
client.id: sftp-reader-p1-j1
topic:
out: sftp-reader-p1-j1
process: p1-reporting
\ No newline at end of file