IntegrationTests.kt 9.09 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * Copyright (C) 2019  Memobase
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
package org.memobase

19
import com.beust.klaxon.Klaxon
Jonas Waeber's avatar
Jonas Waeber committed
20
21
22
23
24
25
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
Jonas Waeber's avatar
Jonas Waeber committed
26
27
28
29
30
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.assertj.core.api.Assertions.assertThat
Jonas Waeber's avatar
Jonas Waeber committed
31
import org.junit.jupiter.api.TestInstance
Jonas Waeber's avatar
Jonas Waeber committed
32
33
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
Jonas Waeber's avatar
Jonas Waeber committed
34
import org.memobase.params.IntegrationTestParams
Jonas Waeber's avatar
Jonas Waeber committed
35
import org.memobase.rdf.NS
Jonas Waeber's avatar
Jonas Waeber committed
36
import java.io.File
Jonas Waeber's avatar
Jonas Waeber committed
37
import java.io.FileOutputStream
Jonas Waeber's avatar
Jonas Waeber committed
38
import java.nio.charset.Charset
Jonas Waeber's avatar
Jonas Waeber committed
39
import java.util.stream.Stream
Jonas Waeber's avatar
Jonas Waeber committed
40
41

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Jonas Waeber's avatar
Jonas Waeber committed
42
class IntegrationTests {
Jonas Waeber's avatar
Jonas Waeber committed
43

Jonas Waeber's avatar
Jonas Waeber committed
44
    private val resourcePath = "src/test/resources"
Jonas Waeber's avatar
Jonas Waeber committed
45
46
47
    private fun readFile(fileName: String): String {
        return File("$resourcePath/$fileName").readText(Charset.defaultCharset())
    }
Jonas Waeber's avatar
Jonas Waeber committed
48

Jonas Waeber's avatar
Jonas Waeber committed
49
    private val regex = Regex("(_:B[A-Za-z0-9]+)")
Jonas Waeber's avatar
Jonas Waeber committed
50
51
52
53
54
55
56
57
58
59
60
61
62
63
    private val regexTime = Regex("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}")

    private fun sort(source: List<String>): String {
        return source.map {
            var replacedString = it
            for (matchResult in regex.findAll(it)) {
                replacedString = replacedString.replace(matchResult.groups[0]?.value.orEmpty(), "_:B")
            }
            for (matchResult in regexTime.findAll(it)) {
                replacedString = replacedString.replace(matchResult.groups[0]?.value.orEmpty(), "2020-10-10T09:10:22")
            }
            replacedString
        }.sorted().reduce { acc, s -> acc + "\n" + s }.trim()
    }
Jonas Waeber's avatar
Jonas Waeber committed
64
65
66

    @ParameterizedTest
    @MethodSource("kafkaTests")
Jonas Waeber's avatar
Jonas Waeber committed
67
    fun `test kafka topology`(params: IntegrationTestParams) {
68
        val service = Service("kafkaTest${params.count}.yml")
Jonas Waeber's avatar
Jonas Waeber committed
69
        val output = readFile("kafkaTests/${params.count}/output.nt").lines()
70
        val settings = service.settings
Jonas Waeber's avatar
Jonas Waeber committed
71
72
73
74
75
76
77
78
79
80
81
82
83
84

        val headers = RecordHeaders()
        headers.add(RecordHeader("sessionId", "test-session-id".toByteArray()))
        headers.add(RecordHeader("recordSetId", "test-record-set-id".toByteArray()))
        headers.add(RecordHeader("institutionId", "test-institution-id".toByteArray()))
        headers.add(RecordHeader("isPublished", "false".toByteArray()))
        headers.add(RecordHeader("xmlRecordTag", "record".toByteArray()))
        headers.add(RecordHeader("xmlIdentifierFieldName", "identifierMain".toByteArray()))
        headers.add(RecordHeader("tableSheetIndex", "1".toByteArray()))
        headers.add(RecordHeader("tableHeaderCount", "1".toByteArray()))
        headers.add(RecordHeader("tableHeaderIndex", "1".toByteArray()))
        headers.add(RecordHeader("tableIdentifierIndex", "1".toByteArray()))


Jonas Waeber's avatar
Jonas Waeber committed
85
        val testDriver = TopologyTestDriver(KafkaTopology(settings).prepare().build(), settings.kafkaStreamsSettings)
Jonas Waeber's avatar
Jonas Waeber committed
86
87
88
        val factory = ConsumerRecordFactory(
            StringSerializer(), StringSerializer()
        )
Jonas Waeber's avatar
Jonas Waeber committed
89
90
91
92
93
94
95
96
97
98


        testDriver.pipeInput(
            factory.create(
                "import-process-config",
                "test-record-set-id#mapping",
                readFile("kafkaTests/${params.count}/config/mapping.yml")
            )
        )

Jonas Waeber's avatar
Jonas Waeber committed
99
100
        testDriver.pipeInput(
            factory.create(
Jonas Waeber's avatar
Jonas Waeber committed
101
                settings.inputTopic, params.key, readFile("kafkaTests/${params.count}/input.json"), headers
Jonas Waeber's avatar
Jonas Waeber committed
102
103
104
            )
        )

Jonas Waeber's avatar
Jonas Waeber committed
105
106

        val record = testDriver.readOutput(
Jonas Waeber's avatar
Jonas Waeber committed
107
108
109
110
111
            settings.outputTopic,
            StringDeserializer(),
            StringDeserializer()
        )

Jonas Waeber's avatar
Jonas Waeber committed
112
113
114
115
116
117
118
119
120
121
        val model = ModelFactory.createDefaultModel()
        NS.prefixMapping.map {
            model.setNsPrefix(it.key, it.value)
        }
        RDFDataMgr.read(model, record.value().byteInputStream(), Lang.NTRIPLES)
        RDFDataMgr.write(
            FileOutputStream("$resourcePath/kafkaTests/${params.count}/turtle-output.ttl"),
            model,
            RDFFormat.TURTLE_PRETTY
        )
Jonas Waeber's avatar
Jonas Waeber committed
122

Jonas Waeber's avatar
Jonas Waeber committed
123
        val sortedResult = sort(record.value().lines()).trim()
Jonas Waeber's avatar
Jonas Waeber committed
124

Jonas Waeber's avatar
Jonas Waeber committed
125
126
        assertThat(sortedResult)
            .isEqualTo(sort(output).trim())
127

Jonas Waeber's avatar
Jonas Waeber committed
128
129
130
131
132
133
134
        val reportRecord = testDriver.readOutput(
            service.settings.processReportTopic,
            StringDeserializer(),
            StringDeserializer()
        )
        assertThat(Klaxon().parse<Report>(reportRecord.value()))
            .isEqualTo(params.report)
Jonas Waeber's avatar
Jonas Waeber committed
135
    }
Jonas Waeber's avatar
Jonas Waeber committed
136

Jonas Waeber's avatar
Jonas Waeber committed
137
    private fun kafkaTests() = Stream.of(
Jonas Waeber's avatar
Jonas Waeber committed
138
        IntegrationTestParams(
Jonas Waeber's avatar
Jonas Waeber committed
139
140
            1,
            "MEI_49884",
Jonas Waeber's avatar
Jonas Waeber committed
141
            listOf(
Jonas Waeber's avatar
Jonas Waeber committed
142
143
                "https://memobase.ch/record/BAZ-B_MEI-MEI_49884",
                "https://memobase.ch/instantiation/physical/BAZ-MEI_49884-1"
144
            ),
Jonas Waeber's avatar
Jonas Waeber committed
145
146
147
148
            Report(
                "https://memobase.ch/record/test-institution-id-MEI_49884",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-institution-id-MEI_49884."
Jonas Waeber's avatar
Jonas Waeber committed
149
            )
Jonas Waeber's avatar
Jonas Waeber committed
150

151
        )/*,
Jonas Waeber's avatar
Jonas Waeber committed
152
        IntegrationTestParams(
Jonas Waeber's avatar
Jonas Waeber committed
153
154
155
            2,
            "Sig Han 1293",
            listOf(
Jonas Waeber's avatar
Jonas Waeber committed
156
157
                "https://memobase.ch/record/TEST_RECORD_SET-Sig_Han_1293",
                "https://memobase.ch/digital/TEST_RECORD_SET-Sig_Han_1293-1"
Jonas Waeber's avatar
Jonas Waeber committed
158
            ),
Jonas Waeber's avatar
Jonas Waeber committed
159
160
161
162
            Report(
                "https://memobase.ch/record/TEST_RECORD_SET-Sig_Han_1293",
                "SUCCESS",
                "Successfully created a record from source."
163
            )
164
        ),
Jonas Waeber's avatar
Jonas Waeber committed
165
        IntegrationTestParams(
166
167
168
            3,
            "Sig Han 1293",
            listOf(
Jonas Waeber's avatar
Jonas Waeber committed
169
170
171
                "https://memobase.ch/record/TEST_RECORD_SET-Sig_Han_1293",
                "https://memobase.ch/physical/TEST_RECORD_SET-Sig_Han_1293-1",
                "https://memobase.ch/digital/TEST_RECORD_SET-Sig_Han_1293-1"
172
            ),
Jonas Waeber's avatar
Jonas Waeber committed
173
174
175
176
177
            Report(
                "https://memobase.ch/record/TEST_RECORD_SET-Sig_Han_1293",
                "SUCCESS",
                "Successfully created a record from source."

178
            )
179
        ),
Jonas Waeber's avatar
Jonas Waeber committed
180
        IntegrationTestParams(
181
182
183
            4,
            "Sig Han 1293",
            listOf(
Jonas Waeber's avatar
Jonas Waeber committed
184
185
                "https://memobase.ch/record/TEST_RECORD_SET-sigantur-example",
                "https://memobase.ch/physical/TEST_RECORD_SET-sigantur-example-1"
186
            ),
Jonas Waeber's avatar
Jonas Waeber committed
187
188
189
190
            Report(
                "https://memobase.ch/record/TEST_RECORD_SET-sigantur-example",
                "SUCCESS",
                "Successfully created a record from source."
191
            )
192
        ),
Jonas Waeber's avatar
Jonas Waeber committed
193
        IntegrationTestParams(
194
195
196
            5,
            "ID_1",
            listOf(
Jonas Waeber's avatar
Jonas Waeber committed
197
                "https://memobase.ch/record/TEST_RECORD_SET-ID_1"
198
            ),
Jonas Waeber's avatar
Jonas Waeber committed
199
200
201
202
            Report(
                "https://memobase.ch/record/TEST_RECORD_SET-ID_1",
                "SUCCESS",
                "Successfully created a record from source."
203
            )
Jonas Waeber's avatar
Jonas Waeber committed
204
        ),
Jonas Waeber's avatar
Jonas Waeber committed
205
        IntegrationTestParams(
Jonas Waeber's avatar
Jonas Waeber committed
206
207
208
            6,
            "ID_1",
            listOf(
Jonas Waeber's avatar
Jonas Waeber committed
209
                "https://memobase.ch/record/TEST_RECORD_SET-ID_1"
Jonas Waeber's avatar
Jonas Waeber committed
210
            ),
Jonas Waeber's avatar
Jonas Waeber committed
211
212
213
214
            Report(
                "https://memobase.ch/record/TEST_RECORD_SET-ID_1",
                "SUCCESS",
                "Successfully created a record from source."
Jonas Waeber's avatar
Jonas Waeber committed
215
            )
Jonas Waeber's avatar
Jonas Waeber committed
216
        ),
Jonas Waeber's avatar
Jonas Waeber committed
217
        IntegrationTestParams(
Jonas Waeber's avatar
Jonas Waeber committed
218
219
220
            7,
            "ID_1",
            listOf(
Jonas Waeber's avatar
Jonas Waeber committed
221
                "https://memobase.ch/record/TEST_RECORD_SET-ID_1"
Jonas Waeber's avatar
Jonas Waeber committed
222
            ),
Jonas Waeber's avatar
Jonas Waeber committed
223
224
225
226
            Report(
                "https://memobase.ch/record/TEST_RECORD_SET-ID_1",
                "SUCCESS",
                "Successfully created a record from source."
Jonas Waeber's avatar
Jonas Waeber committed
227
            )
228
        ),
Jonas Waeber's avatar
Jonas Waeber committed
229
        IntegrationTestParams(
230
231
232
            8,
            "ID_1",
            listOf(
Jonas Waeber's avatar
Jonas Waeber committed
233
                "https://memobase.ch/record/TEST_RECORD_SET-ID_1"
234
            ),
Jonas Waeber's avatar
Jonas Waeber committed
235
236
237
238
            Report(
                "https://memobase.ch/record/TEST_RECORD_SET-ID_1",
                "SUCCESS",
                "Successfully created a record from source."
239
            )
Jonas Waeber's avatar
Jonas Waeber committed
240
        ),
Jonas Waeber's avatar
Jonas Waeber committed
241
        IntegrationTestParams(
Jonas Waeber's avatar
Jonas Waeber committed
242
243
244
            9,
            "ID_1",
            listOf(
Jonas Waeber's avatar
Jonas Waeber committed
245
                "https://memobase.ch/record/TEST_RECORD_SET-ID_1"
Jonas Waeber's avatar
Jonas Waeber committed
246
            ),
Jonas Waeber's avatar
Jonas Waeber committed
247
248
249
250
            Report(
                "https://memobase.ch/record/TEST_RECORD_SET-ID_1",
                "SUCCESS",
                "Successfully created a record from source."
Jonas Waeber's avatar
Jonas Waeber committed
251
            )
252
        )*/
Jonas Waeber's avatar
Jonas Waeber committed
253
    )
Jonas Waeber's avatar
Jonas Waeber committed
254
}