Commit 86ec81ee authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Fix tests

They will now always complete.
parent 18729b65
......@@ -25,8 +25,8 @@ import java.util.stream.Stream
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
......@@ -69,6 +69,7 @@ class Tests {
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.qualifiedName)
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumer = KafkaConsumer(props)
consumer.subscribe(listOf("sftp-reader-p1-j1", "sftp-reader-p1-j1-reporting", "p1-reporting"))
}
@ParameterizedTest
......@@ -81,47 +82,31 @@ class Tests {
val reportingTopic = service.settings.outputTopic + "-reporting"
val processReportingTopic = service.settings.processReportTopic
consumer.assign(listOf(TopicPartition(topic, 0)))
val totalConsumerRecords = mutableListOf<ConsumerRecord<String, String>>()
var result = consumer.poll(Duration.ofMillis(10))
while (result.count() == 0) {
while (totalConsumerRecords.size != 3) {
if (result.count() > 0) {
totalConsumerRecords.addAll(result.asIterable())
}
log.error(result.count())
result = consumer.poll(Duration.ofMillis(10))
}
consumer.assign(listOf(TopicPartition(reportingTopic, 0)))
var resultReport = consumer.poll(Duration.ofMillis(10))
while (resultReport.count() == 0) {
resultReport = consumer.poll(Duration.ofMillis(10))
}
consumer.assign(listOf(TopicPartition(processReportingTopic, 0)))
var processReportResult = consumer.poll(Duration.ofMillis(10))
while (processReportResult.count() == 0) {
processReportResult = consumer.poll(Duration.ofMillis(10))
}
assertThat(result.records(topic))
.describedAs("Record Results")
.hasSize(1)
.first()
assertThat(totalConsumerRecords.find { value -> value.topic() == topic })
.describedAs("Message Test")
.hasFieldOrPropertyWithValue("key", params.expectedKey)
.hasFieldOrPropertyWithValue("value", params.expectedValue)
assertThat(resultReport.records(reportingTopic))
.describedAs("Report Results")
.hasSize(1)
.first()
assertThat(totalConsumerRecords.find { value -> value.topic() == reportingTopic })
.describedAs("Report Test")
.hasFieldOrPropertyWithValue("key", params.expectedKey)
.hasFieldOrPropertyWithValue("value", params.expectedReportValue)
assertThat(processReportResult.records(processReportingTopic))
.describedAs("Report Process Results")
.hasSize(1)
.first()
assertThat(totalConsumerRecords.find { value -> value.topic() == processReportingTopic })
.describedAs("Process Report Test")
.hasFieldOrPropertyWithValue("key", params.expectedProcessReport.id)
.hasFieldOrPropertyWithValue("value", Klaxon().toJsonString(params.expectedProcessReport))
// cleanup inside of class because there is no way to access topics outside of this function.
adminClient.deleteTopics(listOf(topic, reportingTopic, processReportingTopic))
}
private fun directoryReaderTests() = Stream.of(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment