simplify report headers creation

parent 50a12ab2
Pipeline #19654 passed with stages
in 14 minutes and 17 seconds
...@@ -64,8 +64,6 @@ object App extends scala.App with Logging with RecordUtils { ...@@ -64,8 +64,6 @@ object App extends scala.App with Logging with RecordUtils {
records.foreach { records.foreach {
record => { record => {
val headers = record.headers() val headers = record.headers()
val recordSetId = Try(headers.lastHeader("recordSetId").value()).getOrElse(Array())
val institutionId = Try(headers.lastHeader("institutionId").value()).getOrElse(Array())
val reportingObject = recordProcessor.process(record).foldLeft( val reportingObject = recordProcessor.process(record).foldLeft(
ReportingObject(s"https://memobase.ch/record/${record.key()}"))((reportingObject, outcome) => outcome match { ReportingObject(s"https://memobase.ch/record/${record.key()}"))((reportingObject, outcome) => outcome match {
case ProcessSuccess(id, resource, msg) => case ProcessSuccess(id, resource, msg) =>
...@@ -82,7 +80,7 @@ object App extends scala.App with Logging with RecordUtils { ...@@ -82,7 +80,7 @@ object App extends scala.App with Logging with RecordUtils {
logger.debug(ex.getStackTrace.mkString("\n")) logger.debug(ex.getStackTrace.mkString("\n"))
ReportingObject.addResourceOutcome(reportingObject, (resource, s"$msg: ${ex.getMessage}"), ProcessingFatal) ReportingObject.addResourceOutcome(reportingObject, (resource, s"$msg: ${ex.getMessage}"), ProcessingFatal)
}) })
reporter.send(reportingObject, recordSetId, institutionId) reporter.send(reportingObject, headers)
} }
} }
} }
......
...@@ -20,12 +20,11 @@ ...@@ -20,12 +20,11 @@
package ch.memobase package ch.memobase
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.header.Header import org.apache.kafka.common.header.Headers
import org.apache.logging.log4j.scala.Logging import org.apache.logging.log4j.scala.Logging
import java.util.Properties import java.util.Properties
import java.util.concurrent.{Future => JavaFuture} import java.util.concurrent.{Future => JavaFuture}
import scala.collection.JavaConverters._
/** /**
...@@ -38,27 +37,18 @@ class Reporter(props: Properties, reportingTopic: String) extends Logging { ...@@ -38,27 +37,18 @@ class Reporter(props: Properties, reportingTopic: String) extends Logging {
private val producer = new KafkaProducer[String, String](props) private val producer = new KafkaProducer[String, String](props)
def send(report: models.ReportingObject, recordSetId: Array[Byte], institutionId: Array[Byte]): JavaFuture[RecordMetadata] = { //noinspection ScalaStyle
val headers = createHeaders(recordSetId, institutionId) def send(report: models.ReportingObject, headers: Headers): JavaFuture[RecordMetadata] = {
val producerRecord = new ProducerRecord[String, String](reportingTopic, null, report.id, report.toString, headers) val producerRecord = new ProducerRecord[String, String](
reportingTopic,
null,
report.id,
report.toString,
headers
)
producer.send(producerRecord) producer.send(producerRecord)
} }
private def createHeaders(recordSetId: Array[Byte], institutionId: Array[Byte]): java.lang.Iterable[Header] = {
List(
new Header {
override def key(): String = "recordSetId"
override def value(): Array[Byte] = recordSetId
},
new Header {
override def key(): String = "institutionId"
override def value(): Array[Byte] = institutionId
}
).asJava
}
def close(): Unit = producer.close() def close(): Unit = producer.close()
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment