Tests.kt 8.72 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
/*
Jonas Waeber's avatar
Jonas Waeber committed
2
 * sftp-reader
Jonas Waeber's avatar
Jonas Waeber committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
 * Copyright (C) 2019  Memobase
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18
package org.memobase
Jonas Waeber's avatar
Jonas Waeber committed
19

Jonas Waeber's avatar
Jonas Waeber committed
20
import com.beust.klaxon.Klaxon
Jonas Waeber's avatar
Jonas Waeber committed
21
import java.io.FileInputStream
Jonas Waeber's avatar
Jonas Waeber committed
22
import java.nio.file.Paths
Jonas Waeber's avatar
Jonas Waeber committed
23 24 25 26
import java.time.Duration
import java.util.Properties
import java.util.stream.Stream
import org.apache.kafka.clients.consumer.ConsumerConfig
Jonas Waeber's avatar
Jonas Waeber committed
27
import org.apache.kafka.clients.consumer.ConsumerRecord
Jonas Waeber's avatar
Jonas Waeber committed
28
import org.apache.kafka.clients.consumer.KafkaConsumer
Jonas Waeber's avatar
Jonas Waeber committed
29
import org.apache.kafka.common.header.internals.RecordHeader
Jonas Waeber's avatar
Jonas Waeber committed
30
import org.apache.kafka.common.serialization.StringDeserializer
Jonas Waeber's avatar
Jonas Waeber committed
31
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
32
import org.assertj.core.api.Assertions.assertThat
Jonas Waeber's avatar
Jonas Waeber committed
33
import org.junit.jupiter.api.TestInstance
Jonas Waeber's avatar
Jonas Waeber committed
34
import org.junit.jupiter.api.assertAll
35
import org.junit.jupiter.api.extension.ExtendWith
Jonas Waeber's avatar
Jonas Waeber committed
36 37
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
Jonas Waeber's avatar
Jonas Waeber committed
38 39
import org.memobase.testing.EmbeddedKafkaExtension
import org.memobase.testing.EmbeddedSftpServer
Jonas Waeber's avatar
Jonas Waeber committed
40

Jonas Waeber's avatar
Jonas Waeber committed
41
@ExtendWith(EmbeddedKafkaExtension::class)
Jonas Waeber's avatar
Jonas Waeber committed
42
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Jonas Waeber's avatar
Jonas Waeber committed
43
class Tests {
Jonas Waeber's avatar
Jonas Waeber committed
44
    private val log = LogManager.getLogger("LocalTestsLogger")
Jonas Waeber's avatar
Jonas Waeber committed
45

Jonas Waeber's avatar
Jonas Waeber committed
46 47 48
    private val sftpServer = EmbeddedSftpServer(22000, "user", "password")

    init {
Jonas Waeber's avatar
Jonas Waeber committed
49
        val files = listOf(
Jonas Waeber's avatar
Jonas Waeber committed
50 51 52 53 54 55 56 57
            Pair("/testset1", "brandt.csv"),
            Pair("/testset2", "bauGAZH_metadaten.csv"),
            Pair("/testset3", "invalid.csv"),
            Pair("/testset4", "file.txt"),
            Pair("/testset5", "20190906_Brandt_Metadaten.xlsx"),
            Pair("/testset6", "Export_Bilder_der_Arbeit_8.csv"),
            Pair("/testset7", "valid_xml.xml"),
            Pair("/testset8", "invalid.xml")
Jonas Waeber's avatar
Jonas Waeber committed
58
        )
Jonas Waeber's avatar
Jonas Waeber committed
59 60

        for (pair in files) {
Jonas Waeber's avatar
Jonas Waeber committed
61 62 63 64
            sftpServer.putFile(
                Paths.get(pair.first, pair.second).toString(),
                FileInputStream(Paths.get("src/test/resources/data", pair.second).toFile())
            )
Jonas Waeber's avatar
Jonas Waeber committed
65
        }
Jonas Waeber's avatar
Jonas Waeber committed
66 67 68
    }

    private val consumer: KafkaConsumer<String, String>
Jonas Waeber's avatar
Jonas Waeber committed
69
    private val klaxon = Klaxon()
Jonas Waeber's avatar
Jonas Waeber committed
70 71 72 73 74 75 76 77 78 79

    init {
        val props = Properties()
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")
        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "test-group-1")
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group-1")
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        consumer = KafkaConsumer(props)
Jonas Waeber's avatar
Jonas Waeber committed
80
        consumer.subscribe(listOf("import-process-data-transform", "import-process-reporting"))
Jonas Waeber's avatar
Jonas Waeber committed
81 82 83 84
    }

    @ParameterizedTest
    @MethodSource("directoryReaderTests")
Jonas Waeber's avatar
Jonas Waeber committed
85
    fun `read files from sftp`(params: TestParams) {
86 87
        val service = Service(params.configFile)
        service.run()
Jonas Waeber's avatar
Jonas Waeber committed
88

89
        val topic = service.settings.outputTopic
Jonas Waeber's avatar
Jonas Waeber committed
90
        val reportingTopic = service.settings.processReportTopic
Jonas Waeber's avatar
Jonas Waeber committed
91 92
        val totalConsumerRecords = mutableListOf<ConsumerRecord<String, String>>()

Jonas Waeber's avatar
Jonas Waeber committed
93
        var result = consumer.poll(Duration.ofMillis(10))
Jonas Waeber's avatar
Jonas Waeber committed
94
        while (totalConsumerRecords.size != 2) {
Jonas Waeber's avatar
Jonas Waeber committed
95 96 97 98
            if (result.count() > 0) {
                totalConsumerRecords.addAll(result.asIterable())
            }
            log.error(result.count())
Jonas Waeber's avatar
Jonas Waeber committed
99 100
            result = consumer.poll(Duration.ofMillis(10))
        }
Jonas Waeber's avatar
Jonas Waeber committed
101 102 103
        val record = totalConsumerRecords.find { value -> value.topic() == topic }!!
        val recordKey = record.key()
        val recordMessage = record.value()
Jonas Waeber's avatar
Jonas Waeber committed
104
        val headers = record.headers()
Jonas Waeber's avatar
Jonas Waeber committed
105 106 107 108

        val report = totalConsumerRecords.find { value -> value.topic() == reportingTopic }!!
        val reportKey = report.key()
        val reportMessage = klaxon.parse<Report>(report.value())
Jonas Waeber's avatar
Jonas Waeber committed
109

Jonas Waeber's avatar
Jonas Waeber committed
110 111
        assertAll("",
            {
Jonas Waeber's avatar
Jonas Waeber committed
112 113 114 115 116 117
                assertThat(recordKey)
                    .isEqualTo(params.expectedKey)
            },
            {
                assertThat(recordMessage)
                    .isEqualTo(params.expectedValue)
Jonas Waeber's avatar
Jonas Waeber committed
118 119
            },
            {
Jonas Waeber's avatar
Jonas Waeber committed
120 121
                assertThat(reportKey)
                    .isEqualTo(params.expectedKey)
Jonas Waeber's avatar
Jonas Waeber committed
122 123
            },
            {
Jonas Waeber's avatar
Jonas Waeber committed
124 125
                assertThat(reportMessage)
                    .isEqualTo(params.expectedReportValue)
Jonas Waeber's avatar
Jonas Waeber committed
126 127 128 129
            },
            {
                assertThat(headers)
                    .contains(RecordHeader("sessionId", "session1".toByteArray()))
Jonas Waeber's avatar
Jonas Waeber committed
130 131
            }
        )
Jonas Waeber's avatar
Jonas Waeber committed
132
    }
Jonas Waeber's avatar
Jonas Waeber committed
133 134

    private fun directoryReaderTests() = Stream.of(
135 136 137
        TestParams(
            "test1.yml",
            expectedKey = "brandt.csv",
Jonas Waeber's avatar
Jonas Waeber committed
138
            expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/testset1/brandt.csv\"}",
Jonas Waeber's avatar
Jonas Waeber committed
139 140 141
            expectedReportValue = Report(
                id = "brandt.csv",
                status = "SUCCESS",
Jonas Waeber's avatar
Jonas Waeber committed
142
                message = "Validated file at path /testset1/brandt.csv with format CSV."
143 144 145 146 147
            )
        ),
        TestParams(
            "test2.yml",
            expectedKey = "bauGAZH_metadaten.csv",
Jonas Waeber's avatar
Jonas Waeber committed
148
            expectedValue = "{\"format\" : \"CSV\", \"path\" : \"/testset2/bauGAZH_metadaten.csv\"}",
Jonas Waeber's avatar
Jonas Waeber committed
149 150 151
            expectedReportValue = Report(
                id = "bauGAZH_metadaten.csv",
                status = "SUCCESS",
Jonas Waeber's avatar
Jonas Waeber committed
152
                message = "Validated file at path /testset2/bauGAZH_metadaten.csv with format CSV."
153 154 155 156 157
            )
        ),
        TestParams(
            "test3.yml",
            expectedKey = "invalid.csv",
Jonas Waeber's avatar
Jonas Waeber committed
158
            expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/testset3/invalid.csv\"}",
Jonas Waeber's avatar
Jonas Waeber committed
159 160 161
            expectedReportValue = Report(
                id = "invalid.csv",
                status = "FAILURE",
Jonas Waeber's avatar
Jonas Waeber committed
162
                message = "CSV ERROR: Fields num seems to be 4 on each row, but on 2th csv row, fields num is 6. for file /testset3/invalid.csv."
163 164 165 166 167
            )
        ),
        TestParams(
            "test4.yml",
            expectedKey = "file.txt",
Jonas Waeber's avatar
Jonas Waeber committed
168
            expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/testset4/file.txt\"}",
Jonas Waeber's avatar
Jonas Waeber committed
169 170 171 172
            expectedReportValue = Report(
                id = "file.txt",
                status = "FAILURE",
                message = "File Extension Error: Not a valid file extension: file.txt."
Jonas Waeber's avatar
Jonas Waeber committed
173
            )
174 175 176
        ),
        TestParams(
            "test5.yml",
Jonas Waeber's avatar
Jonas Waeber committed
177
            expectedKey = "20190906_Brandt_Metadaten.xlsx",
Jonas Waeber's avatar
Jonas Waeber committed
178
            expectedValue = "{\"format\" : \"XLSX\", \"path\" : \"/testset5/20190906_Brandt_Metadaten.xlsx\"}",
Jonas Waeber's avatar
Jonas Waeber committed
179 180 181
            expectedReportValue = Report(
                id = "20190906_Brandt_Metadaten.xlsx",
                status = "SUCCESS",
Jonas Waeber's avatar
Jonas Waeber committed
182
                message = "Validated file at path /testset5/20190906_Brandt_Metadaten.xlsx with format XLSX."
183
            )
Jonas Waeber's avatar
Jonas Waeber committed
184 185 186 187
        ),
        TestParams(
            "test6.yml",
            expectedKey = "Export_Bilder_der_Arbeit_8.csv",
Jonas Waeber's avatar
Jonas Waeber committed
188
            expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/testset6/Export_Bilder_der_Arbeit_8.csv\"}",
Jonas Waeber's avatar
Jonas Waeber committed
189 190 191
            expectedReportValue = Report(
                id = "Export_Bilder_der_Arbeit_8.csv",
                status = "FAILURE",
Jonas Waeber's avatar
Jonas Waeber committed
192
                message = "CSV ERROR: Fields num seems to be 1 on each row, but on 2th csv row, fields num is 5. for file /testset6/Export_Bilder_der_Arbeit_8.csv."
Jonas Waeber's avatar
Jonas Waeber committed
193
            )
194 195 196 197
        ),
        TestParams(
            "test7.yml",
            expectedKey = "valid_xml.xml",
Jonas Waeber's avatar
Jonas Waeber committed
198
            expectedValue = "{\"format\" : \"XML\", \"path\" : \"/testset7/valid_xml.xml\"}",
Jonas Waeber's avatar
Jonas Waeber committed
199 200 201
            expectedReportValue = Report(
                id = "valid_xml.xml",
                status = "SUCCESS",
Jonas Waeber's avatar
Jonas Waeber committed
202
                message = "Validated file at path /testset7/valid_xml.xml with format XML."
203 204 205 206 207
            )
        ),
        TestParams(
            "test8.yml",
            expectedKey = "invalid.xml",
Jonas Waeber's avatar
Jonas Waeber committed
208
            expectedValue = "{\"format\" : \"ERROR\", \"path\" : \"/testset8/invalid.xml\"}",
Jonas Waeber's avatar
Jonas Waeber committed
209 210 211
            expectedReportValue = Report(
                id = "invalid.xml",
                status = "FAILURE",
Jonas Waeber's avatar
Jonas Waeber committed
212
                message = "XML ERROR: Element type \"foxml:objectProperties\" must be followed by either attribute specifications, \">\" or \"/>\". for file /testset8/invalid.xml."
213
            )
214
        )
Jonas Waeber's avatar
Jonas Waeber committed
215
    )
Jonas Waeber's avatar
Jonas Waeber committed
216
}