Commit 236038de authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Added reporting to mapper service.

parent 2860c896
Pipeline #10375 passed with stages
in 9 minutes and 29 seconds
......@@ -22,29 +22,118 @@ import com.beust.klaxon.Klaxon
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.builder.ResourceBuilder
import org.memobase.mapping.MappingConfig
import org.memobase.mapping.exceptions.InvalidMappingException
import org.memobase.settings.SettingsLoader
class KafkaTopology(private val settings: SettingsLoader
class KafkaTopology(
private val settings: SettingsLoader
) {
private val log = LogManager.getLogger("KafkaTopology")
private val config = MappingConfig(settings.appSettings.getProperty("configs"))
private val reportTopic = settings.outputTopic + "-reporting"
fun build(): Topology {
val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic)
stream.flatMapValues { value -> parseJsonFile(value) }
.flatMapValues { value -> buildResources(value) }
.flatMap { _, value -> writeResource(value) }
.to(settings.outputTopic)
val extractedRecordIdStream = stream.flatMapValues { value -> parseJsonFile(value) }
.mapValues { value -> buildResources(value) }
.mapValues { value -> value.extractRecordId() }
val extractedRecordTypeValueStream = extractRecordId(extractedRecordIdStream)
val hasIdAndTypeStream = extractRecordTypeValue(extractedRecordTypeValueStream)
val completedMappingStream = hasIdAndTypeStream
.mapValues { value -> value.generateRecord() }
.mapValues { value -> value.generatePhysicalObject() }
.mapValues { value -> value.generateDigitalObject() }
.mapValues { value -> value.addDerivedConnection() }
val recordStream = completedMappingStream.mapValues { value -> value.writeRecord() }
val physicalObjectStream = completedMappingStream
.filter { _, value -> value.hasPhysicalObject() }
.mapValues { value -> value.writePhysicalObject() }
val digitalObjectStream = completedMappingStream
.filter { _, value -> value.hasDigitalObject() }
.mapValues { value -> value.writeDigitalObject() }
objectOutput(recordStream)
objectOutput(physicalObjectStream)
objectOutput(digitalObjectStream)
return builder.build()
}
private fun objectOutput(stream: KStream<String, Pair<KeyValue<String, String>, Report>>) {
stream
.map { _, value -> value.first }
.to(settings.outputTopic)
stream
.map { _, value -> KeyValue(value.first.key, value.second.toJson()) }
.to(reportTopic)
}
private fun buildResources(value: Map<String, String>): ResourceBuilder {
return ResourceBuilder(
config.uriField,
config.recordType,
config.recordFieldMappers,
config.physicalObjectFieldMappers,
config.digitalObjectFieldMappers,
value,
settings.appSettings.getProperty("institutionId"),
settings.appSettings.getProperty("recordSetId")
)
}
private fun extractRecordId(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
val hasRecordId = stream.branch(
Predicate { _, value -> value.hasRecordId() },
Predicate { _, _ -> true }
)
// early termination if there is no record id!
val noRecordId = hasRecordId[1]
noRecordId
.mapValues { _ -> "ERROR: No record id specified!" }
.to(settings.outputTopic)
noRecordId
.mapValues { key, value -> value.failureReport(key).toJson() }
.to(reportTopic)
return hasRecordId[0]
.mapValues { value -> value.extractRecordTypeValue() }
}
private fun extractRecordTypeValue(stream: KStream<String, ResourceBuilder>): KStream<String, ResourceBuilder> {
val hasRecordTypeValue = stream
.branch(
Predicate { _, value -> value.hasRecordType() },
Predicate { _, _ -> true }
)
val noRecordTypeValue = hasRecordTypeValue[1]
noRecordTypeValue
.mapValues { _ -> "ERROR: No record type value specified!" }
.to(settings.outputTopic)
noRecordTypeValue
.mapValues { key, value -> value.failureReport(key).toJson() }
.to(reportTopic)
return hasRecordTypeValue[0]
}
private fun parseJsonFile(message: String): List<Map<String, String>> {
Klaxon().parse<Map<String, String>>(message).let {
......@@ -56,25 +145,5 @@ class KafkaTopology(private val settings: SettingsLoader
}
}
private fun buildResources(value: Map<String, String>): List<ResourceBuilder> {
return try {
listOf(ResourceBuilder(
config.uriField,
config.recordType,
config.recordFieldMappers,
config.physicalObjectFieldMappers,
config.digitalObjectFieldMappers,
value,
settings.appSettings.getProperty("institutionId"),
settings.appSettings.getProperty("recordSetId")
))
} catch (ex: InvalidMappingException) {
log.error(ex.localizedMessage)
emptyList()
}
}
private fun writeResource(value: ResourceBuilder): List<KeyValue<String, String>> {
return value.write().map { return@map KeyValue(it.first, it.second) }
}
}
\ No newline at end of file
......@@ -18,8 +18,16 @@
package org.memobase
import com.beust.klaxon.Klaxon
data class Report(
val id: String,
val status: String,
val message: String
)
) {
fun toJson(): String {
return Klaxon().toJsonString(this)
}
}
......@@ -20,7 +20,10 @@ package org.memobase.builder
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.riot.RDFFormat
import org.memobase.mapping.exceptions.InvalidMappingException
import org.apache.kafka.streams.KeyValue
import org.memobase.Report
import org.memobase.helpers.ReportStatus
import org.memobase.mapping.KEYS
import org.memobase.mapping.fields.ConstantField
import org.memobase.mapping.fields.MappedAnnotationField
import org.memobase.mapping.fields.SimpleAnnotationField
......@@ -28,79 +31,154 @@ import org.memobase.mapping.mappers.IFieldMapper
import java.io.StringWriter
class ResourceBuilder(
recordIdField: String,
recordType: SimpleAnnotationField,
recordFieldMappers: List<IFieldMapper>,
physicalObjectFieldMappers: List<IFieldMapper>,
digitalObjectFieldMappers: List<IFieldMapper>,
source: Map<String, String>,
institutionId: String, recordSetId: String) {
private val resources: MutableList<RecordResource> = mutableListOf()
private val recordIdField: String,
private val recordType: SimpleAnnotationField,
private val recordFieldMappers: List<IFieldMapper>,
private val physicalObjectFieldMappers: List<IFieldMapper>,
private val digitalObjectFieldMappers: List<IFieldMapper>,
private val source: Map<String, String>,
private val institutionId: String,
private val recordSetId: String
) {
private var record: Record? = null
private var physicalObject: PhysicalObject? = null
private var digitalObject: DigitalObject? = null
private var count = 0
init {
val recordId = if (source.containsKey(recordIdField)) {
source[recordIdField] as String
private var recordId = ""
private var recordTypeValue = ""
private var errorMessage = ""
fun extractRecordId(): ResourceBuilder {
if (source.containsKey(recordIdField)) {
recordId = source[recordIdField] as String
} else {
throw InvalidMappingException("No id for record found in field '$recordIdField' for source $source.")
errorMessage = "No id for record found in field '$recordIdField' for source $source."
}
return this
}
fun hasRecordId(): Boolean {
return recordId.isNotEmpty()
}
val recordTypeValue = when (recordType) {
fun failureReport(key: String): Report {
return Report(key, ReportStatus.failure, errorMessage)
}
fun extractRecordTypeValue(): ResourceBuilder {
recordTypeValue = when (recordType) {
is MappedAnnotationField -> source[recordType.field].let {
it ?: throw InvalidMappingException("No type for record in field '${recordType.field} for source $source.")
if (it.isNullOrEmpty()) {
errorMessage = "No type for record in field '${recordType.field} for source $source."
""
} else {
it
}
}
is ConstantField -> recordType.constant
}
return this
}
fun hasRecordType(): Boolean {
return if (recordTypeValue.isEmpty()) false
else if (!KEYS.validRecordTypeValues.contains(recordTypeValue)) {
errorMessage = "Record type $recordTypeValue is invalid. Must be one of ${KEYS.validRecordTypeValues}."
false
} else {
true
}
}
val record =
Record(recordId, recordTypeValue, recordSetId, institutionId)
fun generateRecord(): ResourceBuilder {
record = Record(recordId, recordTypeValue, recordSetId, institutionId)
for (recordFieldMapper in recordFieldMappers) {
recordFieldMapper.apply(source, record)
recordFieldMapper.apply(source, record!!)
}
resources.add(record)
val physicalObject: PhysicalObject? =
if (physicalObjectFieldMappers.isNotEmpty()) {
val physicalObject = PhysicalObject(recordId, institutionId, count)
count += 1
physicalObjectFieldMappers.forEach {
it.apply(source, physicalObject)
return this
}
fun generatePhysicalObject(): ResourceBuilder {
physicalObject =
if (physicalObjectFieldMappers.isNotEmpty()) {
val physicalObject = PhysicalObject(recordId, institutionId, count)
count += 1
physicalObjectFieldMappers.forEach {
it.apply(source, physicalObject)
}
record?.addInstantiation(physicalObject)
physicalObject.addRecord(record!!)
physicalObject
} else {
null
}
record.addInstantiation(physicalObject)
physicalObject.addRecord(record)
resources.add(physicalObject)
physicalObject
} else {
null
}
val digitalObject: DigitalObject? =
if (digitalObjectFieldMappers.isNotEmpty()) {
val digitalObject = DigitalObject(recordId, institutionId, count)
count += 1
digitalObjectFieldMappers.forEach {
it.apply(source, digitalObject)
return this
}
fun generateDigitalObject(): ResourceBuilder {
digitalObject =
if (digitalObjectFieldMappers.isNotEmpty()) {
val digitalObject = DigitalObject(recordId, institutionId, count)
count += 1
digitalObjectFieldMappers.forEach {
it.apply(source, digitalObject)
}
digitalObject.addRecord(record!!)
record?.addInstantiation(digitalObject)
digitalObject
} else {
null
}
digitalObject.addRecord(record)
record.addInstantiation(digitalObject)
resources.add(digitalObject)
digitalObject
} else {
null
}
return this
}
fun addDerivedConnection(): ResourceBuilder {
if (physicalObject != null && digitalObject != null) {
physicalObject.addDerivedInstantiation(digitalObject)
digitalObject.addIsDerivedFromInstantiation(physicalObject)
physicalObject?.addDerivedInstantiation(digitalObject!!)
digitalObject?.addIsDerivedFromInstantiation(physicalObject!!)
}
return this
}
fun write(): List<Pair<String, String>> {
return resources.map { resource ->
StringWriter().use { writer ->
RDFDataMgr.write(writer, resource.model, RDFFormat.NTRIPLES_UTF8)
return@map Pair(resource.resource.uri, writer.toString().trim())
}
fun writeRecord(): Pair<KeyValue<String, String>, Report> {
return StringWriter().use { writer ->
RDFDataMgr.write(writer, record!!.model, RDFFormat.NTRIPLES_UTF8)
return@use Pair(
KeyValue(record!!.resource.uri, writer.toString().trim()),
Report(record!!.resource.uri, ReportStatus.success, "Successfully created a record from source.")
)
}
}
fun hasPhysicalObject(): Boolean = physicalObject != null
fun writePhysicalObject(): Pair<KeyValue<String, String>, Report> {
return StringWriter().use { writer ->
RDFDataMgr.write(writer, physicalObject!!.model, RDFFormat.NTRIPLES_UTF8)
return@use Pair(
KeyValue(physicalObject!!.resource.uri, writer.toString().trim()),
Report(
physicalObject!!.resource.uri,
ReportStatus.success,
"Successfully created a physical instantiation from source."
)
)
}
}
fun hasDigitalObject(): Boolean = digitalObject != null
fun writeDigitalObject(): Pair<KeyValue<String, String>, Report> {
return StringWriter().use { writer ->
RDFDataMgr.write(writer, digitalObject!!.model, RDFFormat.NTRIPLES_UTF8)
return@use Pair(
KeyValue(digitalObject!!.resource.uri, writer.toString().trim()),
Report(
digitalObject!!.resource.uri,
ReportStatus.success,
"Successfully created a digital instantiation from source."
)
)
}
}
}
\ No newline at end of file
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase.helpers
object ReportStatus {
const val success = "SUCCESS"
const val failure = "FAILURE"
}
......@@ -26,6 +26,8 @@ import org.memobase.rdf.SCHEMA
object KEYS {
val validRecordTypeValues = listOf("Film", "Foto", "Radio", "Ton", "Tonbildschau", "TV", "Video")
const val uri = "uri"
const val type = "type"
......
......@@ -55,8 +55,6 @@ class MappingConfig(directory: String) {
val physicalObjectFieldMappers = mutableListOf<IFieldMapper>()
val digitalObjectFieldMappers = mutableListOf<IFieldMapper>()
private val recordTypes = listOf("Film", "Foto", "Radio", "Ton", "Tonbildschau", "TV", "Video")
private val log = LogManager.getLogger("MappingConfigParser")
init {
......@@ -90,7 +88,7 @@ class MappingConfig(directory: String) {
if (localRecordType != null) {
recordType = localRecordType as SimpleAnnotationField
} else {
log.error("Mapping is missing record.type field which is required. Assign one value out of $recordTypes or a field.")
log.error("Mapping is missing record.type field which is required.")
exitProcess(1)
}
}
......@@ -170,7 +168,7 @@ class MappingConfig(directory: String) {
private fun parseDigitalInstantiationConfig(source: Map<String, Any>) {
for (entry in source) {
when (val key = entry.key) {
KEYS.locator, KEYS.descriptiveNote, KEYS.duration ->
KEYS.locator, KEYS.descriptiveNote, KEYS.duration, KEYS.conditionsOfUse ->
digitalObjectFieldMappers.add(buildAnnotationMappers(entry))
KEYS.identifiers ->
digitalObjectFieldMappers.addAll(buildRicoConceptMappers(key, entry.value))
......
......@@ -21,5 +21,6 @@ package org.memobase
data class KafkaTestParams(
val count: Int,
val key: String,
val expectedKeys: List<String>
val expectedKeys: List<String>,
val reports: List<Report>
)
\ No newline at end of file
......@@ -17,6 +17,7 @@
*/
package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
......@@ -44,6 +45,8 @@ class Tests {
return File("$resourcePath/$fileName").readText(Charset.defaultCharset())
}
private fun reportingTopic(value: String) = "$value-reporting"
@Test
fun `test mapping config validation`() {
val config = MappingConfig(configTestBasePath + "minimalValid")
......@@ -54,35 +57,6 @@ class Tests {
}
private val configTestBasePath = "src/test/resources/configTests/"
/*
@Test
fun `json-ld data export for record sets`() {
val config =
MappingConfig("src/test/resources/multifileconfig")
var count = 0
Files.list(Paths.get("src/test/resources/data")).forEach { path ->
val text = Files.newBufferedReader(path).readText()
val values = Klaxon().parse<Map<String, String>>(text)
val resourceBuilder = ResourceBuilder(
config.uriField,
config.recordType,
config.recordFieldMappers,
config.physicalObjectFieldMappers,
values!!,
"BAZ",
"BAZ-B_MEI"
)
val string = resourceBuilder.write()
string.forEach { s ->
count += 1
FileOutputStream("src/test/resources/output/data$count.json").bufferedWriter(Charset.defaultCharset()).use {
it.write(s.second)
}
}
}
}*/
private val regex = Regex("(_:B[A-Za-z0-9]+)")
......@@ -108,8 +82,6 @@ class Tests {
)
)
var record = testDriver.readOutput(
settings.outputTopic,
StringDeserializer(),
......@@ -133,6 +105,15 @@ class Tests {
assertThat(sortedResult)
.isEqualTo(readFile("kafkaTests/${params.count}/output$recordCount.nt"))
val recordReport =
testDriver.readOutput(
reportingTopic(settings.outputTopic),
StringDeserializer(),
StringDeserializer()
)
assertThat(Klaxon().parse<Report>(recordReport.value()))
.isEqualTo(params.reports[recordCount - 1])
record = testDriver.readOutput(
settings.outputTopic,
StringDeserializer(),
......@@ -146,15 +127,19 @@ class Tests {
KafkaTestParams(
1,
"MEI_49884",
listOf("https://memobase.ch/record/BAZ-MEI_49884",
"https://memobase.ch/instantiation/physical/BAZ-MEI_49884-0")
listOf("MEI_49884"),
listOf(Report("MEI_49884", "FAILURE", "No type for record in field 'Foto for source {original_id=MEI_49884}."))
),
KafkaTestParams(
2,
"Sig Han 1293",
listOf(
"https://memobase.ch/record/TEST-Sig_Han_1293",
"https://memobase.ch/instantiation/digital/TEST-Sig_Han_1293-0")
"https://memobase.ch/instantiation/digital/TEST-Sig_Han_1293-0"),
listOf(
Report("https://memobase.ch/record/TEST-Sig_Han_1293", "SUCCESS", "Successfully created a record from source."),
Report("https://memobase.ch/instantiation/digital/TEST-Sig_Han_1293-0", "SUCCESS", "Successfully created a digital instantiation from source.")
)
),
KafkaTestParams(
3,
......@@ -163,6 +148,11 @@ class Tests {
"https://memobase.ch/record/TEST-Sig_Han_1293",
"https://memobase.ch/instantiation/physical/TEST-Sig_Han_1293-0",
"https://memobase.ch/instantiation/digital/TEST-Sig_Han_1293-1"
),
listOf(
Report("https://memobase.ch/record/TEST-Sig_Han_1293", "SUCCESS", "Successfully created a record from source."),
Report("https://memobase.ch/instantiation/physical/TEST-Sig_Han_1293-0", "SUCCESS", "Successfully created a physical instantiation from source."),
Report("https://memobase.ch/instantiation/digital/TEST-Sig_Han_1293-1", "SUCCESS", "Successfully created a digital instantiation from source.")
)
),
KafkaTestParams(
......@@ -171,6 +161,10 @@ class Tests {
listOf(
"https://memobase.ch/record/TEST-sigantur-example",
"https://memobase.ch/instantiation/physical/TEST-sigantur-example-0"
),
listOf(
Report("https://memobase.ch/record/TEST-sigantur-example", "SUCCESS", "Successfully created a record from source."),
Report("https://memobase.ch/instantiation/physical/TEST-sigantur-example-0", "SUCCESS", "Successfully created a physical instantiation from source.")
)
)
)
......
{
"titel": "«Villa Siegel», Zürich",
"beschreibung": "Villa mit Garten und Brunnen im Vordergrund. Vermutlich von Architekt Walz",
"genre": "Bauwerk",
"aufnahmeort": "Zürich",
"autorin": "Atelier Meiner",
"auftraggeber": "Walz",
"verwandte_dokumente": "Auftragsregister Bd. 6; Bildverzeichnis Bd. 7",
"erstellung": "19210914",
"rechteinhaber": "BAZ ",
"nutzungsrecht": "nach Absprache",
"medium": "Negativ Nitrat (NN)",
"format": "18x24",
"farbe": "sw",