Commit 69b2024c authored by Thomas Bernhart's avatar Thomas Bernhart
Browse files

Fix formatting, add missing test resources

parent 0aa693b4
Pipeline #11489 failed with stages
in 1 minute and 58 seconds
......@@ -53,6 +53,7 @@ dependencies {
implementation "org.jetbrains.kotlin:kotlin-reflect:1.3.71"
// TESTS
testImplementation("org.junit.jupiter:junit-jupiter:5.6.0")
testImplementation 'org.assertj:assertj-core:3.15.0'
testImplementation 'org.mockito:mockito-core:2.7.22'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.6.0'
......
......@@ -26,7 +26,7 @@ class App {
private val log = LogManager.getLogger("FedorIngestApp")
@JvmStatic fun main(args: Array<String>) {
try {
val service = Service.create()
val service = Service()
service.run()
} catch (ex: Exception) {
ex.printStackTrace()
......
......@@ -17,12 +17,11 @@
*/
package org.memobase
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.io.Closeable
import java.time.Duration
import java.util.*
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
class Consumer(props: Properties, private val topic: String) : Closeable {
private val instance = KafkaConsumer<String, String>(props)
......@@ -39,4 +38,4 @@ class Consumer(props: Properties, private val topic: String) : Closeable {
override fun close() {
instance.close()
}
}
\ No newline at end of file
}
package org.memobase
// import java.io.File
// import java.io.InputStream
import java.net.URI
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient
import org.memobase.sftp.SftpClient
import java.io.File
import java.io.InputStream
import java.net.URI
class Ingester (
private val sftpClient: SftpClient,
private val fedoraClient: FedoraClient) {
class Ingester(private val sftpClient: SftpClient, private val fedoraClient: FedoraClient) {
private val log = LogManager.getLogger("IngestService")
@Throws(FcrepoOperationFailedException::class)
fun ingest(id: String) {
fun ingest(id: String, content: String) {
log.info("Ingest record $id.")
log.info("Content: $content")
// fedoraClient.createOrUpdateRdfResource()
// if (rdf.hasFileReference()) {
// fedoraClient.createOrUpdateBinaryResource()
// }
throw FcrepoOperationFailedException(
URI("http://mb-fed1.memobase.unibas.ch:8080/fcrepo/rest"), 501, "Not Implemented."
URI("http://mb-fed1.memobase.unibas.ch:8080/fcrepo/rest"), 501, "Not Implemented."
)
}
......@@ -34,4 +32,4 @@ class Ingester (
// private fun openSftpFile(file: File): InputStream {
// sftpClient.open(file)
// }
}
\ No newline at end of file
}
......@@ -18,22 +18,16 @@
package org.memobase
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraClientImpl
// import org.bouncycastle.cms.RecipientId.password
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
import org.memobase.fedora.*
import java.io.File
import java.net.URI
import java.util.*
// import java.util.*
class Service(fileName: String = "app.yml") {
......@@ -73,7 +67,6 @@ class Service(fileName: String = "app.yml") {
}
}
fun run() {
consumer.use { consumer ->
producer.use { producer ->
......
......@@ -40,5 +40,3 @@ object ReportMessages {
return "Failed to ingest resource $id."
}
}
......@@ -18,8 +18,10 @@
package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import java.io.FileInputStream
import java.time.Duration
import java.util.Properties
import java.util.stream.Stream
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
......@@ -27,6 +29,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.TestInstance
......@@ -35,11 +39,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.testing.EmbeddedKafkaExtension
import org.memobase.testing.EmbeddedSftpServer
import java.io.FileInputStream
import java.time.Duration
import java.util.*
import java.util.stream.Stream
@ExtendWith(EmbeddedKafkaExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
......@@ -57,7 +56,7 @@ class ServiceTest {
"/memobase/test_institution_1/test_record_set_1/"
)
sftpServer.putFile(
"/memobase/test_institution_1/test_record_set_1/BAZ-MEI_77252-1.txt",
"/memobase/test_institution_1/test_record_set_1/binary.txt",
FileInputStream("src/test/resources/data/binary.txt")
)
}
......@@ -88,7 +87,6 @@ class ServiceTest {
reportConsumer.subscribe(listOf("ingest-p1-j1", "ingest-p1-j1-reporting", "p1-reporting"))
}
// TODO: Change to correct test
@ParameterizedTest
@MethodSource("kafkaTests")
......@@ -122,20 +120,20 @@ class ServiceTest {
TestParams(
"test1.yml",
expectedJobReports = listOf(
Klaxon().toJsonString(
Report(
id = "MEI_49884",
status = "SUCCESS",
message = "Ingested resource MEI_49884."
)
),
Klaxon().toJsonString(
Klaxon().toJsonString(
Report(
id = "BAZ-MEI_49884-0",
id = "MEI_49884",
status = "SUCCESS",
message = "Ingested resource BAZ-MEI_49884-0."
message = "Ingested resource MEI_49884."
)
),
Klaxon().toJsonString(
Report(
id = "BAZ-MEI_49884-0",
status = "SUCCESS",
message = "Ingested resource BAZ-MEI_49884-0."
)
)
)
),
expectedProcessReport = Report(
id = "jobXYZ",
......
/*
* sftp-reader
* Copyright (C) 2019 Memobase
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.extensions.EmbeddedKafkaExtension
import org.memobase.extensions.EmbeddedSftpServer
import org.memobase.settings.SettingsLoader
import java.io.FileInputStream
import java.time.Duration
import java.util.*
import java.util.stream.Stream
@ExtendWith(EmbeddedKafkaExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class Tests {
private val log = LogManager.getLogger("LocalTestsLogger")
private val sftpServer = EmbeddedSftpServer(22000, "user", "password")
private val adminClient =
AdminClient.create(mapOf(Pair(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")))
init {
sftpServer.createDirectories(
"/memobase/test_institution_1/test_record_set_1/",
"/memobase/test_institution_2/test_record_set_2/"
)
sftpServer.putFile(
"/memobase/test_institution_1/test_record_set_1/brandt.csv",
FileInputStream("src/test/resources/data/brandt.csv")
)
sftpServer.putFile(
"/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv",
FileInputStream("src/test/resources/data/bauGAZH_metadaten.csv")
)
sftpServer.putFile(
"/memobase/test_institution_3/test_record_set_3/invalid.csv",
FileInputStream("src/test/resources/data/invalid.csv")
)
sftpServer.putFile(
"/memobase/test_institution_4/test_record_set_4/file.txt",
FileInputStream("src/test/resources/data/file.txt")
)
sftpServer.putFile(
"/memobase/test_institution_5/test_record_set_5/file.xlsx",
FileInputStream("src/test/resources/data/20190906_Brandt_Metadaten.xlsx")
)
}
private val producer: KafkaProducer<String, String>
init {
val props = Properties()
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")
props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "test-group-1")
// FIXME:
// props.setProperty(ProducerConfig.GROUP_ID_CONFIG, "test-group-1")
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.qualifiedName)
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.qualifiedName)
// FIXME:
// props.setProperty(ProducerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
producer = KafkaProducer(props)
}
// TODO: Change to correct test
// @ParameterizedTest
// @MethodSource("directoryReaderTests")
// fun `read valid csv from sftp`(params: TestParams) {
// val service = Service(params.configFile)
// service.run()
//
// val topic = service.settings.outputTopic
// val reportingTopic = service.settings.outputTopic + "-reporting"
// val processReportingTopic = service.settings.processReportTopic
//
// val settings = SettingsLoader(
// listOf(
// "institutionId",
// "recordSetId",
// "configs"
// ),
// fileName = "kafkaTest${params.count}.yml",
// useStreamsConfig = true
// )
// val testDriver = TopologyTestDriver(KafkaTopology(settings).build(), settings.kafkaStreamsSettings)
// val factory = ConsumerRecordFactory(
// StringSerializer(), StringSerializer()
// )
//
// // TODO: Iterate over mesages from test resources and send them to kafka
// testDriver.pipeInput(
// factory.create(
// settings.inputTopic, params.key, readFile("kafkaTests/${params.count}/input.json")
// )
// )
//
// val totalConsumerRecords = mutableListOf<ConsumerRecord<String, String>>()
//
// var result = consumer.poll(Duration.ofMillis(10))
// while (totalConsumerRecords.size != 3) {
// if (result.count() > 0) {
// totalConsumerRecords.addAll(result.asIterable())
// }
// log.error(result.count())
// result = consumer.poll(Duration.ofMillis(10))
// }
//
// assertThat(totalConsumerRecords.find { value -> value.topic() == topic })
// .describedAs("Message Test")
// .hasFieldOrPropertyWithValue("key", params.expectedKey)
// .hasFieldOrPropertyWithValue("value", params.expectedValue)
//
// assertThat(totalConsumerRecords.find { value -> value.topic() == reportingTopic })
// .describedAs("Report Test")
// .hasFieldOrPropertyWithValue("key", params.expectedKey)
// .hasFieldOrPropertyWithValue("value", params.expectedReportValue)
//
// assertThat(totalConsumerRecords.find { value -> value.topic() == processReportingTopic })
// .describedAs("Process Report Test")
// .hasFieldOrPropertyWithValue("key", params.expectedProcessReport.id)
// .hasFieldOrPropertyWithValue("value", Klaxon().toJsonString(params.expectedProcessReport))
// }
private fun kafkaTests() = Stream.of(
TestParams(
"test1.yml",
expectedKey = "brandt.csv",
expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_institution_1/test_record_set_1/brandt.csv\"}",
expectedReportValue = Klaxon().toJsonString(
Report(
id = "brandt.csv",
status = "SUCCESS",
message = "Validated file at path /memobase/test_institution_1/test_record_set_1/brandt.csv with format CSV."
)
),
expectedProcessReport = Report(
id = "jobXYZ",
status = "SUCCESS",
message = "Successfully validated 1 files."
)
),
TestParams(
"test2.yml",
expectedKey = "bauGAZH_metadaten.csv",
expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv\"}",
expectedReportValue = Klaxon().toJsonString(
Report(
id = "bauGAZH_metadaten.csv",
status = "SUCCESS",
message = "Validated file at path /memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv with format CSV."
)
),
expectedProcessReport = Report(
id = "jobXYZ",
status = "SUCCESS",
message = "Successfully validated 1 files."
)
),
TestParams(
"test3.yml",
expectedKey = "invalid.csv",
expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/memobase/test_institution_3/test_record_set_3/invalid.csv\"}",
expectedReportValue = Klaxon().toJsonString(
Report(
id = "invalid.csv",
status = "FAILURE",
message = "CSV ERROR: Fields num seems to be 5 on each row, but on 2th csv row, fields num is 7. for file /memobase/test_institution_3/test_record_set_3/invalid.csv."
)
),
expectedProcessReport = Report(
id = "jobXYZ",
status = "FAILURE",
message = "Failed to validate 1 of 1 files."
)
),
TestParams(
"test4.yml",
expectedKey = "file.txt",
expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/memobase/test_institution_4/test_record_set_4/file.txt\"}",
expectedReportValue = Klaxon().toJsonString(
Report(
id = "file.txt",
status = "FAILURE",
message = "File Extension Error: Not a valid file extension: file.txt."
)
),
expectedProcessReport = Report(
id = "jobXYZ",
status = "FAILURE",
message = "Failed to validate 1 of 1 files."
)
),
TestParams(
"test5.yml",
expectedKey = "file.xlsx",
expectedValue = "{\"format\" : \"XLSX\", \"path\" : \"/memobase/test_institution_5/test_record_set_5/file.xlsx\"}",
expectedReportValue = Klaxon().toJsonString(
Report(
id = "file.xlsx",
status = "SUCCESS",
message = "Validated file at path /memobase/test_institution_5/test_record_set_5/file.xlsx with format XLSX."
)
),
expectedProcessReport = Report(
id = "jobXYZ",
status = "SUCCESS",
message = "Successfully validated 1 files."
)
)
)
}
id: jobXYZ
sftp:
host: localhost
port: 22000
user: user
password: password
app:
directory: /memobase/test_record_set_1
kafka:
producer:
bootstrap.servers: localhost:12345
client.id: sftp-reader-p1-j1
topic:
out: sftp-reader-p1-j1
process: p1-reporting
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment