Tests.kt 6.64 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
/*
Jonas Waeber's avatar
Jonas Waeber committed
2
 * sftp-reader
Jonas Waeber's avatar
Jonas Waeber committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
 * Copyright (C) 2019  Memobase
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18
package org.memobase
Jonas Waeber's avatar
Jonas Waeber committed
19

Jonas Waeber's avatar
Jonas Waeber committed
20
import com.beust.klaxon.Klaxon
Jonas Waeber's avatar
Jonas Waeber committed
21
import java.io.File
Jonas Waeber's avatar
Jonas Waeber committed
22
import java.io.FileInputStream
Jonas Waeber's avatar
Jonas Waeber committed
23 24 25
import java.time.Duration
import java.util.Properties
import java.util.stream.Stream
Jonas Waeber's avatar
Jonas Waeber committed
26 27
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
Jonas Waeber's avatar
Jonas Waeber committed
28 29 30 31
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
Jonas Waeber's avatar
Jonas Waeber committed
32
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
33
import org.assertj.core.api.Assertions.assertThat
Jonas Waeber's avatar
Jonas Waeber committed
34
import org.junit.jupiter.api.TestInstance
35
import org.junit.jupiter.api.extension.ExtendWith
Jonas Waeber's avatar
Jonas Waeber committed
36 37 38 39 40 41
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.extensions.EmbeddedKafkaExtension
import org.memobase.extensions.EmbeddedSftpServer
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
Jonas Waeber's avatar
Jonas Waeber committed
42

Jonas Waeber's avatar
Jonas Waeber committed
43
@ExtendWith(EmbeddedKafkaExtension::class)
Jonas Waeber's avatar
Jonas Waeber committed
44
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Jonas Waeber's avatar
Jonas Waeber committed
45
class Tests {
Jonas Waeber's avatar
Jonas Waeber committed
46
    private val log = LogManager.getLogger("LocalTestsLogger")
Jonas Waeber's avatar
Jonas Waeber committed
47

Jonas Waeber's avatar
Jonas Waeber committed
48 49
    private val sftpServer = EmbeddedSftpServer(22000, "user", "password")

Jonas Waeber's avatar
Jonas Waeber committed
50 51 52
    private val adminClient =
            AdminClient.create(mapOf(Pair(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")))

Jonas Waeber's avatar
Jonas Waeber committed
53 54
    init {
        sftpServer.createDirectories(
Jonas Waeber's avatar
Jonas Waeber committed
55 56
                "/memobase/test_institution_1/test_record_set_1/",
                "/memobase/test_institution_2/test_record_set_2/"
Jonas Waeber's avatar
Jonas Waeber committed
57
        )
Jonas Waeber's avatar
Jonas Waeber committed
58 59
        sftpServer.putFile("/memobase/test_institution_1/test_record_set_1/brandt.csv", FileInputStream("src/test/resources/data/brandt.csv"))
        sftpServer.putFile("/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv", FileInputStream("src/test/resources/data/bauGAZH_metadaten.csv"))
Jonas Waeber's avatar
Jonas Waeber committed
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
    }

    private val consumer: KafkaConsumer<String, String>

    init {
        val props = Properties()
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")
        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "test-group-1")
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group-1")
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        consumer = KafkaConsumer(props)
    }

    @ParameterizedTest
    @MethodSource("directoryReaderTests")
    fun `read valid csv from sftp`(params: TestParams) {
        val settings = SettingsLoader(
Jonas Waeber's avatar
Jonas Waeber committed
79 80 81 82
                listOf("directory"),
                fileName = params.configFile,
                useProducerConfig = true,
                readSftpSettings = true
Jonas Waeber's avatar
Jonas Waeber committed
83 84 85 86 87 88 89 90 91
        )

        Producer(settings.kafkaProducerSettings, settings.outputTopic).use { producer ->
            SftpClient(settings.sftpSettings).use { sftp ->
                val validator = FileValidation(sftp)

                val files = sftp.listFiles(settings.appSettings.getProperty("directory")).map { File(it) }

                for (file in files) {
Jonas Waeber's avatar
Jonas Waeber committed
92 93 94
                    val validationResult = validator.validate(file)
                    producer.sendMessage(validationResult.second.id, validationResult.first)
                    producer.sendReport(validationResult.second)
Jonas Waeber's avatar
Jonas Waeber committed
95 96 97 98 99
                }
            }
        }

        consumer.assign(listOf(TopicPartition(settings.outputTopic, 0)))
Jonas Waeber's avatar
Jonas Waeber committed
100 101 102 103 104 105 106 107 108 109
        var result = consumer.poll(Duration.ofMillis(10))
        while (result.count() == 0) {
            result = consumer.poll(Duration.ofMillis(10))
        }

        consumer.assign(listOf(TopicPartition(settings.outputTopic + "-reporting", 0)))
        var resultReport = consumer.poll(Duration.ofMillis(10))
        while (resultReport.count() == 0) {
            resultReport = consumer.poll(Duration.ofMillis(10))
        }
Jonas Waeber's avatar
Jonas Waeber committed
110 111

        assertThat(result.records(settings.outputTopic))
Jonas Waeber's avatar
Jonas Waeber committed
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
                .describedAs("Record Results")
                .hasSize(1)
                .first()
                .hasFieldOrPropertyWithValue("key", params.expectedKey)
                .hasFieldOrPropertyWithValue("value", params.expectedValue)

        assertThat(resultReport.records(settings.outputTopic + "-reporting"))
                .describedAs("Report Results")
                .hasSize(1)
                .first()
                .hasFieldOrPropertyWithValue("key", params.expectedKey)
                .hasFieldOrPropertyWithValue("value", params.expectedReportValue)

        // cleanup inside of class because there is no way to access topics outside of this function.
        adminClient.deleteTopics(listOf(settings.outputTopic, settings.outputTopic + "-reporting"))
Jonas Waeber's avatar
Jonas Waeber committed
127
    }
Jonas Waeber's avatar
Jonas Waeber committed
128 129

    private fun directoryReaderTests() = Stream.of(
Jonas Waeber's avatar
Jonas Waeber committed
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
            TestParams(
                    "test1.yml",
                    expectedKey = "brandt.csv",
                    expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_institution_1/test_record_set_1/brandt.csv\"}",
                    expectedReportValue = Klaxon().toJsonString(Report(
                            id = "brandt.csv",
                            status = "SUCCESS",
                            message = "Validated file at path /memobase/test_institution_1/test_record_set_1/brandt.csv."))
            ),
            TestParams(
                    "test2.yml",
                    expectedKey = "bauGAZH_metadaten.csv",
                    expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv\"}",
                    expectedReportValue = Klaxon().toJsonString(Report(
                            id = "bauGAZH_metadaten.csv",
                            status = "SUCCESS",
                            message = "Validated file at path /memobase/test_institution_2/test_record_set_2/bauGAZH_metadaten.csv."))
            )
    )
Jonas Waeber's avatar
Jonas Waeber committed
149
}