Commit 2c0e61dc authored by Thomas Bernhart

MEMO-774: Adapt producer and consumer for new import workflow

parent ff4a52cc
@@ -30,6 +30,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.RecordHeader
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.logging.log4j.LogManager
@@ -84,7 +86,7 @@ class ServiceTest {
         )
         consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
         reportConsumer = KafkaConsumer(consumerProps)
-        reportConsumer.subscribe(listOf("test-ingest-out-reporting"))
+        reportConsumer.subscribe(listOf("test-ingest-reporting"))
     }
 
     // TODO: Test works with local docker
@@ -98,8 +100,10 @@ class ServiceTest {
             inputProducer.send(
                 ProducerRecord(
                     "test-ingest-in",
+                    null,
                     "https://memobase.ch/record/$id",
-                    inputFile.readText()
+                    inputFile.readText(),
+                    params.headers
                 )
             )
         }
@@ -125,6 +129,12 @@ class ServiceTest {
     private fun kafkaTests() = Stream.of(
         TestParams(
             1,
+            listOf(
+                RecordHeader("recordSetId", "AFZ-IB_Becker_Audiovisuals".toByteArray()),
+                RecordHeader("sessionId", "long-session-id-1234567890".toByteArray()),
+                RecordHeader("shortSessionId", "1234567890".toByteArray()),
+                RecordHeader("institutionId", "AFZ".toByteArray())
+            ),
             listOf(
                 "AFZ-IB_Becker_Audiovisuals_63.nt"
             ),
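The test now publishes each input record with the ProducerRecord overload that takes an explicit partition (null, so the default partitioner decides) and an iterable of headers, so the record-set and session metadata travel with every message on test-ingest-in. Kafka headers are plain key/byte-array pairs; a small helper along these lines (not part of the commit, names are illustrative) would build the same four workflow headers from strings:

    import org.apache.kafka.common.header.internals.RecordHeader

    // Hypothetical helper: build the import-workflow headers from plain strings.
    fun workflowHeaders(
        recordSetId: String,
        sessionId: String,
        shortSessionId: String,
        institutionId: String
    ): List<RecordHeader> = listOf(
        RecordHeader("recordSetId", recordSetId.toByteArray()),
        RecordHeader("sessionId", sessionId.toByteArray()),
        RecordHeader("shortSessionId", shortSessionId.toByteArray()),
        RecordHeader("institutionId", institutionId.toByteArray())
    )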
@@ -18,8 +18,11 @@
 package org.memobase
 
+import org.apache.kafka.common.header.internals.RecordHeader
+
 data class TestParams(
     val count: Int,
+    val headers: List<RecordHeader>,
     val inputFileNames: List<String>,
     val expectedIngestReports: List<Report>
 )
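TestParams now carries the headers each test case should attach to its input records, alongside the input file names and the expected ingest reports. A hedged sketch of how a test could read such a header back from a consumed reporting record to verify propagation (the assertion itself is not part of this commit):

    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Hypothetical test helper: read a header value back as a UTF-8 string, or null if absent.
    fun headerValue(record: ConsumerRecord<String, String>, key: String): String? =
        record.headers().lastHeader(key)?.value()?.toString(Charsets.UTF_8)

    // e.g. assertEquals("AFZ", headerValue(reportRecord, "institutionId"))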
@@ -21,13 +21,16 @@ import java.io.Closeable
 import java.util.Properties
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.Headers
 
-class Producer(props: Properties, topic: String) : Closeable {
+class Producer(
+    props: Properties,
+    private val reportingTopic: String
+) : Closeable {
     private val instance = KafkaProducer<String, String>(props)
-    private val reportingTopic = "$topic-reporting"
 
-    fun sendReport(report: Report) {
-        instance.send(ProducerRecord(reportingTopic, report.id, report.toJson()))
+    fun sendReport(headers: Headers, report: Report) {
+        instance.send(ProducerRecord(reportingTopic, null, report.id, report.toJson(), headers))
     }
 
     override fun close() {
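Producer no longer derives its reporting topic from the output topic name ("$topic-reporting"); the topic is injected directly, and sendReport forwards the headers of the record being reported on via the five-argument ProducerRecord overload (topic, partition, key, value, headers) with a null partition. A minimal usage sketch, assuming the Producer and Report classes shown in this diff and a hypothetical topic name:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Sketch only: report on a consumed record, copying that record's headers onto the report message.
    fun reportFor(props: Properties, record: ConsumerRecord<String, String>, report: Report) {
        Producer(props, "import-process-reporting").use { producer ->
            producer.sendReport(record.headers(), report)
        }
    }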
@@ -64,7 +64,7 @@ class Service(fileName: String = "app.yml") : Closeable {
         consumerSettings.setProperty("max.poll.records", CONSUMER_MAX_POLL_RECORDS)
         consumerSettings.setProperty("max.poll.interval.ms", CONSUMER_MAX_INTERVAL_MS)
         consumer = Consumer(consumerSettings, settings.inputTopic)
-        producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
+        producer = Producer(settings.kafkaProducerSettings, settings.processReportTopic)
         log.info("Connected to Kafka.")
     }
@@ -77,7 +77,7 @@ class Service(fileName: String = "app.yml") : Closeable {
     fun processRecords() {
         for (record in consumer.fetchRecords()) {
             val ingestReport = processRecord(record)
-            producer.sendReport(ingestReport)
+            producer.sendReport(record.headers(), ingestReport)
         }
     }
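With this change the service copies the headers of each consumed input record unchanged onto its process report, so downstream consumers of the reporting topic can correlate reports with a record set or session. A hypothetical downstream reader (not part of this commit) might group report payloads by the sessionId header like this:

    import java.time.Duration
    import org.apache.kafka.clients.consumer.KafkaConsumer

    // Hypothetical downstream consumer: group report payloads by their sessionId header.
    fun pollReportsBySession(consumer: KafkaConsumer<String, String>): Map<String, List<String>> =
        consumer.poll(Duration.ofSeconds(1))
            .groupBy(
                { record -> record.headers().lastHeader("sessionId")?.value()?.let { String(it) } ?: "unknown" },
                { record -> record.value() }
            )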