Commit 566ba3fb authored by Jonas Waeber

updated tests

parent 80f2cd26
@@ -28,7 +28,7 @@ class App {
private val log = LogManager.getLogger("RecordParser")
@JvmStatic fun main(args: Array<String>) {
try {
val settings = SettingsLoader()
val settings = SettingsLoader("app.yml")
val topology = KafkaTopology(settings).build()
val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
......
@@ -18,6 +18,7 @@
package org.memobase
import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import com.github.doyaaaaaken.kotlincsv.dsl.csvWriter
@@ -47,45 +48,57 @@ class KafkaTopology(private val settings: SettingsLoader
val source = builder
.stream<String, String>(settings.inputTopic)
.filter { _, value -> value.contains("ERROR") }
.mapValues { _, value -> Klaxon().parseJsonObject(StringReader(value)) }
.flatMapValues { _, value ->
val resultMessages = mutableListOf<KeyValue<String, String>>()
sftpClient.open(File(value["path"] as String)).use {
val reader = csvReader().readAll(it.RemoteFileInputStream())
var headerProperties = emptyList<String>()
var count = 0
for (line in reader) {
count += 1
if (count < csvHeaderCount) {
if (csvUseHeaderProperties) {
if (count == csvUsedHeaderIndex) {
headerProperties = line
}
}
}
val output = ByteArrayOutputStream()
csvWriter {
this.charset = Charsets.UTF_8.name()
this.delimiter = ','
this.lineTerminator = "\n"
this.nullCode = ""
}.open(output) {
if (headerProperties.isNotEmpty()) {
writeRow(headerProperties)
}
writeRow(line)
}
val identifier = line[csvIdentifierIndex]
val result = KeyValue(identifier, output.toString(Charset.defaultCharset()))
output.reset()
resultMessages.add(result)
}
}
return@flatMapValues resultMessages
}
.filter { _, value -> filer(value) }
.mapValues { _, value -> mapValues(value) }
.flatMap { _, value -> parser(value) }
source.to(settings.outputTopic)
return builder.build()
}
private fun filer(value: String): Boolean {
return !value.contains("ERROR")
}
private fun mapValues(value: String): JsonObject {
return Klaxon().parseJsonObject(StringReader(value))
}
private fun parser(value: JsonObject): List<KeyValue<String, String>> {
val resultMessages = mutableListOf<KeyValue<String, String>>()
sftpClient.open(File(value["path"] as String)).use {
val reader = csvReader().readAll(it.RemoteFileInputStream())
var headerProperties = emptyList<String>()
var count = 0
for (line in reader) {
count += 1
if (count <= csvHeaderCount) {
if (csvUseHeaderProperties) {
if (count == csvUsedHeaderIndex) {
headerProperties = line
}
}
continue
}
val output = ByteArrayOutputStream()
csvWriter {
this.charset = Charsets.UTF_8.name()
this.delimiter = ','
this.lineTerminator = "\n"
this.nullCode = ""
}.open(output) {
if (headerProperties.isNotEmpty()) {
writeRow(headerProperties)
}
writeRow(line)
}
val identifier = line[csvIdentifierIndex]
val result = KeyValue(identifier, output.toString(Charset.defaultCharset()).trim())
output.reset()
resultMessages.add(result)
}
}
return resultMessages
}
}
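
The header bookkeeping inside parser() is compact and easy to misread. Below is a minimal, self-contained sketch of the same skip-and-capture logic; the function name, the rows parameter, and the three settings values are hypothetical (chosen to mirror what test1.yml appears to configure) and are not part of this commit:

fun splitHeaderAndData(rows: List<List<String>>): Pair<List<String>, List<List<String>>> {
    val csvHeaderCount = 3            // hypothetical: first three rows are header rows
    val csvUseHeaderProperties = true // hypothetical: capture a header row for re-emission
    val csvUsedHeaderIndex = 2        // hypothetical: property names sit in the second row
    var headerProperties = emptyList<String>()
    val dataRows = mutableListOf<List<String>>()
    var count = 0
    for (line in rows) {
        count += 1
        if (count <= csvHeaderCount) {
            // Header region: keep only the row that holds the property names.
            if (csvUseHeaderProperties && count == csvUsedHeaderIndex) {
                headerProperties = line
            }
            continue
        }
        dataRows.add(line) // everything past the header region is a data row
    }
    return Pair(headerProperties, dataRows)
}

Each data row is then serialised back to CSV through csvWriter, prefixed with headerProperties when it is non-empty, which is why the with-header test below expects a two-line CSV value per record.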
@@ -28,15 +28,18 @@ class Producer(props: Properties, private val topic: String) : Closeable {
private val instance = KafkaProducer<String, String>(props)
private val reportingTopic = "$topic-reporting"
private var count = 0
private fun send(topic: String, key: String, message: String) {
instance.send(ProducerRecord(topic, key, message))
}
fun sendMessage(key: String, message: String, format: String) {
instance.send(ProducerRecord(topic, key, JsonObject(mapOf(Pair("path", message), Pair("format", format))).toJsonString()))
count += 1
send(topic, key, JsonObject(mapOf(Pair("path", message), Pair("format", format))).toJsonString())
}
fun sendReport(report: Report) {
instance.send(ProducerRecord(reportingTopic, report.id, Klaxon().toJsonString(report)))
send(reportingTopic, report.id, Klaxon().toJsonString(report))
}
override fun close() {
......
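
For reference, sendMessage() wraps the file path and format into a small JSON envelope via Klaxon before producing it. A sketch of what a record value looks like, mirroring the construction used above and borrowing the fixture path from the test resources (the printed key order could differ):

import com.beust.klaxon.JsonObject

fun main() {
    // Same construction as in sendMessage().
    val value = JsonObject(mapOf(Pair("path", "./test_institution_1/test_record_set_1/brandt_metadaten.csv"), Pair("format", "CSV")))
        .toJsonString()
    println(value) // {"path": "./test_institution_1/test_record_set_1/brandt_metadaten.csv", "format": "CSV"}
}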
@@ -31,19 +31,19 @@ import org.snakeyaml.engine.v2.api.Load
import org.snakeyaml.engine.v2.api.LoadSettings
import org.snakeyaml.engine.v2.exceptions.MissingEnvironmentVariableException
class SettingsLoader {
class SettingsLoader(private val fileName: String) {
private val log = LogManager.getLogger("SettingsLoader")
private fun loadYaml(): Any {
val settings = LoadSettings.builder().setEnvConfig(Optional.of(CustomEnvConfig())).build()
val load = Load(settings)
try {
val file = File("/configs/app.yml")
val file = File("/configs/$fileName")
return if (file.isFile) {
load.loadFromInputStream(FileInputStream(file))
} else {
log.warn("Loading default properties in $fileName from classpath!")
load.loadFromInputStream(ClassLoader.getSystemResourceAsStream("app.yml"))
load.loadFromInputStream(ClassLoader.getSystemResourceAsStream(fileName))
}
} catch (ex: MissingEnvironmentVariableException) {
throw MissingSettingException("env", "${ex.localizedMessage}")
......
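
With the file name now injected, the tests can load alternative configurations. A short sketch of the resolution order this implies, using the test1.yml name from the test suite: /configs/test1.yml on disk is preferred, and test1.yml on the classpath is the fallback.

// Disk config wins; otherwise the classpath resource is used.
val settings = SettingsLoader("test1.yml")
val topology = KafkaTopology(settings).build()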
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.junit.jupiter.api.extension.BeforeAllCallback
import org.junit.jupiter.api.extension.ExtensionContext
class EmbeddedKafkaExtension : BeforeAllCallback {
lateinit var kafka: KafkaEmbedded
private val testTopic: String = "test"
private val port: Int = 12345
override fun beforeAll(context: ExtensionContext?) {
kafka = KafkaEmbedded(port, testTopic)
val wrapper = KafkaWrapper(kafka)
if (context != null) {
context.getStore(ExtensionContext.Namespace.create(EmbeddedKafkaExtension::class.java))
.put("kafka", wrapper)
Runtime.getRuntime().addShutdownHook(Thread(Runnable {
wrapper.close()
}))
}
}
class KafkaWrapper(private val embedded: KafkaEmbedded) : ExtensionContext.Store.CloseableResource {
override fun close() {
embedded.close()
}
}
}
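
Usage note: the extension starts the broker once per test class, parks it in the context store, and relies on a JVM shutdown hook for cleanup. It is still commented out in Tests below; enabling it would presumably look like this (the test class and its body are hypothetical):

import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith

@ExtendWith(EmbeddedKafkaExtension::class)
class EmbeddedBrokerTest {
    @Test
    fun `can reach the embedded broker`() {
        // exercise producers or consumers against localhost:12345 here
    }
}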
/*
* sftp-reader
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import java.io.Closeable
import java.nio.file.Files
import java.util.Properties
import kafka.admin.AdminUtils
import kafka.admin.RackAwareMode
import kafka.server.KafkaConfig
import kafka.server.KafkaServer
import kafka.utils.TestUtils
import kafka.utils.ZkUtils
import kafka.utils.`ZKStringSerializer$`
import kafka.zk.EmbeddedZookeeper
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.utils.Time
import scala.Option
import scala.collection.JavaConversions
class KafkaEmbedded(port: Int, topic: String) : Closeable {
private val server: KafkaServer
private val zkClient: ZkClient
private val zkServer: EmbeddedZookeeper = EmbeddedZookeeper()
init {
val zkConnect = "127.0.0.1:${zkServer.port()}"
val props = Properties()
props.setProperty("zookeeper.connect", zkConnect)
props.setProperty("broker.id", "0")
props.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString())
props.setProperty("listeners", "PLAINTEXT://localhost:$port")
props.setProperty("offsets.topic.replication.factor", "1")
server = KafkaServer(KafkaConfig(props), Time.SYSTEM, Option.apply("kafka-broker"), JavaConversions.asScalaBuffer(emptyList()))
server.startup()
zkClient = ZkClient(zkConnect, 30000, 30000, `ZKStringSerializer$`.`MODULE$`)
val zkUtils = ZkUtils.apply(zkClient, false)
AdminUtils.createTopic(zkUtils, topic, 1, 1, Properties(), RackAwareMode.`Disabled$`.`MODULE$`)
TestUtils.waitUntilMetadataIsPropagated(JavaConversions.asScalaBuffer(listOf(server)), topic, 0, 5000)
}
override fun close() {
server.shutdown()
server.awaitShutdown()
zkClient.close()
zkServer.shutdown()
}
}
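
Note that the port handed to KafkaEmbedded has to line up with the bootstrap.servers entries in the test configurations, which both point at localhost:12345. A minimal sketch (the surrounding main function is illustrative):

fun main() {
    val kafka = KafkaEmbedded(12345, "test-topic-in") // matches test1.yml/test2.yml
    kafka.use {
        // run producers/consumers against localhost:12345
    }
}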
@@ -27,14 +27,17 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertThrows
import java.io.File
import java.lang.IllegalStateException
import java.nio.charset.Charset
import kotlin.test.assertNotNull
import kotlin.test.assertNull
//@ExtendWith(EmbeddedKafkaExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class Tests {
private val log = LogManager.getLogger("TestLogger")
private val settings = SettingsLoader()
private val testDriver = TopologyTestDriver(KafkaTopology(settings).build(), settings.kafkaStreamsSettings)
private val resourcePath = "src/test/resources/data"
private fun readFile(fileName: String): String {
@@ -42,25 +45,72 @@ class Tests {
}
@Test
fun `test csv file import`() {
fun `test error file filter`() {
val settingsLoader = SettingsLoader("test1.yml")
val testDriver = TopologyTestDriver(KafkaTopology(settingsLoader).build(), settingsLoader.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer())
StringSerializer(), StringSerializer()
)
testDriver.pipeInput(
factory.create(
settings.inputTopic, null, readFile("csv_file_import_1.json")
settingsLoader.inputTopic, "brandt_metadaten.csv", readFile("error_filter_input.json")
)
)
val producerRecord = testDriver.readOutput(
settingsLoader.outputTopic,
StringDeserializer(),
StringDeserializer()
)
assertNull(producerRecord)
}
@Test
fun `test csv file import with header line`() {
val settingsLoader = SettingsLoader("test1.yml")
val testDriver = TopologyTestDriver(KafkaTopology(settingsLoader).build(), settingsLoader.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
testDriver.pipeInput(
factory.create(
settingsLoader.inputTopic, "brandt_metadaten.csv", readFile("csv_file_import_1.json")
)
)
val record: ProducerRecord<String, String> =
testDriver.readOutput(
settings.outputTopic,
settingsLoader.outputTopic,
StringDeserializer(),
StringDeserializer()
)
assertNotNull(record)
assertAll("test csv file import",
{ assertEquals("", record.key()) },
{ assertEquals("AVGR13716", record.key()) },
{ assertEquals(readFile("csv_file_import_1_output.csv"), record.value()) }
)
}
@Test
fun `test csv file import without header line`() {
val settingsLoader = SettingsLoader("test2.yml")
val testDriver = TopologyTestDriver(KafkaTopology(settingsLoader).build(), settingsLoader.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
testDriver.pipeInput(
factory.create(
settingsLoader.inputTopic, "brandt_metadaten.csv", readFile("csv_file_import_1.json")
)
)
val record: ProducerRecord<String, String> =
testDriver.readOutput(
settingsLoader.outputTopic,
StringDeserializer(),
StringDeserializer()
)
assertNotNull(record)
assertAll("test csv file import",
{ assertEquals("AVGR13716", record.key()) },
{ assertEquals(readFile("csv_file_import_2_output.csv"), record.value()) }
)
}
}
{
"path": "./test_institution_1/test_record_set_1/test.csv",
"path": "./test_institution_1/test_record_set_1/brandt_metadaten.csv",
"format": "CSV"
}
\ No newline at end of file
ID Original ID,,Inhalt Haupttitel,Inhalt Beschreibung Bemerkung,Inhalt Beschreibung,Kontext Erstellung,Kontext AutorIn,AutorIn,Inhalt Genre,Inhalt Schlagworte,Kontext Aufnahmeort,Kontext Beschreibung Bemerkung,System Information Streaming,,System Information Dokumenttyp,Technische Informationen Trägerformat (des Originals),Technische Informationen Tonaufnahmeverfahren ,Technische Informationen Film Farbe,Technische Informationen Dauer,Technische Informationen Film Bemerkung
AVGR13716,https://www.gr.ch/Exemplare/13716,Wintersport in Arosa,,"Pferderennen am Obersee bei strahlendem Sonnenschein, viel Publikum, Gedränge vor Wettbüro, Reiter in Armeeuniform, Fotografen, Skijöring – Eisfest mit kostümierten Teilnehmer/innen vor Hotel Altein bei Nacht – Pferderennen am Obersee – Eiskunstlauf – Pferderennen, diesmal winterlicher – Schanzenspringen im Skigelände und viel Volk um die Alpgebäude Carmenna – Skifahrer im Aufstieg, Winterwanderer und nochmals Sprünge auf der Schneeschanze, Gruppe Skifahrer in wilder Schussfahrt, Wartende um die Hütten – Eishockey-Match – Impressionen von einem Abfahrtsrennen und Rundsicht über Arosa und Umgebung","1920, 1920-1929, genaues Datum nicht eruierbar","Brandt, Carl",Autor/in,Dokumentarfilm; Amateurfilm,Pferdesport; Ski alpin; Skispringen; Eishochey; Tourismus,Arosa,Eisfest: teilweise identische Aufnahmen in AVGR12097 „Ankunft David Zogg“ ; Schanzenspringen auf Carmenna: teilweise identische Aufnahmen in AVGR12115 „Touristen auf dem Tschuggen“,https://s3-eu-west-1.amazonaws.com/streaming.av-portal.gr.ch/13716/AVGR13716.mov,Intranet,Film,"35-mm-Film, Negativ und Positiv, Nitrat",stumm,s/w getönt,0:17:02,"Vorhandene Elemente: AVGR9942: Negativ, Nitrat (CS, Z 986-172.8); AVGR9943: Positiv Nitrat (CS, Z 986-172.7); AVGR12098: Interpositiv / Marron 2366, Kopie 2016 (KBG); AVGR13715: Internegativ 2234, Kopie 2016 (KBG); AVGR13716: Positivkopie Farbe 2383, Kopie 2016, eingefärbte Sequenzen (KBG)"
\ No newline at end of file
AVGR13716,https://www.gr.ch/Exemplare/13716,Wintersport in Arosa,,"Pferderennen am Obersee bei strahlendem Sonnenschein, viel Publikum, Gedränge vor Wettbüro, Reiter in Armeeuniform, Fotografen, Skijöring – Eisfest mit kostümierten Teilnehmer/innen vor Hotel Altein bei Nacht – Pferderennen am Obersee – Eiskunstlauf – Pferderennen, diesmal winterlicher – Schanzenspringen im Skigelände und viel Volk um die Alpgebäude Carmenna – Skifahrer im Aufstieg, Winterwanderer und nochmals Sprünge auf der Schneeschanze, Gruppe Skifahrer in wilder Schussfahrt, Wartende um die Hütten – Eishockey-Match – Impressionen von einem Abfahrtsrennen und Rundsicht über Arosa und Umgebung","1920, 1920-1929, genaues Datum nicht eruierbar","Brandt, Carl",Autor/in,Dokumentarfilm; Amateurfilm,Pferdesport; Ski alpin; Skispringen; Eishochey; Tourismus,Arosa,Eisfest: teilweise identische Aufnahmen in AVGR12097 „Ankunft David Zogg“ ; Schanzenspringen auf Carmenna: teilweise identische Aufnahmen in AVGR12115 „Touristen auf dem Tschuggen“,https://s3-eu-west-1.amazonaws.com/streaming.av-portal.gr.ch/13716/AVGR13716.mov,Intranet,Film,"35-mm-Film, Negativ und Positiv, Nitrat",stumm,s/w getönt,0:17:02,"Vorhandene Elemente: AVGR9942: Negativ, Nitrat (CS, Z 986-172.8); AVGR9943: Positiv Nitrat (CS, Z 986-172.7); AVGR12098: Interpositiv / Marron 2366, Kopie 2016 (KBG); AVGR13715: Internegativ 2234, Kopie 2016 (KBG); AVGR13716: Positivkopie Farbe 2383, Kopie 2016, eingefärbte Sequenzen (KBG)"
\ No newline at end of file
{
"path": "ERROR",
"format": "CSV"
}
\ No newline at end of file
@@ -24,7 +24,7 @@
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<Root level="info">
<AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
......
@@ -7,16 +7,16 @@ sftp:
app:
csv:
header:
count: 1
count: 3
line: true
index: 1
identifier: 3
index: 2
identifier: 0
kafka:
producer:
bootstrap.servers: localhost:9092
bootstrap.servers: localhost:12345
client.id: test-client-1234
stream:
bootstrap.servers: localhost:9092
bootstrap.servers: localhost:12345
application.id: test-client-1234
topic:
in: test-topic-in
......
sftp:
host: sb-uingest1.swissbib.unibas.ch
port: 22
user: mb_sftp
password: ${SFTP_PASSWORD:?env}
fingerprint: ${HOST_KEY_VERIFIER:?env}
app:
csv:
header:
count: 3
line: false
index: 0
identifier: 0
kafka:
producer:
bootstrap.servers: localhost:12345
client.id: test-client-1234
stream:
bootstrap.servers: localhost:12345
application.id: test-client-1234
topic:
in: test-topic-in
out: test-topic-out
\ No newline at end of file
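
The two test configurations differ only in the header block: test1.yml re-emits a header row with every record (line: true, names taken from the second of three header rows), while test2.yml skips all three header rows without capturing one (line: false). The mapping from the YAML keys to the field names used in KafkaTopology is inferred from the code rather than stated by it:

// Inferred mapping, app.csv.header.* -> settings consumed by KafkaTopology:
// count      -> csvHeaderCount          rows to skip before the data starts
// line       -> csvUseHeaderProperties  whether to re-emit a header row per record
// index      -> csvUsedHeaderIndex      which header row carries the property names
// identifier -> csvIdentifierIndex      column used as the outgoing record key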