fix header nullpointerexception

parent 8be2ff6a
Pipeline #19288 passed with stages
in 8 minutes and 5 seconds
......@@ -63,10 +63,9 @@ object App extends scala.App with Logging with RecordUtils {
val records = consumer.poll(Duration.ofMillis(consumerPollTimeoutMs)).asScala
records.foreach {
record => {
logger.info(s"Build record ${record.key()}") // TODO: Remove
val headers = record.headers()
val recordSetHeader = Try(headers.lastHeader("recordSetId")).toOption
val institutionHeader = Try(headers.lastHeader("institutionId")).toOption
val recordSetId = Try(headers.lastHeader("recordSetId").value()).getOrElse(Array())
val institutionId = Try(headers.lastHeader("institutionId").value()).getOrElse(Array())
val reportingObject = recordProcessor.process(record).foldLeft(ReportingObject(record.key()))((reportingObject, outcome) => outcome match {
case ProcessSuccess(id, resource, msg) =>
logger.info(s"$id: $msg")
......@@ -82,8 +81,7 @@ object App extends scala.App with Logging with RecordUtils {
logger.debug(ex.getStackTrace.mkString("\n"))
ReportingObject.addResourceOutcome(reportingObject, (resource, s"$msg: ${ex.getMessage}"), ProcessingFatal)
})
reporter.send(reportingObject, recordSetHeader, institutionHeader)
logger.info(s"Record ${record.key()} sent") // TODO: Remove
reporter.send(reportingObject, recordSetId, institutionId)
}
}
}
......
......@@ -38,13 +38,28 @@ class Reporter(props: Properties, reportingTopic: String) extends Logging {
private val producer = new KafkaProducer[String, String](props)
def send(report: models.ReportingObject, recordSetHeader: Option[Header], institutionHeader: Option[Header]): JavaFuture[RecordMetadata] = {
val producerRecord = new ProducerRecord[String, String](reportingTopic, 0, report.id, report.toString,
List(recordSetHeader, institutionHeader).flatten.asJava)
def send(report: models.ReportingObject, recordSetId: Array[Byte], institutionId: Array[Byte]): JavaFuture[RecordMetadata] = {
val headers = createHeaders(recordSetId, institutionId)
val producerRecord = new ProducerRecord[String, String](reportingTopic, null, report.id, report.toString, headers)
logger.info(s"Sending report: $report")
producer.send(producerRecord)
}
private def createHeaders(recordSetId: Array[Byte], institutionId: Array[Byte]): java.lang.Iterable[Header] = {
List(
new Header {
override def key(): String = "recordSetId"
override def value(): Array[Byte] = recordSetId
},
new Header {
override def key(): String = "institutionId"
override def value(): Array[Byte] = institutionId
}
).asJava
}
def close(): Unit = producer.close()
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment