Tests.kt 8.47 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
/*
Jonas Waeber's avatar
Jonas Waeber committed
2
 * sftp-reader
Jonas Waeber's avatar
Jonas Waeber committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
 * Copyright (C) 2019  Memobase
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18
package org.memobase
Jonas Waeber's avatar
Jonas Waeber committed
19

Jonas Waeber's avatar
Jonas Waeber committed
20
import com.beust.klaxon.Klaxon
Jonas Waeber's avatar
Jonas Waeber committed
21
import java.io.FileInputStream
Jonas Waeber's avatar
Jonas Waeber committed
22
import java.nio.file.Paths
Jonas Waeber's avatar
Jonas Waeber committed
23 24 25 26
import java.time.Duration
import java.util.Properties
import java.util.stream.Stream
import org.apache.kafka.clients.consumer.ConsumerConfig
Jonas Waeber's avatar
Jonas Waeber committed
27
import org.apache.kafka.clients.consumer.ConsumerRecord
Jonas Waeber's avatar
Jonas Waeber committed
28 29
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
Jonas Waeber's avatar
Jonas Waeber committed
30
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
31
import org.assertj.core.api.Assertions.assertThat
Jonas Waeber's avatar
Jonas Waeber committed
32
import org.junit.jupiter.api.TestInstance
Jonas Waeber's avatar
Jonas Waeber committed
33
import org.junit.jupiter.api.assertAll
34
import org.junit.jupiter.api.extension.ExtendWith
Jonas Waeber's avatar
Jonas Waeber committed
35 36
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
Jonas Waeber's avatar
Jonas Waeber committed
37 38
import org.memobase.testing.EmbeddedKafkaExtension
import org.memobase.testing.EmbeddedSftpServer
Jonas Waeber's avatar
Jonas Waeber committed
39

Jonas Waeber's avatar
Jonas Waeber committed
40
@ExtendWith(EmbeddedKafkaExtension::class)
Jonas Waeber's avatar
Jonas Waeber committed
41
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Jonas Waeber's avatar
Jonas Waeber committed
42
class Tests {
Jonas Waeber's avatar
Jonas Waeber committed
43
    private val log = LogManager.getLogger("LocalTestsLogger")

    // Embedded SFTP server that is filled with the fixture files below.
    // NOTE(review): the arguments are presumably port, user name and password - confirm against EmbeddedSftpServer.
    private val sftpServer = EmbeddedSftpServer(22000, "user", "password")

    init {
        // Target directory on the embedded SFTP server -> fixture file name below src/test/resources/data.
        val files = listOf(
            "/testset1" to "brandt.csv",
            "/testset2" to "bauGAZH_metadaten.csv",
            "/testset3" to "invalid.csv",
            "/testset4" to "file.txt",
            "/testset5" to "20190906_Brandt_Metadaten.xlsx",
            "/testset6" to "Export_Bilder_der_Arbeit_8.csv",
            "/testset7" to "valid_xml.xml",
            "/testset8" to "invalid.xml"
        )

        for ((remoteDirectory, fileName) in files) {
            // `use` closes the local fixture stream once the upload call returns, so no file handle
            // is leaked per fixture.
            // NOTE(review): assumes putFile reads the stream before returning - confirm.
            FileInputStream(Paths.get("src/test/resources/data", fileName).toFile()).use { content ->
                sftpServer.putFile(Paths.get(remoteDirectory, fileName).toString(), content)
            }
        }
    }

    // JSON parser used by the test to deserialize the report messages.
    private val klaxon = Klaxon()

    // Kafka consumer shared by all parameterized test runs. It is subscribed to both topics the
    // test inspects and starts from the earliest offset.
    private val consumer: KafkaConsumer<String, String> = KafkaConsumer<String, String>(
        Properties().apply {
            setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")
            setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "test-group-1")
            setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group-1")
            setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
            setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
            setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        }
    ).apply {
        subscribe(listOf("import-process-data-transform", "import-process-reporting"))
    }

    /**
     * Runs the service with the configuration file from [params] and then polls Kafka until one
     * record from the output topic and one from the report topic have been collected. Key and
     * value of both records are compared with the expectations in [params].
     *
     * Polling is bounded by a deadline, so a missing message fails the test instead of hanging it.
     *
     * @param params configuration file plus the expected key, value and report for one run.
     */
    @ParameterizedTest
    @MethodSource("directoryReaderTests")
    fun `read files from sftp`(params: TestParams) {
        val service = Service(params.configFile)
        service.run()

        val topic = service.settings.outputTopic
        val reportingTopic = service.settings.processReportTopic
        val totalConsumerRecords = mutableListOf<ConsumerRecord<String, String>>()

        // One data record and one report record are expected per run.
        val expectedRecordCount = 2
        val timeoutNanos = Duration.ofSeconds(30).toNanos()
        val startedAt = System.nanoTime()
        // `<` instead of `!=`: a poll that pushes the count past the expected number must still end the loop.
        while (totalConsumerRecords.size < expectedRecordCount) {
            check(System.nanoTime() - startedAt < timeoutNanos) {
                "Timed out waiting for $expectedRecordCount records on topics $topic and $reportingTopic, " +
                    "received ${totalConsumerRecords.size}."
            }
            val result = consumer.poll(Duration.ofMillis(100))
            log.debug("Polled ${result.count()} record(s).")
            totalConsumerRecords.addAll(result)
        }

        val record = checkNotNull(totalConsumerRecords.find { it.topic() == topic }) {
            "No record received on output topic $topic."
        }
        val recordKey = record.key()
        val recordMessage = record.value()

        val report = checkNotNull(totalConsumerRecords.find { it.topic() == reportingTopic }) {
            "No record received on report topic $reportingTopic."
        }
        val reportKey = report.key()
        val reportMessage = klaxon.parse<Report>(report.value())

        assertAll(
            "records produced for ${params.configFile}",
            { assertThat(recordKey).isEqualTo(params.expectedKey) },
            { assertThat(recordMessage).isEqualTo(params.expectedValue) },
            { assertThat(reportKey).isEqualTo(params.expectedKey) },
            { assertThat(reportMessage).isEqualTo(params.expectedReportValue) }
        )
    }
Jonas Waeber's avatar
Jonas Waeber committed
127 128

    /**
     * Supplies the parameter sets for the `read files from sftp` test: one entry per fixture file,
     * each naming the configuration file, the expected record key and value, and the expected report.
     */
    private fun directoryReaderTests(): Stream<TestParams> {
        // Builds one parameter set. Record key and report id are the file name, i.e. the last
        // segment of the path.
        fun testCase(
            configFile: String,
            path: String,
            format: String,
            status: String,
            message: String
        ): TestParams {
            val fileName = path.substringAfterLast('/')
            return TestParams(
                configFile,
                expectedKey = fileName,
                expectedValue = "{\"format\" : \"$format\", \"path\" : \"$path\"}",
                expectedReportValue = Report(id = fileName, status = status, message = message)
            )
        }

        // A file that is expected to validate with the given format.
        fun accepted(configFile: String, path: String, format: String): TestParams =
            testCase(configFile, path, format, "SUCCESS", "Validated file at path $path with format $format.")

        // A file that is expected to be rejected with the given report message.
        fun rejected(configFile: String, path: String, message: String): TestParams =
            testCase(configFile, path, "ERROR", "FAILURE", message)

        return Stream.of(
            accepted("test1.yml", "/testset1/brandt.csv", "CSV"),
            accepted("test2.yml", "/testset2/bauGAZH_metadaten.csv", "CSV"),
            rejected(
                "test3.yml",
                "/testset3/invalid.csv",
                "CSV ERROR: Fields num seems to be 4 on each row, but on 2th csv row, fields num is 6. for file /testset3/invalid.csv."
            ),
            rejected(
                "test4.yml",
                "/testset4/file.txt",
                "File Extension Error: Not a valid file extension: file.txt."
            ),
            accepted("test5.yml", "/testset5/20190906_Brandt_Metadaten.xlsx", "XLSX"),
            rejected(
                "test6.yml",
                "/testset6/Export_Bilder_der_Arbeit_8.csv",
                "CSV ERROR: Fields num seems to be 1 on each row, but on 2th csv row, fields num is 5. for file /testset6/Export_Bilder_der_Arbeit_8.csv."
            ),
            accepted("test7.yml", "/testset7/valid_xml.xml", "XML"),
            rejected(
                "test8.yml",
                "/testset8/invalid.xml",
                "XML ERROR: Element type \"foxml:objectProperties\" must be followed by either attribute specifications, \">\" or \"/>\". for file /testset8/invalid.xml."
            )
        )
    }
Jonas Waeber's avatar
Jonas Waeber committed
210
}