Commit f0db098f authored by Jonas Waeber's avatar Jonas Waeber
Browse files

refactor reporting

add data class for messages
update tests
parent f4746db7
Pipeline #8503 passed with stages
in 5 minutes and 56 seconds
......@@ -41,13 +41,9 @@ class App {
val files = sftp.listFiles(settings.appSettings.getProperty("directory")).map { File(it) }
for (file in files) {
val report = validator.validate(file)
when (report.status) {
"valid" -> producer.sendMessage(file.name, file.path, report.format)
else ->
producer.sendMessage(file.name, "ERROR", report.format)
}
producer.sendReport(report)
val validationResult = validator.validate(file)
producer.sendMessage(validationResult.second.id, validationResult.first)
producer.sendReport(validationResult.second)
}
}
}
......
......@@ -21,18 +21,15 @@ package org.memobase
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import com.github.doyaaaaaken.kotlincsv.util.MalformedCSVException
import java.io.File
import org.apache.logging.log4j.LogManager
import org.memobase.sftp.SftpClient
class FileValidation(private val sftp: SftpClient) {
private val log = LogManager.getLogger("FileValidation")
private val supportedExtensions = mapOf(
Pair("csv", "CSV"),
Pair("tsv", "TSV")
Pair("csv", "CSV"),
Pair("tsv", "TSV")
)
fun validate(file: File): Report {
fun validate(file: File): Pair<Message, Report> {
when (val format = validateExtension(file)) {
"CSV", "TSV" -> {
sftp.open(file).use {
......@@ -45,26 +42,32 @@ class FileValidation(private val sftp: SftpClient) {
escapeChar = '\\'
}.readAll(stream)
} catch (ex: MalformedCSVException) {
return Report(
id = file.name,
status = "invalid",
path = file.path,
format = format,
error = ex.localizedMessage
)
return Pair(
Message(format, file.path),
Report(
id = file.name,
status = "FAILURE",
message = ex.localizedMessage
))
}
return Report(id = file.name, status = "valid", path = file.path, format = format, error = "")
return Pair(
Message(format, file.path),
Report(
id = file.name,
status = "SUCCESS",
message = "Validated file at path ${file.path}."))
}
}
else -> return Report(
id = file.name,
status = "invalid",
path = file.path,
format = format,
error = "File Extension Error: Not a valid file extension: ${file.name}."
)
else -> return Pair(
Message("ERROR", file.path),
Report(
id = file.name,
status = "FAILURE",
message = "File Extension Error: Not a valid file extension: ${file.name}."
))
}
}
private fun validateExtension(file: File): String {
return supportedExtensions.getOrDefault(file.extension, "INVALID")
}
......
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
data class Message(
val format: String,
val path: String
)
......@@ -17,25 +17,18 @@
*/
package org.memobase
import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon
import java.io.Closeable
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.logging.log4j.LogManager
class Producer(props: Properties, private val topic: String) : Closeable {
private val log = LogManager.getLogger("MessageProducer")
private val instance = KafkaProducer<String, String>(props)
private val reportingTopic = "$topic-reporting"
private var count = 0
fun sendMessage(key: String, message: String, format: String) {
log.info("Sending message $message to $topic.")
instance.send(ProducerRecord(topic, key, JsonObject(mapOf(Pair("path", message), Pair("format", format))).toJsonString()))
count += 1
fun sendMessage(key: String, message: Message) {
instance.send(ProducerRecord(topic, key, Klaxon().toJsonString(message)))
}
fun sendReport(report: Report) {
......@@ -43,8 +36,6 @@ class Producer(props: Properties, private val topic: String) : Closeable {
}
override fun close() {
instance.flush()
// TODO: Add report for messages sent count.
instance.close()
}
}
......@@ -21,7 +21,5 @@ package org.memobase
data class Report(
val id: String,
val status: String,
val path: String,
val format: String,
val error: String
val message: String
)
......@@ -21,5 +21,6 @@ package org.memobase
data class TestParams(
val configFile: String,
val expectedKey: String,
val expectedValue: String
val expectedValue: String,
val expectedReportValue: String
)
......@@ -17,14 +17,19 @@
*/
package org.memobase
import com.beust.klaxon.Klaxon
import java.io.File
import java.io.FileInputStream
import java.time.Duration
import java.util.Properties
import java.util.stream.Stream
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith
......@@ -38,16 +43,20 @@ import org.memobase.sftp.SftpClient
@ExtendWith(EmbeddedKafkaExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class Tests {
private val log = LogManager.getLogger("LocalTestsLogger")
private val sftpServer = EmbeddedSftpServer(22000, "user", "password")
private val adminClient =
AdminClient.create(mapOf(Pair(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")))
init {
sftpServer.createDirectories(
"/memobase/test_institution_1/test_record_set_1/",
"/memobase/test_institution_2/test_record_set_2/"
"/memobase/test_institution_1/test_record_set_1/",
"/memobase/test_institution_2/test_record_set_2/"
)
sftpServer.putFile("/memobase/test_institution_1/test_record_set_1/brandt.csv", "src/test/resources/data/brandt.csv")
sftpServer.putFile("/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv", "src/test/resources/data/bauGAZH_metadaten.csv")
sftpServer.putFile("/memobase/test_institution_1/test_record_set_1/brandt.csv", FileInputStream("src/test/resources/data/brandt.csv"))
sftpServer.putFile("/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv", FileInputStream("src/test/resources/data/bauGAZH_metadaten.csv"))
}
private val consumer: KafkaConsumer<String, String>
......@@ -67,10 +76,10 @@ class Tests {
@MethodSource("directoryReaderTests")
fun `read valid csv from sftp`(params: TestParams) {
val settings = SettingsLoader(
listOf("directory"),
fileName = params.configFile,
useProducerConfig = true,
readSftpSettings = true
listOf("directory"),
fileName = params.configFile,
useProducerConfig = true,
readSftpSettings = true
)
Producer(settings.kafkaProducerSettings, settings.outputTopic).use { producer ->
......@@ -80,38 +89,61 @@ class Tests {
val files = sftp.listFiles(settings.appSettings.getProperty("directory")).map { File(it) }
for (file in files) {
val report = validator.validate(file)
when (report.status) {
"valid" -> producer.sendMessage(file.name, file.path, report.format)
else ->
producer.sendMessage(file.name, "ERROR", report.format)
}
producer.sendReport(report)
val validationResult = validator.validate(file)
producer.sendMessage(validationResult.second.id, validationResult.first)
producer.sendReport(validationResult.second)
}
}
}
consumer.assign(listOf(TopicPartition(settings.outputTopic, 0)))
val result = consumer.poll(Duration.ofMillis(1000))
assertThat(result.count())
.isEqualTo(1)
var result = consumer.poll(Duration.ofMillis(10))
while (result.count() == 0) {
result = consumer.poll(Duration.ofMillis(10))
}
consumer.assign(listOf(TopicPartition(settings.outputTopic + "-reporting", 0)))
var resultReport = consumer.poll(Duration.ofMillis(10))
while (resultReport.count() == 0) {
resultReport = consumer.poll(Duration.ofMillis(10))
}
assertThat(result.records(settings.outputTopic))
.first()
.hasFieldOrPropertyWithValue("key", params.expectedKey)
.hasFieldOrPropertyWithValue("value", params.expectedValue)
.describedAs("Record Results")
.hasSize(1)
.first()
.hasFieldOrPropertyWithValue("key", params.expectedKey)
.hasFieldOrPropertyWithValue("value", params.expectedValue)
assertThat(resultReport.records(settings.outputTopic + "-reporting"))
.describedAs("Report Results")
.hasSize(1)
.first()
.hasFieldOrPropertyWithValue("key", params.expectedKey)
.hasFieldOrPropertyWithValue("value", params.expectedReportValue)
// cleanup inside of class because there is no way to access topics outside of this function.
adminClient.deleteTopics(listOf(settings.outputTopic, settings.outputTopic + "-reporting"))
}
private fun directoryReaderTests() = Stream.of(
TestParams(
"test1.yml",
expectedKey = "data/brandt.csv",
expectedValue = "{\"path\":\"/memobase/test_institution_1/test_record_set_1/brandt.csv\",\"format\":\"CSV\"}"
),
TestParams(
"test2.yml",
expectedKey = "data/bauGAZH_metadaten.csv",
expectedValue = "{\"path\":\"/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv\",\"format\":\"CSV\"}"
)
)
TestParams(
"test1.yml",
expectedKey = "brandt.csv",
expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_institution_1/test_record_set_1/brandt.csv\"}",
expectedReportValue = Klaxon().toJsonString(Report(
id = "brandt.csv",
status = "SUCCESS",
message = "Validated file at path /memobase/test_institution_1/test_record_set_1/brandt.csv."))
),
TestParams(
"test2.yml",
expectedKey = "bauGAZH_metadaten.csv",
expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv\"}",
expectedReportValue = Klaxon().toJsonString(Report(
id = "bauGAZH_metadaten.csv",
status = "SUCCESS",
message = "Validated file at path /memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv."))
)
)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment