Tests.kt 4.77 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
/*
Jonas Waeber's avatar
Jonas Waeber committed
2
 * sftp-reader
Jonas Waeber's avatar
Jonas Waeber committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
 * Copyright (C) 2019  Memobase
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18
package org.memobase
Jonas Waeber's avatar
Jonas Waeber committed
19

Jonas Waeber's avatar
Jonas Waeber committed
20 21 22 23 24 25 26 27 28
import java.io.File
import java.time.Duration
import java.util.Properties
import java.util.stream.Stream
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.assertj.core.api.Assertions.assertThat
Jonas Waeber's avatar
Jonas Waeber committed
29
import org.junit.jupiter.api.TestInstance
30
import org.junit.jupiter.api.extension.ExtendWith
Jonas Waeber's avatar
Jonas Waeber committed
31 32 33 34 35 36
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.extensions.EmbeddedKafkaExtension
import org.memobase.extensions.EmbeddedSftpServer
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
Jonas Waeber's avatar
Jonas Waeber committed
37

Jonas Waeber's avatar
Jonas Waeber committed
38
@ExtendWith(EmbeddedKafkaExtension::class)
Jonas Waeber's avatar
Jonas Waeber committed
39
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Jonas Waeber's avatar
Jonas Waeber committed
40 41
class Tests {

Jonas Waeber's avatar
Jonas Waeber committed
42 43 44 45 46 47 48
    private val sftpServer = EmbeddedSftpServer(22000, "user", "password")

    init {
        sftpServer.createDirectories(
            "/memobase/test_institution_1/test_record_set_1/",
            "/memobase/test_institution_2/test_record_set_2/"
        )
Jonas Waeber's avatar
Jonas Waeber committed
49 50
        sftpServer.putFile("/memobase/test_institution_1/test_record_set_1/brandt.csv", "src/test/resources/data/brandt.csv")
        sftpServer.putFile("/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv", "src/test/resources/data/bauGAZH_metadaten.csv")
Jonas Waeber's avatar
Jonas Waeber committed
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
    }

    private val consumer: KafkaConsumer<String, String>

    init {
        val props = Properties()
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")
        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "test-group-1")
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group-1")
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        consumer = KafkaConsumer(props)
    }

    @ParameterizedTest
    @MethodSource("directoryReaderTests")
    fun `read valid csv from sftp`(params: TestParams) {
        val settings = SettingsLoader(
            listOf("directory"),
            fileName = params.configFile,
            useProducerConfig = true,
            readSftpSettings = true
        )

        Producer(settings.kafkaProducerSettings, settings.outputTopic).use { producer ->
            SftpClient(settings.sftpSettings).use { sftp ->
                val validator = FileValidation(sftp)

                val files = sftp.listFiles(settings.appSettings.getProperty("directory")).map { File(it) }

                for (file in files) {
                    val report = validator.validate(file)
                    when (report.status) {
                        "valid" -> producer.sendMessage(file.name, file.path, report.format)
                        else ->
                            producer.sendMessage(file.name, "ERROR", report.format)
                    }
                    producer.sendReport(report)
                }
            }
        }

        consumer.assign(listOf(TopicPartition(settings.outputTopic, 0)))
        val result = consumer.poll(Duration.ofMillis(1000))
        assertThat(result.count())
            .isEqualTo(1)

        assertThat(result.records(settings.outputTopic))
            .first()
            .hasFieldOrPropertyWithValue("key", params.expectedKey)
            .hasFieldOrPropertyWithValue("value", params.expectedValue)
Jonas Waeber's avatar
Jonas Waeber committed
103
    }
Jonas Waeber's avatar
Jonas Waeber committed
104 105 106 107

    private fun directoryReaderTests() = Stream.of(
        TestParams(
            "test1.yml",
Jonas Waeber's avatar
Jonas Waeber committed
108
            expectedKey = "data/brandt.csv",
Jonas Waeber's avatar
Jonas Waeber committed
109 110 111 112
            expectedValue = "{\"path\":\"/memobase/test_institution_1/test_record_set_1/brandt.csv\",\"format\":\"CSV\"}"
        ),
        TestParams(
            "test2.yml",
Jonas Waeber's avatar
Jonas Waeber committed
113
            expectedKey = "data/bauGAZH_metadaten.csv",
Jonas Waeber's avatar
Jonas Waeber committed
114 115 116
            expectedValue = "{\"path\":\"/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv\",\"format\":\"CSV\"}"
        )
        )
Jonas Waeber's avatar
Jonas Waeber committed
117
}