Commit 517da906 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Add error handling for JSON parser.

Add report for missing key json objects.
Add test for document search.
parent 6945f38c
Pipeline #17506 passed with stages
in 4 minutes and 59 seconds
......@@ -20,8 +20,6 @@ package org.memobase
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.io.StringWriter
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
......@@ -30,9 +28,11 @@ import org.apache.logging.log4j.LogManager
import org.memobase.helpers.Default
import org.memobase.helpers.JSON
import org.memobase.helpers.KEYS
import org.memobase.helpers.ReportStatus
import org.memobase.model.Report
import org.memobase.model.Schema
import org.memobase.settings.SettingsLoader
import java.io.StringWriter
class KafkaTopology(private val settings: SettingsLoader) {
private val log = LogManager.getLogger("SearchDocService")
......@@ -40,7 +40,8 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val reportTopic = settings.processReportTopic
private val searchDocTransform = SearchDocTransform(settings.appSettings.getProperty(KEYS.SettingsProps.mediaUrl))
private val institutionSearchDoc = InstitutionSearchDocBuilder(settings.appSettings.getProperty(KEYS.SettingsProps.institutionTypeLabelsPath))
private val institutionSearchDoc =
InstitutionSearchDocBuilder(settings.appSettings.getProperty(KEYS.SettingsProps.institutionTypeLabelsPath))
private val jsonWriter = ObjectMapper().registerKotlinModule().writer()
......@@ -48,7 +49,8 @@ class KafkaTopology(private val settings: SettingsLoader) {
val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic)
val branchedStream = stream
.flatMapValues { value -> JSON.parse(value) }
.mapValues { value -> JSON.parse(value) }
.filter { _, value -> value.isNotEmpty() }
.mapValues { value -> JSON.unpack(value) }
.branch(
Predicate { _, value -> value.containsKey(JSON.record) },
......@@ -60,7 +62,10 @@ class KafkaTopology(private val settings: SettingsLoader) {
val recordStream = branchedStream[0]
.mapValues { readOnlyKey, value ->
try {
Pair(searchDocTransform.transform(value), Report(readOnlyKey, "SUCCESS", "Transformed message into search doc."))
Pair(
searchDocTransform.transform(value),
Report(readOnlyKey, "SUCCESS", "Transformed message into search doc.")
)
} catch (ex: InvalidInputException) {
Pair(Default.searchDoc, Report(readOnlyKey, "FAILURE", ex.localizedMessage))
}
......@@ -71,13 +76,25 @@ class KafkaTopology(private val settings: SettingsLoader) {
val institutionStream = branchedStream[1]
.mapValues { readOnlyKey, value ->
try {
Pair(institutionSearchDoc.transform(readOnlyKey, value), Report(readOnlyKey, "SUCCESS", "Transformed message into search doc."))
Pair(
institutionSearchDoc.transform(readOnlyKey, value),
Report(readOnlyKey, "SUCCESS", "Transformed message into search doc.")
)
} catch (ex: InvalidInputException) {
Pair(Default.institutionSearchDoc, Report(readOnlyKey, "FAILURE", ex.localizedMessage))
}
}
outputStreams(institutionStream)
branchedStream[3]
.mapValues { readOnlyKey, value ->
Report(
readOnlyKey,
ReportStatus.fatal,
"No record, memobase institution or record set present in input data: $value."
)
}
.to(reportTopic)
return builder.build()
}
......
......@@ -3,11 +3,15 @@ package org.memobase.helpers
import com.beust.klaxon.JsonArray
import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.logging.log4j.LogManager
import org.memobase.rdf.NS
import java.io.StringReader
object JSON {
private val log = LogManager.getLogger("JsonParser")
private const val graph = "@graph"
private const val atId = "@id"
const val atType = "@type" // rdf:type
......@@ -22,9 +26,13 @@ object JSON {
private val klaxon = Klaxon()
fun parse(data: String): List<JsonObject> {
val result = klaxon.parseJsonObject(StringReader(data))
return listOf(result)
fun parse(data: String): JsonObject {
return try {
klaxon.parseJsonObject(StringReader(data))
} catch (ex: KlaxonException) {
log.error("${ex.localizedMessage} -> $data.")
JsonObject()
}
}
fun unpack(input: JsonObject): Map<String, JsonObject> {
......
package org.memobase.helpers
object ReportStatus {
const val success = "SUCCESS"
const val ignored = "IGNORED"
const val warning = "WARNING"
const val fatal = "FATAL"
}
\ No newline at end of file
package org.memobase
import com.beust.klaxon.Klaxon
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.memobase.helpers.JSON
import java.io.File
import java.nio.charset.Charset
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestDocumentsSearchDoc {
private val dataPath = "src/test/resources/data/documents"
private fun readFile(fileName: String): String {
return File("$dataPath/$fileName").readText(Charset.defaultCharset())
}
private val klaxon = Klaxon()
private val transformer = SearchDocTransform("https://media.memobase.k8s.unibas.ch/memo/")
@Test
fun `test minimal record required`() {
val input = JSON.parse(readFile("minimal_record_required.json"))
val mappedInput = JSON.unpack(input)
val output = transformer.transform(
mappedInput
)
println(output)
}
}
\ No newline at end of file
......@@ -27,7 +27,7 @@ class TestTransform {
val input = JSON.unpack(
JSON.parse(
FileInputStream(File("$dataPath/enrich_digital_metadata_record.json")).bufferedReader().readLines()
.reduce { acc, s -> acc + "\n" + s })[0]
.reduce { acc, s -> acc + "\n" + s })
)
val values = searchDoc.transform(input)
}
......
{
"@graph": [
{
"@id": "https://memobase.ch/record/TestIdentifier",
"@type": "https://www.ica.org/standards/RiC/ontology#Record"
}
]
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment