IntegrationTests.kt 9.22 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * Copyright (C) 2019  Memobase
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
package org.memobase

19
import com.beust.klaxon.Klaxon
Jonas Waeber's avatar
Jonas Waeber committed
20
21
22
23
24
25
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
Jonas Waeber's avatar
Jonas Waeber committed
26
27
28
29
30
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.assertj.core.api.Assertions.assertThat
Jonas Waeber's avatar
Jonas Waeber committed
31
import org.junit.jupiter.api.TestInstance
32
import org.junit.jupiter.api.assertAll
Jonas Waeber's avatar
Jonas Waeber committed
33
34
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
Jonas Waeber's avatar
Jonas Waeber committed
35
import org.memobase.params.IntegrationTestParams
Jonas Waeber's avatar
Jonas Waeber committed
36
import org.memobase.rdf.NS
Jonas Waeber's avatar
Jonas Waeber committed
37
import java.io.File
Jonas Waeber's avatar
Jonas Waeber committed
38
import java.io.FileOutputStream
Jonas Waeber's avatar
Jonas Waeber committed
39
import java.nio.charset.Charset
Jonas Waeber's avatar
Jonas Waeber committed
40
import java.util.stream.Stream
Jonas Waeber's avatar
Jonas Waeber committed
41
42

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Jonas Waeber's avatar
Jonas Waeber committed
43
class IntegrationTests {
Jonas Waeber's avatar
Jonas Waeber committed
44

Jonas Waeber's avatar
Jonas Waeber committed
45
    private val resourcePath = "src/test/resources"
Jonas Waeber's avatar
Jonas Waeber committed
46
47
48
    private fun readFile(fileName: String): String {
        return File("$resourcePath/$fileName").readText(Charset.defaultCharset())
    }
Jonas Waeber's avatar
Jonas Waeber committed
49

Jonas Waeber's avatar
Jonas Waeber committed
50
    private val regex = Regex("(_:B[A-Za-z0-9]+)")
Jonas Waeber's avatar
Jonas Waeber committed
51
52
53
54
55
56
57
58
59
60
61
62
63
64
    private val regexTime = Regex("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}")

    private fun sort(source: List<String>): String {
        return source.map {
            var replacedString = it
            for (matchResult in regex.findAll(it)) {
                replacedString = replacedString.replace(matchResult.groups[0]?.value.orEmpty(), "_:B")
            }
            for (matchResult in regexTime.findAll(it)) {
                replacedString = replacedString.replace(matchResult.groups[0]?.value.orEmpty(), "2020-10-10T09:10:22")
            }
            replacedString
        }.sorted().reduce { acc, s -> acc + "\n" + s }.trim()
    }
Jonas Waeber's avatar
Jonas Waeber committed
65
66
67

    @ParameterizedTest
    @MethodSource("kafkaTests")
Jonas Waeber's avatar
Jonas Waeber committed
68
    fun `test kafka topology`(params: IntegrationTestParams) {
69
        val service = Service("kafkaTest${params.count}.yml")
Jonas Waeber's avatar
Jonas Waeber committed
70
        val output = readFile("kafkaTests/${params.count}/output.nt").lines()
71
        val settings = service.settings
Jonas Waeber's avatar
Jonas Waeber committed
72
73
74
75
76
77
78
79
80
81
82
83
84
85

        val headers = RecordHeaders()
        headers.add(RecordHeader("sessionId", "test-session-id".toByteArray()))
        headers.add(RecordHeader("recordSetId", "test-record-set-id".toByteArray()))
        headers.add(RecordHeader("institutionId", "test-institution-id".toByteArray()))
        headers.add(RecordHeader("isPublished", "false".toByteArray()))
        headers.add(RecordHeader("xmlRecordTag", "record".toByteArray()))
        headers.add(RecordHeader("xmlIdentifierFieldName", "identifierMain".toByteArray()))
        headers.add(RecordHeader("tableSheetIndex", "1".toByteArray()))
        headers.add(RecordHeader("tableHeaderCount", "1".toByteArray()))
        headers.add(RecordHeader("tableHeaderIndex", "1".toByteArray()))
        headers.add(RecordHeader("tableIdentifierIndex", "1".toByteArray()))


Jonas Waeber's avatar
Jonas Waeber committed
86
        val testDriver = TopologyTestDriver(KafkaTopology(settings).prepare().build(), settings.kafkaStreamsSettings)
Jonas Waeber's avatar
Jonas Waeber committed
87
88
89
        val factory = ConsumerRecordFactory(
            StringSerializer(), StringSerializer()
        )
Jonas Waeber's avatar
Jonas Waeber committed
90
91
92
93
94
95
96
97
98
99


        testDriver.pipeInput(
            factory.create(
                "import-process-config",
                "test-record-set-id#mapping",
                readFile("kafkaTests/${params.count}/config/mapping.yml")
            )
        )

Jonas Waeber's avatar
Jonas Waeber committed
100
101
        testDriver.pipeInput(
            factory.create(
Jonas Waeber's avatar
Jonas Waeber committed
102
                settings.inputTopic, params.key, readFile("kafkaTests/${params.count}/input.json"), headers
Jonas Waeber's avatar
Jonas Waeber committed
103
104
105
            )
        )

Jonas Waeber's avatar
Jonas Waeber committed
106
107

        val record = testDriver.readOutput(
Jonas Waeber's avatar
Jonas Waeber committed
108
109
110
111
112
            settings.outputTopic,
            StringDeserializer(),
            StringDeserializer()
        )

Jonas Waeber's avatar
Jonas Waeber committed
113
114
115
116
117
118
119
120
121
122
        val model = ModelFactory.createDefaultModel()
        NS.prefixMapping.map {
            model.setNsPrefix(it.key, it.value)
        }
        RDFDataMgr.read(model, record.value().byteInputStream(), Lang.NTRIPLES)
        RDFDataMgr.write(
            FileOutputStream("$resourcePath/kafkaTests/${params.count}/turtle-output.ttl"),
            model,
            RDFFormat.TURTLE_PRETTY
        )
Jonas Waeber's avatar
Jonas Waeber committed
123

124
        val key = record.key()
Jonas Waeber's avatar
Jonas Waeber committed
125
        val sortedResult = sort(record.value().lines()).trim()
Jonas Waeber's avatar
Jonas Waeber committed
126

Jonas Waeber's avatar
Jonas Waeber committed
127
128
129
130
131
        val reportRecord = testDriver.readOutput(
            service.settings.processReportTopic,
            StringDeserializer(),
            StringDeserializer()
        )
132
133
134
135
136
137
138
139
140
141
142
143
144
145
        val reportString = reportRecord.value()

        assertAll("",
            { assertThat(key).isEqualTo(params.expectedKey) },
            {
                assertThat(sortedResult)
                    .isEqualTo(sort(output).trim())
            },
            {
                assertThat(Klaxon().parse<Report>(reportString))
                    .isEqualTo(params.report)
            }

        )
Jonas Waeber's avatar
Jonas Waeber committed
146
    }
Jonas Waeber's avatar
Jonas Waeber committed
147

Jonas Waeber's avatar
Jonas Waeber committed
148
    private fun kafkaTests() = Stream.of(
149
150
151
152
153
154
155
156
157
158
159
160
161
162
        /* IntegrationTestParams(
             1,
             "MEI_49884",
             listOf(
                 "https://memobase.ch/record/BAZ-B_MEI-MEI_49884",
                 "https://memobase.ch/instantiation/physical/BAZ-MEI_49884-1"
             ),
             Report(
                 "https://memobase.ch/record/test-institution-id-MEI_49884",
                 "SUCCESS",
                 "Successfully mapped record with id https://memobase.ch/record/test-institution-id-MEI_49884."
             )

         ),
Jonas Waeber's avatar
Jonas Waeber committed
163
        IntegrationTestParams(
Jonas Waeber's avatar
Jonas Waeber committed
164
165
            2,
            "Sig Han 1293",
166
            "https://memobase.ch/record/test-institution-id-Sig_Han_1293",
Jonas Waeber's avatar
Jonas Waeber committed
167
            Report(
168
                "https://memobase.ch/record/test-institution-id-Sig_Han_1293",
Jonas Waeber's avatar
Jonas Waeber committed
169
                "SUCCESS",
170
                "Successfully mapped record with id https://memobase.ch/record/test-institution-id-Sig_Han_1293."
171
            )
172
        ),
Jonas Waeber's avatar
Jonas Waeber committed
173
        IntegrationTestParams(
174
175
            3,
            "Sig Han 1293",
176
            "ttps://memobase.ch/record/TEST_RECORD_SET-Sig_Han_1293",
Jonas Waeber's avatar
Jonas Waeber committed
177
            Report(
178
                "https://memobase.ch/record/test-institution-id-Sig_Han_1293",
Jonas Waeber's avatar
Jonas Waeber committed
179
                "SUCCESS",
180
                "Successfully mapped record with id https://memobase.ch/record/test-institution-id-Sig_Han_1293."
181
            )
182
        ),
Jonas Waeber's avatar
Jonas Waeber committed
183
        IntegrationTestParams(
184
            4,
185
186
            "InputKey",
           "https://memobase.ch/record/test-institution-id-sigantur-example",
Jonas Waeber's avatar
Jonas Waeber committed
187
            Report(
188
                "https://memobase.ch/record/test-institution-id-sigantur-example",
Jonas Waeber's avatar
Jonas Waeber committed
189
                "SUCCESS",
190
                "Successfully mapped record with id https://memobase.ch/record/test-institution-id-sigantur-example."
191
            )
192
        ),*/
Jonas Waeber's avatar
Jonas Waeber committed
193
        IntegrationTestParams(
194
195
            5,
            "ID_1",
196
           "https://memobase.ch/record/test-institution-id-ID_1",
Jonas Waeber's avatar
Jonas Waeber committed
197
            Report(
198
                "https://memobase.ch/record/test-institution-id-ID_1",
Jonas Waeber's avatar
Jonas Waeber committed
199
                "SUCCESS",
200
                "Successfully mapped record with id https://memobase.ch/record/test-institution-id-ID_1."
201
            )
202
        )/*,
Jonas Waeber's avatar
Jonas Waeber committed
203
        IntegrationTestParams(
Jonas Waeber's avatar
Jonas Waeber committed
204
205
            6,
            "ID_1",
206
           "https://memobase.ch/record/test-institution-id-ID_1",
Jonas Waeber's avatar
Jonas Waeber committed
207
            Report(
208
                "https://memobase.ch/record/test-institution-id-ID_1",
Jonas Waeber's avatar
Jonas Waeber committed
209
                "SUCCESS",
210
                "Successfully mapped record with id https://memobase.ch/record/test-institution-id-ID_1."
Jonas Waeber's avatar
Jonas Waeber committed
211
            )
Jonas Waeber's avatar
Jonas Waeber committed
212
        ),
Jonas Waeber's avatar
Jonas Waeber committed
213
        IntegrationTestParams(
Jonas Waeber's avatar
Jonas Waeber committed
214
215
            7,
            "ID_1",
216
           "https://memobase.ch/record/test-institution-id-ID_1",
Jonas Waeber's avatar
Jonas Waeber committed
217
            Report(
218
                "https://memobase.ch/record/test-institution-id-ID_1",
Jonas Waeber's avatar
Jonas Waeber committed
219
                "SUCCESS",
220
                "Successfully mapped record with id https://memobase.ch/record/test-institution-id-ID_1."
Jonas Waeber's avatar
Jonas Waeber committed
221
            )
222
        ),
Jonas Waeber's avatar
Jonas Waeber committed
223
        IntegrationTestParams(
224
225
            8,
            "ID_1",
226
           "https://memobase.ch/record/test-institution-id-ID_1",
Jonas Waeber's avatar
Jonas Waeber committed
227
            Report(
228
                "https://memobase.ch/record/test-institution-id-ID_1",
Jonas Waeber's avatar
Jonas Waeber committed
229
                "SUCCESS",
230
                "Successfully mapped record with id https://memobase.ch/record/test-institution-id-ID_1."
231
            )
Jonas Waeber's avatar
Jonas Waeber committed
232
        ),
Jonas Waeber's avatar
Jonas Waeber committed
233
        IntegrationTestParams(
Jonas Waeber's avatar
Jonas Waeber committed
234
235
            9,
            "ID_1",
236
           "https://memobase.ch/record/test-institution-id-ID_1",
Jonas Waeber's avatar
Jonas Waeber committed
237
            Report(
238
                "https://memobase.ch/record/test-institution-id-ID_1",
Jonas Waeber's avatar
Jonas Waeber committed
239
                "SUCCESS",
240
                "Successfully mapped record with id https://memobase.ch/record/test-institution-id-ID_1."
Jonas Waeber's avatar
Jonas Waeber committed
241
            )
242
        )*/
Jonas Waeber's avatar
Jonas Waeber committed
243
    )
Jonas Waeber's avatar
Jonas Waeber committed
244
}