/*
 * Mapper Service
 * Copyright (C) 2020-2021  Memobase
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
package org.memobase

import ch.memobase.rdf.NS
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import com.beust.klaxon.Klaxon
import java.io.File
import java.io.FileOutputStream
import java.nio.charset.Charset
import java.util.stream.Stream
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.params.IntegrationTestParams

/**
 * End-to-end tests for the Kafka Streams topology built by [KafkaTopology].
 *
 * Each parameterised case pipes a mapping configuration and one input record through a
 * [TopologyTestDriver] and compares the emitted report (and, for non-fatal reports, the emitted
 * N-Triples record) with the fixtures stored under `src/test/resources/kafkaTests/<count>/`.
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class IntegrationTests {

    /** JSON parser used to deserialize the report read from the report topic. */
    private val klaxon = Klaxon()

    /** Root directory of the test fixtures, relative to the working directory of the test run. */
    private val resourcePath = "src/test/resources"

    /** Charset used to read fixtures; pinned so results do not depend on the platform default. */
    private val fixtureCharset: Charset = Charsets.UTF_8

    /**
     * Reads a fixture file as text.
     *
     * @param fileName path of the file relative to [resourcePath].
     * @return the complete file content.
     */
    private fun readFile(fileName: String): String =
        File("$resourcePath/$fileName").readText(fixtureCharset)

    /** Matches blank-node labels of the form `_:B<alphanumerics>`. */
    private val regex = Regex("(_:B[A-Za-z0-9]+)")

    /** Matches timestamps of the form `yyyy-MM-ddTHH:mm:ss`. */
    private val regexTime = Regex("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}")

    /**
     * Normalises serialized triples so that two serializations can be compared as plain text.
     *
     * Every blank-node label matched by [regex] is replaced with `_:B` and every timestamp matched
     * by [regexTime] is replaced with a fixed value. The lines are then sorted, joined with
     * newlines and trimmed.
     *
     * @param source the lines to normalise.
     * @return the normalised, sorted text; an empty string when [source] is empty.
     */
    private fun sort(source: List<String>): String =
        source
            // Regex.replace substitutes each match where it occurs, so a label that is a prefix
            // of another label cannot corrupt the longer one.
            .map { line -> regexTime.replace(regex.replace(line, "_:B"), "2020-10-10T09:10:22") }
            .sorted()
            .joinToString("\n")
            .trim()

    /** Builds the record headers that are attached to the piped input record. */
    private fun inputHeaders(): RecordHeaders =
        RecordHeaders().apply {
            listOf(
                "sessionId" to "1",
                "recordSetId" to "ati-002",
                "institutionId" to "ati",
                "isPublished" to "false",
                "xmlRecordTag" to "record",
                "xmlIdentifierFieldName" to "identifierMain",
                "tableSheetIndex" to "1",
                "tableHeaderCount" to "1",
                "tableHeaderIndex" to "1",
                "tableIdentifierIndex" to "1"
            ).forEach { (name, value) -> add(RecordHeader(name, value.toByteArray())) }
        }

    /**
     * Runs one fixture through the topology and verifies the report and the mapped record.
     *
     * When the report status equals [ReportStatus.fatal] only the report is compared and the
     * output topic is not read. Otherwise the record key, the normalised N-Triples and the report
     * are all verified together.
     *
     * @param params fixture number, input key, expected output key and expected report.
     */
    @ParameterizedTest
    @MethodSource("kafkaTests")
    fun `test kafka topology`(params: IntegrationTestParams) {
        val service = Service("kafkaTest${params.count}.yml")
        val fixtureDir = "kafkaTests/${params.count}"
        val expectedLines = readFile("$fixtureDir/output.nt").lines()
        val settings = service.settings
        val headers = inputHeaders()
        val topology = KafkaTopology(settings).prepare().build()

        // The driver is closed on every exit path, including the early return for fatal reports.
        TopologyTestDriver(topology, settings.kafkaStreamsSettings).use { testDriver ->
            val factory = ConsumerRecordFactory(StringSerializer(), StringSerializer())

            testDriver.pipeInput(
                factory.create(
                    "mb-di-config-test",
                    "ati-002#mapping",
                    readFile("$fixtureDir/config/mapping.yml")
                )
            )
            testDriver.pipeInput(
                factory.create(settings.inputTopic, params.key, readFile("$fixtureDir/input.json"), headers)
            )

            val reportRecord = checkNotNull(
                testDriver.readOutput(settings.processReportTopic, StringDeserializer(), StringDeserializer())
            ) { "No report was written to topic '${settings.processReportTopic}' for case ${params.count}." }
            val reportString = reportRecord.value()
            val report = checkNotNull(klaxon.parse<Report>(reportString)) {
                "Report could not be parsed from: $reportString"
            }
            if (report.status == ReportStatus.fatal) {
                assertThat(report).isEqualTo(params.report)
                return
            }

            val record = checkNotNull(
                testDriver.readOutput(settings.outputTopic, StringDeserializer(), StringDeserializer())
            ) { "No record was written to topic '${settings.outputTopic}' for case ${params.count}." }
            val ntriples = record.value()

            // Re-serialize the result as Turtle next to the fixtures
            // (presumably for manual inspection; nothing in this test reads the file back).
            val model = ModelFactory.createDefaultModel()
            NS.prefixMapping.forEach { model.setNsPrefix(it.key, it.value) }
            RDFDataMgr.read(model, ntriples.byteInputStream(), Lang.NTRIPLES)
            FileOutputStream("$resourcePath/$fixtureDir/turtle-output.ttl").use { turtleOut ->
                RDFDataMgr.write(turtleOut, model, RDFFormat.TURTLE_PRETTY)
            }

            val key = record.key()
            val sortedResult = sort(ntriples.lines())
            val sortedExpected = sort(expectedLines)

            assertAll(
                "kafka topology test case ${params.count}",
                { assertThat(key).isEqualTo(params.expectedKey) },
                { assertThat(sortedResult).isEqualTo(sortedExpected) },
                { assertThat(report).isEqualTo(params.report) }
            )
        }
    }

    /**
     * Supplies the cases for the parameterised topology test.
     *
     * Only case 1 is active; the remaining cases are kept below as a comment.
     */
    private fun kafkaTests() = Stream.of(
        IntegrationTestParams(
            1,
            "MEI_49884",
            "https://memobase.ch/record/test-record-set-id-MEI_49884",
            Report(
                "MEI_49884",
                "FATAL",
                "No Record Id Found: Found multiple values in the field 'identifierOriginal' for identifiers: 22861, 22861, 22861.",
                "test"
            )
        )
        /*
        Disabled cases.
        NOTE(review): most of these call Report(...) with three arguments while the active case
        passes four - confirm the current constructor before re-enabling.
        NOTE(review): the expected key of case 3 starts with "ttps://" (missing "h") and uses a
        different record-set prefix than its report id - verify against the fixture.
        ,
        IntegrationTestParams(
            2,
            "Sig Han 1293",
            "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
            Report(
                "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-Sig_Han_1293."
            )
        ),
        IntegrationTestParams(
            3,
            "Sig Han 1293",
            "ttps://memobase.ch/record/TEST_RECORD_SET-Sig_Han_1293",
            Report(
                "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-Sig_Han_1293."
            )
        ),
        IntegrationTestParams(
            4,
            "InputKey",
            "https://memobase.ch/record/test-record-set-id-sigantur-example",
            Report(
                "https://memobase.ch/record/test-record-set-id-sigantur-example",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-sigantur-example."
            )
        ),
        IntegrationTestParams(
            5,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "",
                Service.step
            )
        ),
        IntegrationTestParams(
            6,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            7,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            8,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            9,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        )
        */
    )
}