/*
 * Copyright (C) 2019  Memobase
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
package org.memobase

19
import com.beust.klaxon.Klaxon
Jonas Waeber's avatar
Jonas Waeber committed
20
21
22
23
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
Jonas Waeber's avatar
Jonas Waeber committed
24
import org.apache.logging.log4j.LogManager
Jonas Waeber's avatar
Jonas Waeber committed
25
import org.assertj.core.api.Assertions.assertThat
Jonas Waeber's avatar
Jonas Waeber committed
26
import org.junit.jupiter.api.TestInstance
Jonas Waeber's avatar
Jonas Waeber committed
27
28
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
Jonas Waeber's avatar
Jonas Waeber committed
29
import org.memobase.params.IntegrationTestParams
Jonas Waeber's avatar
Jonas Waeber committed
30
import org.memobase.settings.SettingsLoader
Jonas Waeber's avatar
Jonas Waeber committed
31
32
import java.io.File
import java.nio.charset.Charset
Jonas Waeber's avatar
Jonas Waeber committed
33
import java.util.stream.Stream
Jonas Waeber's avatar
Jonas Waeber committed
34
35

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Jonas Waeber's avatar
Jonas Waeber committed
36
class IntegrationTests {
Jonas Waeber's avatar
Jonas Waeber committed
37
38
    // Log4j logger registered under the name "TestLogger".
    private val log = LogManager.getLogger("TestLogger")

Jonas Waeber's avatar
Jonas Waeber committed
39
    // Directory that fixture file names are resolved against; the path is relative, so it is
    // interpreted from the working directory of the test run.
    private val resourcePath = "src/test/resources"
Jonas Waeber's avatar
Jonas Waeber committed
40
41
42
    /**
     * Reads a fixture file located below [resourcePath] and returns its complete text.
     *
     * The content is always decoded as UTF-8 instead of the JVM default charset, so the text
     * handed to the assertions does not change with the platform or locale running the tests.
     * NOTE(review): assumes the fixtures are stored as UTF-8 — confirm.
     *
     * @param fileName path of the fixture, relative to [resourcePath].
     * @return the full content of the file as a string.
     */
    private fun readFile(fileName: String): String =
        File("$resourcePath/$fileName").readText(Charsets.UTF_8)
Jonas Waeber's avatar
Jonas Waeber committed
43

44
45
    /**
     * Builds the name of the reporting topic that accompanies a topic.
     *
     * @param value name of the base topic.
     * @return [value] followed by the suffix `-reporting`.
     */
    private fun reportingTopic(value: String): String {
        return value + "-reporting"
    }

Jonas Waeber's avatar
Jonas Waeber committed
46
    // Matches a blank-node label: `_:B` followed by one or more ASCII letters or digits.
    private val regex = "(_:B[A-Za-z0-9]+)".toRegex()
Jonas Waeber's avatar
Jonas Waeber committed
47
48
49

    /**
     * Pipes one fixture through the Kafka Streams topology and checks every record it produces.
     *
     * For each record read from the output topic the test verifies
     *  - the record key against `params.expectedKeys`,
     *  - the record value (blank-node labels normalised to `_:B`, lines sorted) against the
     *    fixture file `output<N>.nt`, and
     *  - the matching message on the reporting topic against `params.reports`.
     *
     * The topology test driver is closed when the test ends, whether it passes or fails.
     *
     * NOTE(review): nothing asserts that the number of produced records equals
     * `params.expectedKeys.size`; a topology that emits no output would pass without running a
     * single assertion. Consider adding such a check once the fixtures are confirmed.
     *
     * @param params fixture description supplied by the `kafkaTests` method source.
     */
    @ParameterizedTest
    @MethodSource("kafkaTests")
    fun `test kafka topology`(params: IntegrationTestParams) {
        val settings = SettingsLoader(
            listOf(
                "institutionId",
                "recordSetId",
                "configs"
            ),
            fileName = "kafkaTest${params.count}.yml",
            useStreamsConfig = true
        )
        val factory = ConsumerRecordFactory(
            StringSerializer(), StringSerializer()
        )

        // `use` closes the driver even when an assertion fails, releasing what it holds.
        TopologyTestDriver(
            KafkaTopology(settings).prepare().build(),
            settings.kafkaStreamsSettings
        ).use { testDriver ->
            // Reads the next record from the output topic; yields null once the topic is drained.
            fun nextRecord() = testDriver.readOutput(
                settings.outputTopic,
                StringDeserializer(),
                StringDeserializer()
            )

            testDriver.pipeInput(
                factory.create(
                    settings.inputTopic, params.key, readFile("kafkaTests/${params.count}/input.json")
                )
            )

            // One-based counter: it selects the output<N>.nt fixture and, minus one, the
            // matching entries of the expected key and report lists.
            var recordCount = 1
            var record = nextRecord()
            while (record != null) {
                // Replace every blank-node label in one regex pass. Replacing match by match
                // with a literal string replace would mangle a label that starts with another
                // label found on the same line.
                val sortedResult = record.value().lines()
                    .map { line -> regex.replace(line, "_:B") }
                    .sorted()
                    .joinToString("\n")

                assertThat(record)
                    .isNotNull
                    .hasFieldOrPropertyWithValue("key", params.expectedKeys[recordCount - 1])

                assertThat(sortedResult)
                    .isEqualTo(readFile("kafkaTests/${params.count}/output$recordCount.nt"))

                val recordReport =
                    testDriver.readOutput(
                        reportingTopic(settings.outputTopic),
                        StringDeserializer(),
                        StringDeserializer()
                    )
                // Fail with a readable message instead of a NullPointerException when the
                // topology did not emit a report for this record.
                assertThat(recordReport)
                    .describedAs("report for output record #$recordCount")
                    .isNotNull
                assertThat(Klaxon().parse<Report>(recordReport.value()))
                    .isEqualTo(params.reports[recordCount - 1])

                record = nextRecord()
                recordCount += 1
            }
        }
    }
Jonas Waeber's avatar
Jonas Waeber committed
110

Jonas Waeber's avatar
Jonas Waeber committed
111
    /**
     * Method source for the parameterized topology test.
     *
     * Every [IntegrationTestParams] is constructed from four positional values: the fixture
     * number, the key of the input message, the keys expected on the output records and the
     * reports expected on the reporting topic.
     *
     * @return a stream with the nine fixture descriptions, in ascending fixture number.
     */
    private fun kafkaTests(): Stream<IntegrationTestParams> {
        val recordCreated = "Successfully created a record from source."
        val physicalCreated = "Successfully created a physical instantiation from source."
        val digitalCreated = "Successfully created a digital instantiation from source."

        // Every expected report carries the status "SUCCESS"; only the first and last argument vary.
        fun success(id: String, message: String) = Report(id, "SUCCESS", message)

        val meiRecord = "https://memobase.ch/record/BAZ-B_MEI-MEI_49884"
        val meiCase = IntegrationTestParams(
            1,
            "MEI_49884",
            listOf(
                meiRecord,
                "https://memobase.ch/instantiation/physical/BAZ-MEI_49884-1"
            ),
            listOf(
                success(meiRecord, recordCreated),
                success("https://memobase.ch/record/BAZ-B_MEI-MEI_49884-1", physicalCreated)
            )
        )

        val sigHanKey = "Sig Han 1293"
        val sigHanRecord = "https://memobase.ch/record/TEST_RECORD_SET-Sig_Han_1293"
        val sigHanPhysical = "https://memobase.ch/physical/TEST_RECORD_SET-Sig_Han_1293-1"
        val sigHanDigital = "https://memobase.ch/digital/TEST_RECORD_SET-Sig_Han_1293-1"

        val digitalOnlyCase = IntegrationTestParams(
            2,
            sigHanKey,
            listOf(sigHanRecord, sigHanDigital),
            listOf(
                success(sigHanRecord, recordCreated),
                success(sigHanDigital, digitalCreated)
            )
        )

        val physicalAndDigitalCase = IntegrationTestParams(
            3,
            sigHanKey,
            listOf(sigHanRecord, sigHanPhysical, sigHanDigital),
            listOf(
                success(sigHanRecord, recordCreated),
                success(sigHanPhysical, physicalCreated),
                success(sigHanDigital, digitalCreated)
            )
        )

        val exampleRecord = "https://memobase.ch/record/TEST_RECORD_SET-sigantur-example"
        val examplePhysical = "https://memobase.ch/physical/TEST_RECORD_SET-sigantur-example-1"
        val exampleCase = IntegrationTestParams(
            4,
            sigHanKey,
            listOf(exampleRecord, examplePhysical),
            listOf(
                success(exampleRecord, recordCreated),
                success(examplePhysical, physicalCreated)
            )
        )

        // Fixtures 5 to 9 share the same key, expected output key and report; only the
        // fixture number differs.
        fun singleRecordCase(count: Int): IntegrationTestParams {
            val recordUri = "https://memobase.ch/record/TEST_RECORD_SET-ID_1"
            return IntegrationTestParams(
                count,
                "ID_1",
                listOf(recordUri),
                listOf(success(recordUri, recordCreated))
            )
        }

        return Stream.of(
            meiCase,
            digitalOnlyCase,
            physicalAndDigitalCase,
            exampleCase,
            singleRecordCase(5),
            singleRecordCase(6),
            singleRecordCase(7),
            singleRecordCase(8),
            singleRecordCase(9)
        )
    }
Jonas Waeber's avatar
Jonas Waeber committed
269
}