Unverified Commit 620607c9 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

refactor string to long conversion

parent 1417c7bb
......@@ -47,12 +47,8 @@ class KafkaTopology extends Logging {
val dataSetSizeRegistry = builder.stream[String, String](SettingsFromFile.getCountInputTopic)
dataSetSizeRegistry.flatMap((k, v) => Try(v.toLong) match {
case Success(l) => List((k, l))
case _ =>
logger.warn("Ignoring message since count is no valid value")
List()
})
dataSetSizeRegistry
.flatMap(convertToLongValue)
.to(intermediaryDataSetCountTopic)
val aggregatedReports = builder
......@@ -81,4 +77,13 @@ class KafkaTopology extends Logging {
private def getProcessId(s: String) = s.split("-")(0)
private def getJobId(s: String) = s.split("-")(1)
private def convertToLongValue(k: String, v: String): List[(String, Long)] = Try(v.toLong) match {
case Success(l) =>
logger.info(s"Receiving new request to register dataset count: $k -> $v")
List((k, l))
case _ =>
logger.warn("Ignoring message since count is no valid value")
List()
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment