Commit 6f2ce3b9 authored by Jonas Waeber
Browse files

Handle parse errors.

parent b01351a4
Pipeline #16176 passed with stages
in 6 minutes and 54 seconds
......@@ -34,7 +34,7 @@ ext {
dependencies {
implementation 'ch.memobase:memobase-kafka-utils:0.1.2'
implementation 'org.memobase:memobase-service-utilities:0.16.0'
implementation 'ch.memobase:mapper-service-configuration:0.3.3'
implementation 'ch.memobase:mapper-service-configuration:0.3.4'
// Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
implementation "org.apache.logging.log4j:log4j-core:${log4jV}"
......
......@@ -21,7 +21,9 @@ package org.memobase
import ch.memobase.builder.ResourceBuilder
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
import ch.memobase.mapping.MappingConfigurationParser
import ch.memobase.mapping.exceptions.InvalidMappingException
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import mapping.MapperConfiguration
......@@ -41,6 +43,7 @@ class KafkaTopology(
) {
private val log = LogManager.getLogger("KafkaTopology")
private val reportTopic = settings.processReportTopic
private val klaxon = Klaxon()
private val configJoiner = ConfigJoiner<String, ByteArray>(
ImportService.Mapping,
......@@ -67,11 +70,25 @@ class KafkaTopology(
val joinedStream =
configJoiner.join(stream, configStream)
val jsonStream = joinedStream
.mapValues { value -> Pair(value.left, MappingConfigurationParser(value.right).get()) }
.flatMapValues { value -> parseJsonFile(value) }
val parsedStream = joinedStream
.mapValues { readOnlyKey, value -> parse(readOnlyKey, value) }
.branch(
Predicate { _, value -> value.third != null },
Predicate { _, _ -> true }
)
parsedStream[0]
.mapValues { key, value ->
value.third?.toJson() ?: Report(
key,
ReportStatus.failure,
"Caught an error, but not report was created."
)
}
.to(reportTopic)
val extractedRecordIdStream = jsonStream
val extractedRecordIdStream = parsedStream[1]
.mapValues { value -> Pair(value.first!!, value.second!!) }
.transformValues(HeaderExtractionTransformSupplier<Pair<Map<String, Any>, MapperConfiguration>>())
.mapValues { value -> buildResources(value) }
.mapValues { value -> value.extractRecordId() }
......@@ -151,23 +168,6 @@ class KafkaTopology(
return hasRecordTypeValue[0]
}
/**
 * Parses the JSON text in [input]'s first component into a string map and pairs
 * it with the mapper configuration from the second component.
 *
 * Returns a single-element list on success. Returns an empty list when Klaxon
 * yields `null` or throws a [KlaxonException]; in both cases nothing is reported.
 */
private fun parseJsonFile(input: Pair<String, MapperConfiguration>):
    List<Pair<Map<String, String>, MapperConfiguration>> {
    val (rawJson, configuration) = input
    val document = try {
        Klaxon().parse<Map<String, String>>(rawJson)
    } catch (ex: KlaxonException) {
        // TODO: REPORT JSON PARSE EXCEPTIONS
        return emptyList()
    }
    // TODO: REPORT EMPTY JSON
    return document?.let { listOf(Pair(it, configuration)) } ?: emptyList()
}
private fun writeRecord(builder: ResourceBuilder): KeyValue<String, Pair<String, Report>> {
val result = builder.writeRecord()
return KeyValue(
......@@ -182,4 +182,27 @@ class KafkaTopology(
/**
 * Identity pass-through: returns the very same [data] array without copying
 * or inspecting it.
 */
private fun parseConfig(data: ByteArray): ByteArray = data
/**
 * Parses one joined record: the mapping configuration from `value.right` and
 * the JSON source document from `value.left`.
 *
 * Exactly one side of the returned triple is populated:
 * - success: `(parsedSource, mapperConfiguration, null)`
 * - failure: `(null, null, report)` where the report has status
 *   [ReportStatus.failure] and carries a message describing the problem.
 *
 * Failures are produced when Klaxon returns `null` for the source document,
 * or when an [InvalidMappingException] or [KlaxonException] is thrown while
 * parsing. Other exception types are not caught here.
 *
 * @param key the record key, used as the id of a failure [Report].
 * @param value the joined source document (left) and mapping configuration (right).
 */
private fun parse(
    key: String,
    value: JoinedValues<String, ByteArray>
): Triple<Map<String, Any>?, MapperConfiguration?, Report?> {
    // Single place that builds the failure triple so all error paths agree.
    fun failure(message: String): Triple<Map<String, Any>?, MapperConfiguration?, Report?> =
        Triple(null, null, Report(key, ReportStatus.failure, message))

    return try {
        val mapperConfiguration = MappingConfigurationParser(value.right)
        val parsedSource = klaxon.parse<Map<String, Any>>(value.left)
        if (parsedSource != null) {
            log.info("Successfully parsed source & mapping configuration.")
            Triple(parsedSource, mapperConfiguration.get(), null)
        } else {
            log.error("Parsed source is empty: ${value.left}.")
            failure("Found empty source document.")
        }
    } catch (ex: InvalidMappingException) {
        // localizedMessage comes from Java and may be null; never forward null into the report.
        val message = ex.localizedMessage ?: "Invalid mapping configuration (no details available)."
        // Pass the exception along so the stack trace is not lost.
        log.error(message, ex)
        failure(message)
    } catch (ex: KlaxonException) {
        val message = ex.localizedMessage ?: "Failed to parse source document (no details available)."
        log.error(message, ex)
        failure(message)
    }
}
}
\ No newline at end of file
@prefix schema: <http://schema.org/> .
@prefix internal: <http://memobase.ch/internal/> .
@prefix mbrs: <https://memobase.ch/recordSet/> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix wdt: <http://www.wikidata.org/prop/direct/> .
@prefix mbpo: <https://memobase.ch/physical/> .
@prefix mbcb: <https://memobase.ch/institution/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix skos: <http://www.w3.org/2004/02/skos/core#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix wd: <http://www.wikidata.org/entity/> .
@prefix wdtn: <http://www.wikidata.org/prop/direct-normalized/> .
@prefix mbdo: <https://memobase.ch/digital/> .
@prefix rdau: <http://rdaregistry.info/Elements/u/> .
@prefix fedora: <http://fedora.info/definitions/v4/repository#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rico: <https://www.ica.org/standards/RiC/ontology#> .
@prefix ebucore: <http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#> .
@prefix ldp: <http://www.w3.org/ns/ldp#> .
@prefix dcterms: <http://purl.org/dc/terms/> .
@prefix mbr: <https://memobase.ch/record/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix dc: <http://purl.org/dc/elements/1.1/> .
mbr:test-institution-id-Sig_Han_1293
a rico:Record ;
internal:isPublished false ;
rico:hasInstantiation mbdo:test-institution-id-Sig_Han_1293-1 ;
rico:heldBy mbcb:test-record-set-id ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "test-institution-id-Sig_Han_1293" ;
rico:type "main"
] ;
rico:isPartOf mbrs:test-institution-id ;
rico:type "Video" .
mbdo:test-institution-id-Sig_Han_1293-1
a rico:Instantiation ;
ebucore:duration "10:10:10" ;
ebucore:locator "http://exampl.org/video/play" ;
rico:descriptiveNote "Eine Bespielbeschreibung"@de ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "test-institution-id-Sig_Han_1293-1" ;
rico:type "main"
] ;
rico:instantiates mbr:test-institution-id-Sig_Han_1293 ;
rico:regulatedBy [ a rico:Rule ;
rico:name "public" ;
rico:regulates mbdo:test-institution-id-Sig_Han_1293-1 ;
rico:type "access"
] ;
rico:regulatedBy [ a rico:Rule ;
schema:sameAs "http://rightsstatements.org/vocab/InC/1.0/" ;
rico:name "In Copyright (InC)" ;
rico:regulates mbdo:test-institution-id-Sig_Han_1293-1 ;
rico:type "usage"
] ;
rico:regulatedBy [ a rico:Rule ;
rico:name "Familie XYZ" ;
rico:regulates mbdo:test-institution-id-Sig_Han_1293-1 ;
rico:type "holder"
] ;
rico:type "digitalObject" .
@prefix schema: <http://schema.org/> .
@prefix internal: <http://memobase.ch/internal/> .
@prefix mbrs: <https://memobase.ch/recordSet/> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix wdt: <http://www.wikidata.org/prop/direct/> .
@prefix mbpo: <https://memobase.ch/physical/> .
@prefix mbcb: <https://memobase.ch/institution/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix skos: <http://www.w3.org/2004/02/skos/core#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix wd: <http://www.wikidata.org/entity/> .
@prefix wdtn: <http://www.wikidata.org/prop/direct-normalized/> .
@prefix mbdo: <https://memobase.ch/digital/> .
@prefix rdau: <http://rdaregistry.info/Elements/u/> .
@prefix fedora: <http://fedora.info/definitions/v4/repository#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rico: <https://www.ica.org/standards/RiC/ontology#> .
@prefix ebucore: <http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#> .
@prefix ldp: <http://www.w3.org/ns/ldp#> .
@prefix dcterms: <http://purl.org/dc/terms/> .
@prefix mbr: <https://memobase.ch/record/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix dc: <http://purl.org/dc/elements/1.1/> .
mbpo:test-institution-id-Sig_Han_1293-1
a rico:Instantiation ;
rdau:P60558 "farbe" ;
ebucore:duration "10:10:10" ;
ebucore:hasMedium "format"@de ;
rico:hasDerivedInstantiation mbdo:test-institution-id-Sig_Han_1293-1 ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "test-institution-id-Sig_Han_1293-1" ;
rico:type "main"
] ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "Sig Han 1293" ;
rico:type "callNumber"
] ;
rico:instantiates mbr:test-institution-id-Sig_Han_1293 ;
rico:physicalCharacteristics "codec"@de , "bildformat"@de ;
rico:regulatedBy [ a rico:Rule ;
rico:name "public" ;
rico:regulates mbpo:test-institution-id-Sig_Han_1293-1 ;
rico:type "access"
] ;
rico:regulatedBy [ a rico:Rule ;
schema:sameAs "http://rightsstatements.org/vocab/InC/1.0/" ;
rico:name "In Copyright (InC)" ;
rico:regulates mbpo:test-institution-id-Sig_Han_1293-1 ;
rico:type "usage"
] ;
rico:type "physicalObject" .
mbr:test-institution-id-Sig_Han_1293
a rico:Record ;
internal:isPublished false ;
schema:sameAs "http://example.org/stuff" ;
rico:descriptiveNote "Eine Bespielbeschreibung"@de ;
rico:hasInstantiation mbpo:test-institution-id-Sig_Han_1293-1 , mbdo:test-institution-id-Sig_Han_1293-1 ;
rico:heldBy mbcb:test-record-set-id ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "test-institution-id-Sig_Han_1293" ;
rico:type "main"
] ;
rico:isPartOf mbrs:test-institution-id ;
rico:scopeAndContent "Kontext text"@de ;
rico:title "Das ist ein Titel"@de ;
rico:type "Tonbildschau" .
mbdo:test-institution-id-Sig_Han_1293-1
a rico:Instantiation ;
ebucore:locator "http://exampl.org/video/play" ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "test-institution-id-Sig_Han_1293-1" ;
rico:type "main"
] ;
rico:instantiates mbr:test-institution-id-Sig_Han_1293 ;
rico:isDerivedFromInstantiation
mbpo:test-institution-id-Sig_Han_1293-1 ;
rico:regulatedBy [ a rico:Rule ;
rico:name "Familie XYZ" ;
rico:regulates mbdo:test-institution-id-Sig_Han_1293-1 ;
rico:type "holder"
] ;
rico:regulatedBy [ a rico:Rule ;
schema:sameAs "http://rightsstatements.org/vocab/InC/1.0/" ;
rico:name "In Copyright (InC)" ;
rico:regulates mbdo:test-institution-id-Sig_Han_1293-1 ;
rico:type "usage"
] ;
rico:regulatedBy [ a rico:Rule ;
rico:name "public" ;
rico:regulates mbdo:test-institution-id-Sig_Han_1293-1 ;
rico:type "access"
] ;
rico:type "digitalObject" .
@prefix schema: <http://schema.org/> .
@prefix internal: <http://memobase.ch/internal/> .
@prefix mbrs: <https://memobase.ch/recordSet/> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix wdt: <http://www.wikidata.org/prop/direct/> .
@prefix mbpo: <https://memobase.ch/physical/> .
@prefix mbcb: <https://memobase.ch/institution/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix skos: <http://www.w3.org/2004/02/skos/core#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix wd: <http://www.wikidata.org/entity/> .
@prefix wdtn: <http://www.wikidata.org/prop/direct-normalized/> .
@prefix mbdo: <https://memobase.ch/digital/> .
@prefix rdau: <http://rdaregistry.info/Elements/u/> .
@prefix fedora: <http://fedora.info/definitions/v4/repository#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rico: <https://www.ica.org/standards/RiC/ontology#> .
@prefix ebucore: <http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#> .
@prefix ldp: <http://www.w3.org/ns/ldp#> .
@prefix dcterms: <http://purl.org/dc/terms/> .
@prefix mbr: <https://memobase.ch/record/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix dc: <http://purl.org/dc/elements/1.1/> .
mbpo:test-institution-id-sigantur-example-1
a rico:Instantiation ;
rico:conditionsOfUse "Blablabla" ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "test-institution-id-sigantur-example-1" ;
rico:type "main"
] ;
rico:instantiates mbr:test-institution-id-sigantur-example ;
rico:physicalCharacteristics "Seitenverhältnis: seitenverhältnis_de"@de , "Procede Son: procede_son"@fr , "Rapporto d'aspetto: seitenverhältnis_it"@it , "ID Film: id_film"@fr , "Métrage: metrage"@fr ;
rico:type "physicalObject" .
mbr:test-institution-id-sigantur-example
a rico:Record ;
internal:isPublished false ;
rico:hasInstantiation mbpo:test-institution-id-sigantur-example-1 ;
rico:heldBy mbcb:test-record-set-id ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "test-institution-id-sigantur-example" ;
rico:type "main"
] ;
rico:isPartOf mbrs:test-institution-id ;
rico:type "Foto" .
@prefix schema: <http://schema.org/> .
@prefix internal: <http://memobase.ch/internal/> .
@prefix mbrs: <https://memobase.ch/recordSet/> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix wdt: <http://www.wikidata.org/prop/direct/> .
@prefix mbpo: <https://memobase.ch/physical/> .
@prefix mbcb: <https://memobase.ch/institution/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix skos: <http://www.w3.org/2004/02/skos/core#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix wd: <http://www.wikidata.org/entity/> .
@prefix wdtn: <http://www.wikidata.org/prop/direct-normalized/> .
@prefix mbdo: <https://memobase.ch/digital/> .
@prefix rdau: <http://rdaregistry.info/Elements/u/> .
@prefix fedora: <http://fedora.info/definitions/v4/repository#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rico: <https://www.ica.org/standards/RiC/ontology#> .
@prefix ebucore: <http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#> .
@prefix ldp: <http://www.w3.org/ns/ldp#> .
@prefix dcterms: <http://purl.org/dc/terms/> .
@prefix mbr: <https://memobase.ch/record/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix dc: <http://purl.org/dc/elements/1.1/> .
mbr:test-record-set-id-ID_1
a rico:Record ;
internal:isPublished false ;
ebucore:hasGenre [ a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Genre" ;
skos:prefLabel "Genre 1, Genre 2"@fr
] ;
rico:hasSubject [ a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Mots clés" ;
skos:prefLabel "Schlagwort 1, Schlagwort 2"@fr
] ;
rico:heldBy mbcb:test-institution-id ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "test-record-set-id-ID_1" ;
rico:type "main"
] ;
rico:isPartOf mbrs:test-record-set-id ;
rico:type "Foto" .
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment