Unverified Commit e42db077 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

catch nullPointerExceptions


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent ec53f827
Pipeline #19281 passed with stages
in 6 minutes and 47 seconds
...@@ -26,6 +26,7 @@ import org.memobase.settings.SettingsLoader ...@@ -26,6 +26,7 @@ import org.memobase.settings.SettingsLoader
import java.time.Duration import java.time.Duration
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.util.Try
object App extends scala.App with Logging with RecordUtils { object App extends scala.App with Logging with RecordUtils {
...@@ -63,8 +64,8 @@ object App extends scala.App with Logging with RecordUtils { ...@@ -63,8 +64,8 @@ object App extends scala.App with Logging with RecordUtils {
records.foreach { records.foreach {
record => { record => {
val headers = record.headers() val headers = record.headers()
val recordSetHeader = headers.lastHeader("recordSetId") val recordSetHeader = Try(headers.lastHeader("recordSetId")).toOption
val institutionHeader = headers.lastHeader("institutionId") val institutionHeader = Try(headers.lastHeader("institutionId")).toOption
val reportingObject = recordProcessor.process(record).foldLeft(ReportingObject(record.key()))((reportingObject, outcome) => outcome match { val reportingObject = recordProcessor.process(record).foldLeft(ReportingObject(record.key()))((reportingObject, outcome) => outcome match {
case ProcessSuccess(id, resource, msg) => case ProcessSuccess(id, resource, msg) =>
logger.info(s"$id: $msg") logger.info(s"$id: $msg")
......
...@@ -38,8 +38,9 @@ class Reporter(props: Properties, reportingTopic: String) extends Logging { ...@@ -38,8 +38,9 @@ class Reporter(props: Properties, reportingTopic: String) extends Logging {
private val producer = new KafkaProducer[String, String](props) private val producer = new KafkaProducer[String, String](props)
def send(report: models.ReportingObject, recordSetHeader: Header, institutionHeader: Header): JavaFuture[RecordMetadata] = { def send(report: models.ReportingObject, recordSetHeader: Option[Header], institutionHeader: Option[Header]): JavaFuture[RecordMetadata] = {
val producerRecord = new ProducerRecord[String, String](reportingTopic, 0, report.id, report.toString, List(recordSetHeader, institutionHeader).asJava) val producerRecord = new ProducerRecord[String, String](reportingTopic, 0, report.id, report.toString,
List(recordSetHeader, institutionHeader).flatten.asJava)
logger.info(s"Sending report: $report") logger.info(s"Sending report: $report")
producer.send(producerRecord) producer.send(producerRecord)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment