Unverified Commit 792a99f6 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

refactor count join


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent 620607c9
......@@ -66,7 +66,7 @@ class KafkaTopology extends Logging {
.groupByKey
.count
.toStream
.join(aggregatedReports)((k, _) => k, (vl, vr) => vl >= vr)
.join(aggregatedReports)(mapKey, allRecordsProcessed)
.filter((_, v) => v)
.map((k, _) => (k, ServiceTermination(getProcessId(k), getJobId(k)).toString))
.to(SettingsFromFile.getKafkaOutputTopic)
......@@ -86,4 +86,15 @@ class KafkaTopology extends Logging {
logger.warn("Ignoring message since count is no valid value")
List()
}
private def mapKey(k: String, _v: Long): String = k
private def allRecordsProcessed(leftValue: Long, rightValue: Long): Boolean = {
if (leftValue >= rightValue) {
logger.info(s"""$leftValue of $rightValue processed => all records processed""")
} else {
logger.debug(s"""$leftValue of $rightValue processed => still ${rightValue - leftValue} records to be processed""")
}
leftValue >= rightValue
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment