Commit ce59c8e0 authored by Jonas Waeber
Browse files

Initial commit

parents
Pipeline #9750 failed with stages
in 1 minute and 15 seconds
/*
* text-file-validation
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.Klaxon
import java.io.Closeable
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
/**
 * Wrapper around a [KafkaProducer] that publishes [Report]s serialized to JSON with Klaxon.
 *
 * Every record is keyed by the report's `id`. Sends are fire-and-forget: the result
 * returned by the underlying producer is not inspected.
 *
 * @param props configuration passed unchanged to the underlying [KafkaProducer].
 * @param topic base topic name; reports sent with [sendReport] go to `"<topic>-reporting"`.
 */
class Producer(props: Properties, private val topic: String) : Closeable {
    private val instance = KafkaProducer<String, String>(props)
    private val reportingTopic = "$topic-reporting"

    /** Publishes [report] to the reporting topic derived from the constructor's `topic`. */
    fun sendReport(report: Report) = publish(reportingTopic, report)

    /**
     * Publishes [report] to an explicitly chosen topic.
     *
     * @param report the report to serialize and send.
     * @param topic destination topic; used as given, without the `-reporting` suffix.
     */
    fun sendJobReport(report: Report, topic: String) = publish(topic, report)

    /** Closes the underlying [KafkaProducer]. */
    override fun close() = instance.close()

    /** Serializes [report] to JSON and hands it to the Kafka producer for [destination]. */
    private fun publish(destination: String, report: Report) {
        val payload = Klaxon().toJsonString(report)
        instance.send(ProducerRecord(destination, report.id, payload))
    }
}
/*
* text-file-validation
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
/**
 * A report message as it is serialized to JSON and published to Kafka.
 *
 * @property id identifier of the report; [Producer] uses it as the Kafka record key.
 * @property status outcome label (see [ReportStatus] for the values defined in this project).
 * @property message human-readable description of the outcome.
 */
data class Report(
    val id: String,
    val status: String,
    val message: String
)
/*
* text-file-validation
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import java.io.File
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
/**
 * Application service: loads the settings and wires up the Kafka [Producer] and the
 * [SftpClient]. The processing logic in [run] is not implemented yet.
 *
 * @param fileName name of the settings file handed to [SettingsLoader]; defaults to `app.yml`.
 */
class Service(fileName: String = "app.yml") {
    private val log: Logger = LogManager.getLogger("FedoraIngestService")

    // TODO: Add app configs
    private val requiredAppSettings = listOf("directory")

    /** Settings loaded from [fileName], with producer, consumer and SFTP sections requested. */
    val settings =
        SettingsLoader(
            requiredAppSettings,
            fileName,
            useProducerConfig = true,
            useConsumerConfig = true,
            readSftpSettings = true
        )

    // Both collaborators are built from the loaded settings, so they must be declared after them.
    private val producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
    private val sftpClient = SftpClient(settings.sftpSettings)

    /** Runs the service. Currently a no-op placeholder. */
    fun run() {
        // TODO: Implement logic
    }
}
/*
* text-file-validation
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
/**
 * String constants for the `status` field of a report.
 *
 * The lower-case constant names are part of the public interface and are kept as they are.
 */
object ReportStatus {
    /** Status label for a successful outcome. */
    const val success = "SUCCESS"

    /** Status label for a failed outcome. */
    const val failure = "FAILURE"
}
# Application settings. Every value is a ${NAME:?env} placeholder; presumably it is
# resolved from the environment variable NAME and is mandatory — confirm against the
# settings loader.
# NOTE(review): nested keys appear without indentation in this view; check the real file.
id: ${JOB_ID:?env}
# SFTP connection settings.
sftp:
host: ${SFTP_HOST:?env}
port: ${SFTP_PORT:?env}
user: ${SFTP_USER:?env}
password: ${SFTP_PASSWORD:?env}
# Kafka client settings.
# NOTE(review): only a `consumer` section is declared, while Service also requests
# producer settings (useProducerConfig = true) — confirm whether a `producer`
# section is needed here.
kafka:
consumer:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?env}
# client.id and group.id are both taken from CLIENT_ID.
client.id: ${CLIENT_ID:?env}
group.id: ${CLIENT_ID:?env}
topic:
# NOTE(review): the `out` topic is read from TOPIC_IN — confirm this is not meant
# to be TOPIC_OUT (or that the key should be `in`).
out: ${TOPIC_IN:?env}
process: ${TOPIC_PROCESS:?env}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Fedora Ingest Service
~ Copyright (C) 2020 Project Memobase
~
~ This program is free software: you can redistribute it and/or modify
~ it under the terms of the GNU Affero General Public License as published by
~ the Free Software Foundation, either version 3 of the License, or
~ (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU Affero General Public License for more details.
~
~ You should have received a copy of the GNU Affero General Public License
~ along with this program. If not, see <https://www.gnu.org/licenses/>.
-->
<Configuration status="info" name="skeleton-app" packages="">
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="[%-5level] [%c{1}] %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
/**
 * One parameter set for the parameterized test in [Tests].
 *
 * @property configFile settings file name passed to the [Service] constructor.
 * @property expectedKey record key expected on both the output topic and its reporting topic.
 * @property expectedValue record value expected on the output topic.
 * @property expectedReportValue record value expected on the reporting topic.
 * @property expectedProcessReport report expected on the process reporting topic; its `id` is
 * compared with the record key and its JSON form with the record value.
 */
data class TestParams(
    val configFile: String,
    val expectedKey: String,
    val expectedValue: String,
    val expectedReportValue: String,
    val expectedProcessReport: Report
)
/*
* sftp-reader
* Copyright (C) 2019 Memobase
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.Klaxon
import java.io.FileInputStream
import java.time.Duration
import java.util.Properties
import java.util.stream.Stream
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.extensions.EmbeddedKafkaExtension
import org.memobase.extensions.EmbeddedSftpServer
/**
 * Integration tests that run [Service] against an embedded SFTP server (port 22000) and a
 * Kafka broker reachable at `localhost:12345` (presumably started by [EmbeddedKafkaExtension]).
 *
 * The test instance is shared across all parameter sets (`PER_CLASS`), so the fixtures below
 * are uploaded once and a single consumer reads all topics.
 */
@ExtendWith(EmbeddedKafkaExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class Tests {
    private val log = LogManager.getLogger("LocalTestsLogger")
    private val sftpServer = EmbeddedSftpServer(22000, "user", "password")
    private val adminClient =
        AdminClient.create(mapOf(Pair(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")))

    // Number of records every test run is expected to produce: message, report, process report.
    private val expectedRecordCount = 3

    // Time to wait in a single poll call.
    private val pollInterval: Duration = Duration.ofMillis(10)

    // Upper bound for collecting records, so a missing record fails the test instead of hanging it.
    private val pollTimeout: Duration = Duration.ofSeconds(30)

    init {
        // NOTE(review): directories are only created for institutions 1 and 2; files for 3-5 are
        // uploaded without it — presumably putFile creates missing parents, confirm.
        sftpServer.createDirectories(
            "/memobase/test_institution_1/test_record_set_1/",
            "/memobase/test_institution_2/test_record_set_2/"
        )
        upload(
            "/memobase/test_institution_1/test_record_set_1/brandt.csv",
            "src/test/resources/data/brandt.csv"
        )
        upload(
            "/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv",
            "src/test/resources/data/bauGAZH_metadaten.csv"
        )
        upload(
            "/memobase/test_institution_3/test_record_set_3/invalid.csv",
            "src/test/resources/data/invalid.csv"
        )
        upload(
            "/memobase/test_institution_4/test_record_set_4/file.txt",
            "src/test/resources/data/file.txt"
        )
        upload(
            "/memobase/test_institution_5/test_record_set_5/file.xlsx",
            "src/test/resources/data/20190906_Brandt_Metadaten.xlsx"
        )
    }

    // TODO: Replace with producer
    private val consumer: KafkaConsumer<String, String>

    init {
        val props = Properties()
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")
        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "test-group-1")
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group-1")
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        consumer = KafkaConsumer(props)
        consumer.subscribe(listOf("sftp-reader-p1-j1", "sftp-reader-p1-j1-reporting", "p1-reporting"))
    }

    /** Releases the Kafka clients created by this test instance once all tests have run. */
    @org.junit.jupiter.api.AfterAll
    fun closeKafkaClients() {
        consumer.close()
        adminClient.close()
    }

    /**
     * Runs the service with the given configuration and checks the records found on the output,
     * reporting and process reporting topics.
     *
     * Records are collected until [expectedRecordCount] have arrived or [pollTimeout] has elapsed;
     * the test then fails with a descriptive message if the count is wrong.
     */
    // TODO: Change to correct test
    @ParameterizedTest
    @MethodSource("directoryReaderTests")
    fun `read valid csv from sftp`(params: TestParams) {
        val service = Service(params.configFile)
        service.run()

        val topic = service.settings.outputTopic
        val reportingTopic = service.settings.outputTopic + "-reporting"
        val processReportingTopic = service.settings.processReportTopic

        val totalConsumerRecords = mutableListOf<ConsumerRecord<String, String>>()
        val pollingStarted = System.nanoTime()
        // `<` rather than `!=`: a batch that overshoots the expected count must end the loop too.
        while (totalConsumerRecords.size < expectedRecordCount &&
            System.nanoTime() - pollingStarted < pollTimeout.toNanos()
        ) {
            val batch = consumer.poll(pollInterval)
            log.debug("Polled {} record(s).", batch.count())
            totalConsumerRecords.addAll(batch)
        }

        assertThat(totalConsumerRecords)
            .describedAs("Records received within %s", pollTimeout)
            .hasSize(expectedRecordCount)

        assertThat(totalConsumerRecords.find { value -> value.topic() == topic })
            .describedAs("Message Test")
            .hasFieldOrPropertyWithValue("key", params.expectedKey)
            .hasFieldOrPropertyWithValue("value", params.expectedValue)

        assertThat(totalConsumerRecords.find { value -> value.topic() == reportingTopic })
            .describedAs("Report Test")
            .hasFieldOrPropertyWithValue("key", params.expectedKey)
            .hasFieldOrPropertyWithValue("value", params.expectedReportValue)

        assertThat(totalConsumerRecords.find { value -> value.topic() == processReportingTopic })
            .describedAs("Process Report Test")
            .hasFieldOrPropertyWithValue("key", params.expectedProcessReport.id)
            .hasFieldOrPropertyWithValue("value", Klaxon().toJsonString(params.expectedProcessReport))
    }

    /**
     * Uploads a local fixture to the embedded SFTP server and closes the local stream afterwards.
     * Assumes `putFile` has consumed the stream by the time it returns — TODO confirm.
     */
    private fun upload(remotePath: String, localPath: String) {
        FileInputStream(localPath).use { stream -> sftpServer.putFile(remotePath, stream) }
    }

    /** Parameter sets for [read valid csv from sftp], one per fixture file. */
    private fun directoryReaderTests() = Stream.of(
        TestParams(
            "test1.yml",
            expectedKey = "brandt.csv",
            expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_institution_1/test_record_set_1/brandt.csv\"}",
            expectedReportValue = Klaxon().toJsonString(
                Report(
                    id = "brandt.csv",
                    status = "SUCCESS",
                    message = "Validated file at path /memobase/test_institution_1/test_record_set_1/brandt.csv with format CSV."
                )
            ),
            expectedProcessReport = Report(
                id = "jobXYZ",
                status = "SUCCESS",
                message = "Successfully validated 1 files."
            )
        ),
        TestParams(
            "test2.yml",
            expectedKey = "bauGAZH_metadaten.csv",
            expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv\"}",
            expectedReportValue = Klaxon().toJsonString(
                Report(
                    id = "bauGAZH_metadaten.csv",
                    status = "SUCCESS",
                    message = "Validated file at path /memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv with format CSV."
                )
            ),
            expectedProcessReport = Report(
                id = "jobXYZ",
                status = "SUCCESS",
                message = "Successfully validated 1 files."
            )
        ),
        TestParams(
            "test3.yml",
            expectedKey = "invalid.csv",
            expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/memobase/test_institution_3/test_record_set_3/invalid.csv\"}",
            expectedReportValue = Klaxon().toJsonString(
                Report(
                    id = "invalid.csv",
                    status = "FAILURE",
                    message = "CSV ERROR: Fields num seems to be 5 on each row, but on 2th csv row, fields num is 7. for file /memobase/test_institution_3/test_record_set_3/invalid.csv."
                )
            ),
            expectedProcessReport = Report(
                id = "jobXYZ",
                status = "FAILURE",
                message = "Failed to validate 1 of 1 files."
            )
        ),
        TestParams(
            "test4.yml",
            expectedKey = "file.txt",
            expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/memobase/test_institution_4/test_record_set_4/file.txt\"}",
            expectedReportValue = Klaxon().toJsonString(
                Report(
                    id = "file.txt",
                    status = "FAILURE",
                    message = "File Extension Error: Not a valid file extension: file.txt."
                )
            ),
            expectedProcessReport = Report(
                id = "jobXYZ",
                status = "FAILURE",
                message = "Failed to validate 1 of 1 files."
            )
        ),
        TestParams(
            "test5.yml",
            expectedKey = "file.xlsx",
            expectedValue = "{\"format\" : \"XLSX\", \"path\" : \"/memobase/test_institution_5/test_record_set_5/file.xlsx\"}",
            expectedReportValue = Klaxon().toJsonString(
                Report(
                    id = "file.xlsx",
                    status = "SUCCESS",
                    message = "Validated file at path /memobase/test_institution_5/test_record_set_5/file.xlsx with format XLSX."
                )
            ),
            expectedProcessReport = Report(
                id = "jobXYZ",
                status = "SUCCESS",
                message = "Successfully validated 1 files."
            )
        )
    )
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Fedora Ingest Service
~ Copyright (C) 2020 Project Memobase
~
~ This program is free software: you can redistribute it and/or modify
~ it under the terms of the GNU Affero General Public License as published by
~ the Free Software Foundation, either version 3 of the License, or
~ (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU Affero General Public License for more details.
~
~ You should have received a copy of the GNU Affero General Public License
~ along with this program. If not, see <https://www.gnu.org/licenses/>.
-->
<Configuration status="info" name="skeleton-app" packages="">
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="[%-5level] [%c{1}] %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment