ServiceTest.kt 6.16 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
 * fedora-ingest-service
 * Copyright (C) 2019  Memobase
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
package org.memobase

Thomas Bernhart's avatar
Thomas Bernhart committed
20
import java.io.File
21
import java.io.FileInputStream
Thomas Bernhart's avatar
Thomas Bernhart committed
22
import java.time.Duration
Thomas Bernhart's avatar
Thomas Bernhart committed
23
24
import java.util.Properties
import java.util.stream.Stream
25
26
27
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
Thomas Bernhart's avatar
Thomas Bernhart committed
28
import org.apache.kafka.clients.consumer.ConsumerRecord
29
30
31
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
Thomas Bernhart's avatar
Thomas Bernhart committed
32
import org.apache.kafka.clients.producer.ProducerRecord
33
34
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
35
import org.apache.logging.log4j.LogManager
Thomas Bernhart's avatar
Thomas Bernhart committed
36
import org.assertj.core.api.Assertions.assertThat
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.testing.EmbeddedKafkaExtension
import org.memobase.testing.EmbeddedSftpServer

/**
 * Integration test for the ingest [Service].
 *
 * Each parameterised run publishes the input files of one test case to the
 * `test-ingest-in` topic, lets the service fetch and process them, and then
 * checks how many reports show up on `test-ingest-out-reporting`.
 *
 * All Kafka clients connect to `localhost:12345`
 * (presumably the broker started by [EmbeddedKafkaExtension] -- confirm).
 */
@ExtendWith(EmbeddedKafkaExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ServiceTest {
    private val log = LogManager.getLogger("LocalTestsLogger")

    // Upper bound for every wait below, so a missing record fails the test instead of hanging the build.
    private val waitTimeout: Duration = Duration.ofSeconds(60)

    private val sftpServer = EmbeddedSftpServer(22000, "user", "password")

    private val adminClient =
        AdminClient.create(mapOf(Pair(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")))

    init {
        // Set up the embedded SFTP server with the test data.
        sftpServer.createDirectories("/memobase/AFZ/BECKER/")
        // Close the source stream once the upload call returns.
        // NOTE(review): assumes putFile consumes the stream before returning -- confirm.
        FileInputStream("src/integrationTest/resources/sftpData/binary.txt").use { binaryData ->
            sftpServer.putFile("/memobase/AFZ/BECKER/binary.txt", binaryData)
        }
    }

    private val inputProducer: KafkaProducer<String, String>
    private val reportConsumer: KafkaConsumer<String, String>

    init {
        val producerProps = Properties().apply {
            setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")
            setProperty(ProducerConfig.CLIENT_ID_CONFIG, "input-producer")
            setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.qualifiedName)
            setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.qualifiedName)
        }
        inputProducer = KafkaProducer(producerProps)

        val consumerProps = Properties().apply {
            setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")
            setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "report-consumer")
            setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group-1")
            setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
            setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
            // Start from the beginning of the topic when the group has no committed offset.
            setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        }
        reportConsumer = KafkaConsumer(consumerProps)
        reportConsumer.subscribe(listOf("test-ingest-out-reporting"))
    }

    /**
     * Publishes the input files of [params], runs one fetch/process cycle of the
     * service and asserts that the expected number of reports was emitted.
     *
     * TODO: Test works with a local docker container:
     * `docker run --rm -p 8080:8080 --name=fcrepo5 fcrepo/fcrepo:5.1.0`
     *
     * @param params the test case: its number selects the resource directory and the
     *   `test<count>.yml` settings file; it also lists the input files and expected reports.
     * @throws AssertionError if no input arrives at the service within [waitTimeout] or
     *   the number of collected reports differs from the expected one.
     */
    @ParameterizedTest
    @MethodSource("kafkaTests")
    fun testRun(params: TestParams) {
        for (inputFileName in params.inputFileNames) {
            val inputFile = File("src/integrationTest/resources/${params.count}/$inputFileName")
            // The file name without extension doubles as the record key.
            val id = inputFile.nameWithoutExtension
            inputProducer.send(ProducerRecord("test-ingest-in", id, inputFile.readText()))
        }
        // send() is asynchronous; make sure everything is on the broker before the service starts reading.
        inputProducer.flush()
        val expectedRecordCount = params.expectedIngestReports.size

        val service = Service("test${params.count}.yml")
        val timeoutNanos = waitTimeout.toNanos()
        val fetchStartedAt = System.nanoTime()
        var inputMessages = service.consumer.fetchRecords()
        while (inputMessages.isEmpty) {
            if (System.nanoTime() - fetchStartedAt >= timeoutNanos) {
                throw AssertionError("No input records were fetched within $waitTimeout.")
            }
            inputMessages = service.consumer.fetchRecords()
        }
        service.ingester.processRecords(inputMessages)

        val totalConsumerRecords = awaitReports(expectedRecordCount)

        assertThat(totalConsumerRecords).hasSize(expectedRecordCount)
    }

    /**
     * Polls the report topic until at least [expectedRecordCount] records were
     * received or [waitTimeout] has elapsed, whichever comes first.
     *
     * The loop deliberately stops on "at least" rather than "exactly": surplus or
     * missing reports are then caught by the caller's size assertion instead of
     * keeping the loop alive forever.
     *
     * @return every report record received while waiting.
     */
    private fun awaitReports(expectedRecordCount: Int): List<ConsumerRecord<String, String>> {
        val collected = mutableListOf<ConsumerRecord<String, String>>()
        val timeoutNanos = waitTimeout.toNanos()
        val startedAt = System.nanoTime()
        while (collected.size < expectedRecordCount && System.nanoTime() - startedAt < timeoutNanos) {
            val batch = reportConsumer.poll(Duration.ofMillis(500))
            log.info("Polled ${batch.count()} report record(s).")
            collected.addAll(batch)
        }
        return collected
    }

    /** Supplies the test cases for [testRun]. */
    private fun kafkaTests() = Stream.of(
        TestParams(
            1,
            listOf(
                "AFZ-IB_Becker_Audiovisuals_63.nt"
            ),
            listOf(
                Report(
                    id = "AFZ-IB_Becker_Audiovisuals_63",
                    status = "SUCCESS",
                    message = "Ingested resource AFZ-IB_Becker_Audiovisuals_63."
                )
            )
        ),
        TestParams(
            2,
            listOf(
                "test-institution.nt"
            ),
            listOf(
                Report(
                    id = "test",
                    status = "SUCCESS",
                    message = "Ingested resource test."
                )
            )
        )
    )
}