Commit 395db673 authored by Jonas Waeber
Browse files

Fix filter pipeline.

parent 147f4343
variables:
APPLICATION_ID: "client"
KAFKA_BOOTSTRAP_SERVERS: "localhost:1234"
TOPIC_IN: "input"
TOPIC_OUT_AV: "output_av"
TOPIC_OUT_IMAGE: "output_image"
TOPIC_OUT_IGNORE: "output_ignore"
TOPIC_PROCESS: "output_reports"
REPORTING_STEP_NAME: "step"
stages:
- test
- build
......
......@@ -74,7 +74,6 @@ class KafkaTopology extends Logging {
) =
stream
.mapValues(v => setMediaPlayerType(v))
.mapValues(v => (v._1, filterEnrichableResources(v._2)))
.branch(
(_, v) => v._2.forall(v => v.enrichable && v.player == ImageViewer),
(_, v) => v._2.forall(_.enrichable),
......
<https://memobase.ch/record/test> <https://www.ica.org/standards/RiC/ontology#type> "Radio"
<https://memobase.ch/digital/test-1> <https://www.ica.org/standards/RiC/ontology#type> "digitalObject"
<https://memobase.ch/digital/test-1> <http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#locator> "http://tp.srgssr.ch/p/srf/inline?urn=urn:rts:audio:3293771"
\ No newline at end of file
<https://memobase.ch/record/test> <https://www.ica.org/standards/RiC/ontology#type> "Radio"
<https://memobase.ch/digital/test-1> <https://www.ica.org/standards/RiC/ontology#type> "digitalObject"
<https://memobase.ch/digital/test-1> <http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#locator> "http://tp.srgssr.ch/p/srf/inline?urn=urn:rts:audio:3293771"
<https://memobase.ch/digital/test-1> <http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#isDistributedOn> "srfaudio" .
......@@ -19,49 +19,127 @@
package ch.memobase
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerRecord
import ch.memobase.models.{ProcessingIgnore, ProcessingStatus, ReportingObject}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.test._
import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver}
import org.apache.kafka.streams.{TestInputTopic, TestOutputTopic, Topology, TopologyTestDriver}
import org.scalatest.funsuite.AnyFunSuite
class KafkaTopologyTest extends AnyFunSuite {
import Serdes._
import java.io.File
import scala.io.Source
class KafkaTopologyTest extends AnyFunSuite {
private val fixture = new {
private val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
p
}
val factory = new ConsumerRecordFactory[String, String](new StringSerializer, new StringSerializer)
val testDriver: Topology => TopologyTestDriver = t => new TopologyTestDriver(t, props)
val inputTopic = "input"
val outputTopic = "output"
val testDriver: Topology => TopologyTestDriver = t =>
new TopologyTestDriver(t, SettingsFromFile.kafkaStreamsSettings)
}
/** Reads a record from `outputTopic` via the test driver, decoding key and value as strings. */
def getOutput(ttd: TopologyTestDriver): ProducerRecord[String, String] =
  ttd.readOutput(outputTopic, new StringDeserializer, new StringDeserializer)
/**
 * Replaces every timestamp of the form `yyyy-MM-ddTHH:mm:ss.SSS` in `value` with the
 * fixed token `2020`, so strings that embed a timestamp can be compared for equality.
 *
 * @param value text that may contain timestamps
 * @return `value` with each matching timestamp replaced by `2020`
 */
private def replaceTimestamp(value: String): String =
  // The dot before the milliseconds is escaped so that only a literal '.' matches;
  // unescaped it would accept any character in that position.
  value.replaceAll("""\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}""", "2020")
test("enrich record with analyzable media object successfully") {
/**
 * Reads the file `src/test/resources/topology/<name>.nt` and returns its full content.
 *
 * @param name file name without the `.nt` extension
 * @return the complete file content as a single string
 */
private def readFile(name: String): String = {
  val source = Source.fromFile(new File(s"src/test/resources/topology/$name.nt"))
  // Close the underlying file handle even if reading fails.
  try source.mkString
  finally source.close()
}
/**
 * Creates a test input topic with string keys and values on the given driver,
 * using the topic name from `SettingsFromFile.inputTopic`.
 */
private def createInputTopic(
  testDrive: TopologyTestDriver
): TestInputTopic[String, String] = {
  val topicName = SettingsFromFile.inputTopic
  val keySerializer = new StringSerializer()
  val valueSerializer = new StringSerializer()
  testDrive.createInputTopic(topicName, keySerializer, valueSerializer)
}
val builder: StreamsBuilder = new StreamsBuilder
builder.stream[String, String](fixture.inputTopic)
.filter((_, v) => v.nonEmpty)
.to(fixture.outputTopic)
/**
 * Creates a test output topic with string keys and values on the given driver,
 * using the topic name from `SettingsFromFile.outputTopicAV`.
 */
private def createOutputTopicAV(
  testDrive: TopologyTestDriver
): TestOutputTopic[String, String] = {
  val topicName = SettingsFromFile.outputTopicAV
  val keyDeserializer = new StringDeserializer()
  val valueDeserializer = new StringDeserializer()
  testDrive.createOutputTopic(topicName, keyDeserializer, valueDeserializer)
}
/**
 * Creates a test output topic with string keys and values on the given driver,
 * using the topic name from `SettingsFromFile.outputTopicImage`.
 */
private def createOutputTopicImage(
  testDrive: TopologyTestDriver
): TestOutputTopic[String, String] = {
  val topicName = SettingsFromFile.outputTopicImage
  val keyDeserializer = new StringDeserializer()
  val valueDeserializer = new StringDeserializer()
  testDrive.createOutputTopic(topicName, keyDeserializer, valueDeserializer)
}
/**
 * Creates a test output topic with string keys and values on the given driver,
 * using the topic name from `SettingsFromFile.outputTopicIgnore`.
 */
private def createOutputTopicIgnored(
  testDrive: TopologyTestDriver
): TestOutputTopic[String, String] = {
  val topicName = SettingsFromFile.outputTopicIgnore
  val keyDeserializer = new StringDeserializer()
  val valueDeserializer = new StringDeserializer()
  testDrive.createOutputTopic(topicName, keyDeserializer, valueDeserializer)
}
val tD = fixture.testDriver(builder.build())
tD.pipeInput(fixture.factory.create(fixture.inputTopic, "key", "value"))
/**
 * Creates a test output topic with string keys and values on the given driver,
 * using the topic name from `SettingsFromFile.reportTopic`.
 */
private def createReportsTopic(
  testDrive: TopologyTestDriver
): TestOutputTopic[String, String] = {
  val topicName = SettingsFromFile.reportTopic
  val keyDeserializer = new StringDeserializer()
  val valueDeserializer = new StringDeserializer()
  testDrive.createOutputTopic(topicName, keyDeserializer, valueDeserializer)
}
OutputVerifier.compareKeyValue(fixture.getOutput(tD), "key", "value")
/**
 * Builds the expected report string for a test: the `toString` of a
 * `ReportingObject` created from the arguments, passed through `replaceTimestamp`.
 *
 * @param key              key passed to the reporting object
 * @param processingStatus status passed to the reporting object
 * @param message          message passed to the reporting object
 */
private def createExpectedReport(
  key: String,
  processingStatus: ProcessingStatus,
  message: String
): String = {
  val report = ReportingObject(key, processingStatus, message)
  replaceTimestamp(report.toString)
}
// Pipes a record with empty key and value through the topology and checks that a
// ProcessingIgnore report with the "no locator" message is written to the report topic.
test("ignore missing content") {
  val builder = new KafkaTopology
  val tD = fixture.testDriver(builder.build)
  // Close the driver in `finally` so it is released even when a read or assertion fails.
  try {
    val inputTopic = createInputTopic(tD)
    inputTopic.pipeInput("", "")
    val reportingTopic = createReportsTopic(tD)
    val report = reportingTopic.readRecord()
    val expectedReport = createExpectedReport(
      "",
      ProcessingIgnore,
      "Record has no locator. Assuming there is no attached media file."
    )
    assertResult("")(report.key())
    // Timestamps are normalised on both sides before comparing.
    assertResult(expectedReport)(replaceTimestamp(report.value()))
  } finally {
    tD.close()
  }
}
// Pipes the `input` fixture through the topology and checks that the record is forwarded
// to the ignore topic (matching the `output` fixture) and that a ProcessingIgnore report
// mentioning `srfaudio` is written to the report topic.
test("srfaudio input") {
  val builder = new KafkaTopology
  val tD = fixture.testDriver(builder.build)
  // Close the driver in `finally` so it is released even when a read or assertion fails.
  try {
    val inputTopic = createInputTopic(tD)
    inputTopic.pipeInput("test", readFile("input"))
    // This reads from the ignore topic, so the local is named accordingly.
    val outputTopicIgnored = createOutputTopicIgnored(tD)
    val record = outputTopicIgnored.readRecord()
    val reportingTopic = createReportsTopic(tD)
    val report = reportingTopic.readRecord()
    val expectedReport = createExpectedReport(
      "test",
      ProcessingIgnore,
      "Record is not enrichable; ebucore:isDistributedOn set to `srfaudio` for https://memobase.ch/digital/test-1"
    )
    assertResult("test")(report.key())
    // Timestamps are normalised on both sides before comparing.
    assertResult(expectedReport)(replaceTimestamp(report.value()))
    assertResult("test")(record.key())
    assertResult(readFile("output"))(record.value())
  } finally {
    tD.close()
  }
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment