TestPipeline.kt 4.06 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package org.memobase

import ch.memobase.reporting.Report
import ch.memobase.testing.EmbeddedSftpServer
import com.beust.klaxon.Klaxon
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TestOutputTopic
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.TestRecord
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import java.io.File
import java.nio.charset.Charset

/**
 * Topology tests that pipe fixture files through a [TopologyTestDriver] built from
 * [KafkaTopology] and compare the produced record with the expected fixture output.
 *
 * Fixtures are read from `src/test/resources/data/<count>/`.
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestPipeline {
    private val resourcePath = "src/test/resources/data"
    private val klaxon = Klaxon()

    // Matches blank node labels such as `_:Bab12cd`.
    private val regex = Regex("(_:B[A-Za-z0-9]+)")

    // Matches timestamps of the form `yyyy-MM-ddTHH:mm:ss`.
    private val regexTime = Regex("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}")

    // Drivers created by [setupTest]; released again in [closeTestDrivers].
    private val openDrivers = mutableListOf<TopologyTestDriver>()

    /**
     * Closes every [TopologyTestDriver] created during the test that just finished,
     * so that drivers do not accumulate across tests.
     */
    @AfterEach
    fun closeTestDrivers() {
        // Detach the list first so a failing close() cannot leave stale drivers behind.
        val drivers = openDrivers.toList()
        openDrivers.clear()
        drivers.forEach { it.close() }
    }

    /**
     * Reads the fixture [fileName] from the directory of test case [count].
     *
     * The file is decoded as UTF-8 rather than with the platform default charset,
     * so the result does not depend on the machine running the tests.
     */
    private fun readFile(fileName: String, count: Int): String =
        File("$resourcePath/$count/$fileName").readText(Charsets.UTF_8)

    /**
     * Normalises [source] so two serialisations can be compared as plain strings.
     *
     * Per line, every blank node label is replaced with `_:B` and every timestamp with
     * `2020-10-10T09:10:22`. The lines are then sorted, joined with `\n` and trimmed.
     */
    private fun sort(source: String): String =
        source.lines()
            .map { line ->
                // Regex-based replacement handles labels that are prefixes of other labels
                // (e.g. `_:Ba` and `_:Bab`), which literal per-match replacement would mangle.
                val withoutLabels = regex.replace(line, "_:B")
                regexTime.replace(withoutLabels, "2020-10-10T09:10:22")
            }
            .sorted()
            .joinToString("\n")
            .trim()

    /**
     * Builds a test driver for the service configured by [fileName], pipes the
     * `input.json` fixture of test case [count] under [inputKey] and then pipes the
     * `mapping.yml` fixture to the config topic.
     *
     * @return the output topic (first) and the process report topic (second).
     */
    private fun setupTest(
        fileName: String,
        count: Int,
        inputKey: String
    ): Pair<TestOutputTopic<String, String>, TestOutputTopic<String, String>> {
        val service = Service(fileName)
        val settings = service.settings
        val testDriver =
            TopologyTestDriver(KafkaTopology(settings).prepare().build(), settings.kafkaStreamsSettings)
        // Register immediately so the driver is closed even if a later step throws.
        openDrivers.add(testDriver)

        val headers = RecordHeaders().apply {
            add(RecordHeader("sessionId", "test-session-id".toByteArray()))
            add(RecordHeader("recordSetId", "rst-001".toByteArray()))
            add(RecordHeader("institutionId", "ins".toByteArray()))
        }

        // The input record is piped before the mapping configuration; this order is kept as is.
        val inputTopic = testDriver.createInputTopic(settings.inputTopic, StringSerializer(), StringSerializer())
        inputTopic.pipeInput(TestRecord<String, String>(inputKey, readFile("input.json", count), headers))

        val configTopic = testDriver.createInputTopic(
            settings.appSettings.getProperty("configTopic"),
            StringSerializer(),
            StringSerializer()
        )
        configTopic.pipeInput("rst-001#mapping", readFile("mapping.yml", count))

        val outputTopic =
            testDriver.createOutputTopic(settings.outputTopic, StringDeserializer(), StringDeserializer())
        val outputReportTopic =
            testDriver.createOutputTopic(settings.processReportTopic, StringDeserializer(), StringDeserializer())
        return Pair(outputTopic, outputReportTopic)
    }

    /**
     * Parses [data] as a JSON encoded [Report].
     *
     * @throws IllegalArgumentException if the parser yields no report.
     */
    private fun parseReport(data: String): Report =
        requireNotNull(klaxon.parse<Report>(data)) { "Could not parse report from: $data" }

    @Test
    fun `test 1 minimal configuration`() {
        val (outputTopic, _) = setupTest("app.yml", 1, "test-1")

        val record = outputTopic.readRecord()

        assertAll(
            {
                assertThat(sort(record.value))
                    .isEqualTo(sort(readFile("output.nt", 1)))
            },
            {
                assertThat(record.key)
                    .isEqualTo("https://memobase.ch/record/rst-001-1")
            }
        )
    }

    @Test
    fun `test 2 minimal configuration with digital and physical object`() {
        // TODO: not implemented yet - this test currently passes without asserting anything.
    }
}