Unverified Commit 56747403 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

refactor uri creation for reports

parent 41f0728d
......@@ -32,10 +32,10 @@ class KafkaTopology extends Logging {
import Serdes._
def build(
topicIn: String,
topicOut: String,
reportingTopic: String
): Topology = {
topicIn: String,
topicOut: String,
reportingTopic: String
): Topology = {
val builder = new StreamsBuilder
val manifestBuilder = new Manifest
......@@ -78,23 +78,25 @@ class KafkaTopology extends Logging {
builder.build()
}
private def sendRecord(stream: KStream[String, Try[ExtractionResult[(String, String)]]],
topicOut: String): Unit = {
private def sendRecord(
stream: KStream[String, Try[ExtractionResult[(String, String)]]],
topicOut: String
): Unit = {
stream
.map((_, v) => (extractId(v.get.obj._1), v.get.obj._2))
.to(topicOut)
}
private def reportManifestCreationWarnings(
stream: KStream[String, Try[ExtractionResult[(String, String)]]],
topicReport: String
): Unit =
stream: KStream[String, Try[ExtractionResult[(String, String)]]],
topicReport: String
): Unit =
stream
.map((k, v) =>
(
s"https://memobase.ch/record/$k",
uri(k),
ReportingObject(
s"https://memobase.ch/record/$k",
uri(k),
ProcessingWarning,
v.get.warnings.mkString("\n")
).toString
......@@ -103,15 +105,15 @@ class KafkaTopology extends Logging {
.to(topicReport)
private def reportSuccessfulManifestCreation(
stream: KStream[String, Try[ExtractionResult[(String, String)]]],
topicReport: String
): Unit =
stream: KStream[String, Try[ExtractionResult[(String, String)]]],
topicReport: String
): Unit =
stream
.map((k, _) =>
(
s"https://memobase.ch/record/$k",
uri(k),
ReportingObject(
s"https://memobase.ch/record/$k",
uri(k),
ProcessingSuccess,
"IIIF manifest successfully created"
).toString
......@@ -120,15 +122,15 @@ class KafkaTopology extends Logging {
.to(topicReport)
private def reportManifestCreationFailure(
stream: KStream[String, Try[ExtractionResult[(String, String)]]],
topicReport: String
): Unit =
stream: KStream[String, Try[ExtractionResult[(String, String)]]],
topicReport: String
): Unit =
stream
.map((k, v) =>
(
s"https://memobase.ch/record/$k",
uri(k),
ReportingObject(
s"https://memobase.ch/record/$k",
uri(k),
ProcessingFatal,
s"Error creating manifest: ${v.failed.get.getMessage}"
).toString
......@@ -137,16 +139,16 @@ class KafkaTopology extends Logging {
.to(topicReport)
private def reportIgnoredRecord(
stream: KStream[String, String],
topicReport: String,
message: String
): Unit =
stream: KStream[String, String],
topicReport: String,
message: String
): Unit =
stream
.map((k, _) =>
(
s"https://memobase.ch/record/$k",
uri(k),
ReportingObject(
s"https://memobase.ch/record/$k",
uri(k),
ProcessingIgnore,
message
).toString
......@@ -154,4 +156,5 @@ class KafkaTopology extends Logging {
)
.to(topicReport)
private val uri: String => String = id => s"https://memobase.ch/record/$id"
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment