IntegrationTests.kt 9.3 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * Copyright (C) 2019  Memobase
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
package org.memobase

Jonas Waeber's avatar
Jonas Waeber committed
19
20
import ch.memobase.rdf.NS
import ch.memobase.reporting.Report
Jonas Waeber's avatar
Jonas Waeber committed
21
import ch.memobase.reporting.ReportStatus
22
import com.beust.klaxon.Klaxon
Jonas Waeber's avatar
Jonas Waeber committed
23
24
25
26
27
28
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
Jonas Waeber's avatar
Jonas Waeber committed
29
30
31
32
33
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.assertj.core.api.Assertions.assertThat
Jonas Waeber's avatar
Jonas Waeber committed
34
import org.junit.jupiter.api.TestInstance
35
import org.junit.jupiter.api.assertAll
Jonas Waeber's avatar
Jonas Waeber committed
36
37
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
Jonas Waeber's avatar
Jonas Waeber committed
38
import org.memobase.params.IntegrationTestParams
Jonas Waeber's avatar
Jonas Waeber committed
39
import java.io.File
Jonas Waeber's avatar
Jonas Waeber committed
40
import java.io.FileOutputStream
Jonas Waeber's avatar
Jonas Waeber committed
41
import java.nio.charset.Charset
Jonas Waeber's avatar
Jonas Waeber committed
42
import java.util.stream.Stream
Jonas Waeber's avatar
Jonas Waeber committed
43
44

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
Jonas Waeber's avatar
Jonas Waeber committed
45
class IntegrationTests {
Jonas Waeber's avatar
Jonas Waeber committed
46

Jonas Waeber's avatar
Jonas Waeber committed
47
48
    // JSON parser used by the topology test to deserialize the report read from the report topic.
    private val klaxon = Klaxon()

Jonas Waeber's avatar
Jonas Waeber committed
49
    /** Directory that holds the test fixtures, given relative to the working directory of the test run. */
    private val resourcePath = "src/test/resources"

    /**
     * Reads a fixture file located below [resourcePath] and returns its content as text.
     *
     * The file is always decoded as UTF-8 instead of the platform default charset, so the
     * same fixture yields the same string on every machine.
     *
     * @param fileName path of the fixture relative to [resourcePath].
     * @return the complete file content.
     */
    private fun readFile(fileName: String): String =
        File("$resourcePath/$fileName").readText(Charsets.UTF_8)
Jonas Waeber's avatar
Jonas Waeber committed
53

Jonas Waeber's avatar
Jonas Waeber committed
54
    /** Matches tokens made of `_:B` followed by letters or digits (presumably generated blank node labels). */
    private val regex = Regex("(_:B[A-Za-z0-9]+)")

    /** Matches timestamps of the form `yyyy-MM-ddTHH:mm:ss`. */
    private val regexTime = Regex("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}")

    /**
     * Brings a list of lines into a canonical form so two outputs can be compared as plain strings.
     *
     * Every token matched by [regex] is replaced with `_:B` and every timestamp matched by
     * [regexTime] with the fixed value `2020-10-10T09:10:22`. The lines are then sorted,
     * joined with `\n` and the result is trimmed.
     *
     * Replacement is done per regex match, so a label that starts with another label
     * (e.g. `_:Bab` and `_:Babc`) is still replaced as a whole. An empty list yields an empty
     * string instead of throwing.
     *
     * @param source the lines to normalize; may be empty.
     * @return the normalized, sorted lines as a single trimmed string.
     */
    private fun sort(source: List<String>): String =
        source
            .map { line ->
                val withoutLabels = regex.replace(line, "_:B")
                regexTime.replace(withoutLabels, "2020-10-10T09:10:22")
            }
            .sorted()
            .joinToString("\n")
            .trim()
Jonas Waeber's avatar
Jonas Waeber committed
69
70
71

    /**
     * Pipes one fixture through the Kafka Streams topology and verifies what the topology emits.
     *
     * Steps:
     * 1. A [Service] is created from `kafkaTest<count>.yml` and its settings drive the test driver.
     * 2. The fixture's `config/mapping.yml` is sent to the `import-process-config` topic and
     *    `input.json` (with a fixed set of record headers) to the configured input topic.
     * 3. The report read from the process report topic is parsed. If its status is
     *    [ReportStatus.fatal], only the report is compared with the expected one.
     * 4. Otherwise the record from the output topic is read, a Turtle rendering of it is written
     *    to `turtle-output.ttl` in the fixture directory, and key, normalized triples and report
     *    are compared with the expectations.
     *
     * @param params fixture number, input key, expected output key and expected report.
     */
    @ParameterizedTest
    @MethodSource("kafkaTests")
    fun `test kafka topology`(params: IntegrationTestParams) {
        val service = Service("kafkaTest${params.count}.yml")
        val settings = service.settings
        val fixturePath = "kafkaTests/${params.count}"

        val headers = RecordHeaders().apply {
            add(RecordHeader("sessionId", "1".toByteArray()))
            add(RecordHeader("recordSetId", "ati-002".toByteArray()))
            add(RecordHeader("institutionId", "ati".toByteArray()))
            add(RecordHeader("isPublished", "false".toByteArray()))
            add(RecordHeader("xmlRecordTag", "record".toByteArray()))
            add(RecordHeader("xmlIdentifierFieldName", "identifierMain".toByteArray()))
            add(RecordHeader("tableSheetIndex", "1".toByteArray()))
            add(RecordHeader("tableHeaderCount", "1".toByteArray()))
            add(RecordHeader("tableHeaderIndex", "1".toByteArray()))
            add(RecordHeader("tableIdentifierIndex", "1".toByteArray()))
        }

        val testDriver = TopologyTestDriver(KafkaTopology(settings).prepare().build(), settings.kafkaStreamsSettings)
        try {
            val factory = ConsumerRecordFactory(StringSerializer(), StringSerializer())

            // The mapping configuration is piped in before the record itself.
            testDriver.pipeInput(
                factory.create(
                    "import-process-config",
                    "ati-002#mapping",
                    readFile("$fixturePath/config/mapping.yml")
                )
            )
            testDriver.pipeInput(
                factory.create(settings.inputTopic, params.key, readFile("$fixturePath/input.json"), headers)
            )

            // readOutput returns null when nothing was produced; fail with a readable message instead of an NPE.
            val reportRecord = checkNotNull(
                testDriver.readOutput(settings.processReportTopic, StringDeserializer(), StringDeserializer())
            ) { "No report was written to topic '${settings.processReportTopic}'." }
            val reportString = reportRecord.value()
            val report = checkNotNull(klaxon.parse<Report>(reportString)) {
                "Report could not be parsed from: $reportString"
            }

            if (report.status == ReportStatus.fatal) {
                // In the fatal case only the report is checked; the output topic is not read.
                assertThat(report).isEqualTo(params.report)
                return
            }

            val record = checkNotNull(
                testDriver.readOutput(settings.outputTopic, StringDeserializer(), StringDeserializer())
            ) { "No record was written to topic '${settings.outputTopic}'." }

            // Side effect kept from the original test: a Turtle rendering of the result is
            // written into the fixture directory. The stream is closed once writing is done.
            val model = ModelFactory.createDefaultModel()
            NS.prefixMapping.forEach { model.setNsPrefix(it.key, it.value) }
            RDFDataMgr.read(model, record.value().byteInputStream(), Lang.NTRIPLES)
            FileOutputStream("$resourcePath/$fixturePath/turtle-output.ttl").use { turtleOut ->
                RDFDataMgr.write(turtleOut, model, RDFFormat.TURTLE_PRETTY)
            }

            // The expected triples are only needed on this path, so they are loaded here.
            val expectedTriples = sort(readFile("$fixturePath/output.nt").lines())
            val actualTriples = sort(record.value().lines())

            assertAll(
                "kafka test ${params.count}",
                { assertThat(record.key()).isEqualTo(params.expectedKey) },
                { assertThat(actualTriples).isEqualTo(expectedTriples) },
                { assertThat(report).isEqualTo(params.report) }
            )
        } finally {
            // Always release the driver, also on the early return and on assertion failures.
            testDriver.close()
        }
    }
Jonas Waeber's avatar
Jonas Waeber committed
156

Jonas Waeber's avatar
Jonas Waeber committed
157
    /**
     * Supplies the arguments for `test kafka topology`; it is referenced by name from
     * `@MethodSource("kafkaTests")`.
     *
     * Only case 1 is active. It expects a `FATAL` report for a record with several values in
     * the field `identifierOriginal`. Cases 2 to 9 are kept in the block comment below and are
     * not executed.
     *
     * NOTE(review): before re-enabling the disabled cases, check two things. Most of them
     * call `Report` with three arguments while the active case passes four (ending in
     * `Service.step`), and the expected key of case 3 starts with `ttps://`, which looks like
     * a typo for `https://`.
     */
    private fun kafkaTests() = Stream.of(
        IntegrationTestParams(
            1,
            "MEI_49884",
            "https://memobase.ch/record/test-record-set-id-MEI_49884",
            Report(
                "MEI_49884",
                "FATAL",
                "No Record Id Found: Found multiple values in the field 'identifierOriginal' for identifiers: 22861, 22861, 22861.",
                Service.step
            )
        )/*,
        IntegrationTestParams(
            2,
            "Sig Han 1293",
            "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
            Report(
                "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-Sig_Han_1293."
            )
        ),
        IntegrationTestParams(
            3,
            "Sig Han 1293",
            "ttps://memobase.ch/record/TEST_RECORD_SET-Sig_Han_1293",
            Report(
                "https://memobase.ch/record/test-record-set-id-Sig_Han_1293",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-Sig_Han_1293."
            )
        ),
        IntegrationTestParams(
            4,
            "InputKey",
            "https://memobase.ch/record/test-record-set-id-sigantur-example",
            Report(
                "https://memobase.ch/record/test-record-set-id-sigantur-example",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-sigantur-example."
            )
        ),
        IntegrationTestParams(
            5,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "",
                Service.step
            )
        ),
        IntegrationTestParams(
            6,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            7,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            8,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        ),
        IntegrationTestParams(
            9,
            "ID_1",
            "https://memobase.ch/record/test-record-set-id-ID_1",
            Report(
                "https://memobase.ch/record/test-record-set-id-ID_1",
                "SUCCESS",
                "Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
            )
        )*/
    )
Jonas Waeber's avatar
Jonas Waeber committed
252
}