Unverified Commit c65d8a62 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

copy header from input message


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent cd69142f
......@@ -62,6 +62,9 @@ object App extends scala.App with Logging with RecordUtils {
val records = consumer.poll(Duration.ofMillis(consumerPollTimeoutMs)).asScala
records.foreach {
record => {
val headers = record.headers()
val recordSetHeader = headers.lastHeader("recordSetId")
val institutionHeader = headers.lastHeader("institutionId")
val reportingObject = recordProcessor.process(record).foldLeft(ReportingObject(record.key()))((reportingObject, outcome) => outcome match {
case ProcessSuccess(id, resource, msg) =>
logger.info(s"$id: $msg")
......@@ -77,7 +80,7 @@ object App extends scala.App with Logging with RecordUtils {
logger.debug(ex.getStackTrace.mkString("\n"))
ReportingObject.addResourceOutcome(reportingObject, (resource, s"$msg: ${ex.getMessage}"), ProcessingFatal)
})
reporter.send(reportingObject)
reporter.send(reportingObject, recordSetHeader, institutionHeader)
}
}
}
......
......@@ -19,12 +19,11 @@
package ch.memobase
import java.util.Properties
import java.util.concurrent.{Future => JavaFuture}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.header.Header
import java.util.Properties
import java.util.concurrent.{Future => JavaFuture}
import scala.collection.JavaConverters._
......@@ -38,27 +37,11 @@ class Reporter(props: Properties, reportingTopic: String) {
private val producer = new KafkaProducer[String, String](props)
def send(report: models.ReportingObject): JavaFuture[RecordMetadata] = {
// val headers = createHeaders(report.institutionRecordSet._2.getOrElse(""), report.institutionRecordSet._1.getOrElse(""))
val producerRecord = new ProducerRecord[String, String](reportingTopic, 0, report.id, report.toString)
def send(report: models.ReportingObject, recordSetHeader: Header, institutionHeader: Header): JavaFuture[RecordMetadata] = {
val producerRecord = new ProducerRecord[String, String](reportingTopic, 0, report.id, report.toString, List(recordSetHeader, institutionHeader).asJava)
producer.send(producerRecord)
}
private def createHeaders(recordSetId: String, institutionId: String): java.lang.Iterable[Header] = {
List(
new Header {
override def key(): String = "recordSetId"
override def value(): Array[Byte] = recordSetId.getBytes
},
new Header {
override def key(): String = "institutionId"
override def value(): Array[Byte] = institutionId.getBytes
}
).asJava
}
def close(): Unit = producer.close()
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment