Commit d01f53b0 authored by Jonas Waeber

Update service for dependency.

Add proper reporting for local transform validation.
parent 00baf0e2
Pipeline #21468 passed with stages
in 4 minutes and 16 seconds
......@@ -36,7 +36,7 @@ dependencies {
implementation 'ch.memobase:memobase-kafka-utils:0.1.2'
implementation 'org.memobase:memobase-service-utilities:2.0.4'
implementation 'ch.memobase:normalization-service-configuration:0.2.3'
implementation 'ch.memobase:normalization-service-configuration:0.3.1'
// Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
......
......@@ -20,6 +20,7 @@ package org.memobase
import ch.memobase.configs.GlobalTransformsLoader
import ch.memobase.configs.LocalTransformsLoader
import ch.memobase.helpers.KEYS
import ch.memobase.kafka.utils.ConfigJoiner
import ch.memobase.kafka.utils.models.ImportService
import ch.memobase.kafka.utils.models.JoinedValues
......@@ -28,7 +29,6 @@ import ch.memobase.rdf.MemobaseModel
import ch.memobase.reporting.Report
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.HeaderExtractionTransformSupplier
import ch.memobase.settings.HeaderMetadata
import ch.memobase.settings.SettingsLoader
import java.io.ByteArrayInputStream
import java.io.StringWriter
......@@ -49,12 +49,12 @@ class KafkaTopology(
private val reportTopic = settings.processReportTopic
private val globalTransforms = GlobalTransformsLoader(settings.appSettings.getProperty("transformMapping")).get()
private val configJoiner = ConfigJoiner<String, LocalTransformsLoader>(
private val configJoiner = ConfigJoiner<String, ByteArray>(
ImportService.LocalTransform,
Serdes.String(),
Serdes.serdeFrom(
{ _, data -> data.getByteStream() },
{ _, data -> LocalTransformsLoader(data) }
{ _, data -> data },
{ _, data -> data }
),
this::parseConfig
)
......@@ -71,21 +71,19 @@ class KafkaTopology(
configJoiner.join(stream, configStream)
val filterInvalidLocalTransforms = joinedStream
.transformValues(HeaderExtractionTransformSupplier<JoinedValues<String, LocalTransformsLoader>>())
.branch(
Predicate { _, value -> value.first.right.errorMessage != "" },
.transformValues(HeaderExtractionTransformSupplier<JoinedValues<String, ByteArray>>())
.mapValues { value -> Triple(value.first.left, LocalTransformsLoader(value.first.right), value.second) }
.mapValues { readOnlyKey, value ->
val report = value.second.parse(readOnlyKey)
PipelineWrapper(value.first, value.second.get(), report, value.third)
}.branch(
Predicate { _, value -> value.localTransformsReport.status == ReportStatus.fatal },
Predicate { _, _ -> true }
)
filterInvalidLocalTransforms[0]
.mapValues { readOnlyKey, value ->
Report(
readOnlyKey,
ReportStatus.fatal,
value.first.right.errorMessage,
Service.step
).toJson()
.mapValues { value ->
value.localTransformsReport.toJson()
}
.to(reportTopic)
......@@ -102,7 +100,7 @@ class KafkaTopology(
key,
ReportStatus.fatal,
"Could not parse message. Found invalid input data (RiotException). Check logs.",
Service.step
KEYS.step
).toJson()
}
.to(reportTopic)
......@@ -123,18 +121,18 @@ class KafkaTopology(
return builder
}
private fun createModel(input: Pair<JoinedValues<String, LocalTransformsLoader>, HeaderMetadata>): PreparedInput? {
private fun createModel(input: PipelineWrapper): PipelineWrapper2? {
val model = MemobaseModel()
try {
RDFDataMgr.read(model, ByteArrayInputStream(input.first.left.toByteArray()), Lang.NTRIPLES)
RDFDataMgr.read(model, ByteArrayInputStream(input.data.toByteArray()), Lang.NTRIPLES)
} catch (ex: RiotException) {
return null
}
return PreparedInput(model, input.first.right, input.second)
return PipelineWrapper2(model, input.localTransforms, input.localTransformsReport, input.headerMetadata)
}
private fun transformations(key: String, input: PreparedInput): Pair<Report, Model> {
val transformConfigs = input.localTransforms.get() + globalTransforms
private fun transformations(key: String, input: PipelineWrapper2): Pair<Report, Model> {
val transformConfigs = input.localTransforms + globalTransforms
val results = mutableListOf<String>()
for (transformConfig in transformConfigs) {
val listOfResources = input.model.listRicoResourceSubjects().toList()
......@@ -147,21 +145,34 @@ class KafkaTopology(
Report(
key,
ReportStatus.fatal,
"InvalidInput: " + ex.localizedMessage,
Service.step
"InvalidInput: ${ex.localizedMessage}\n${input.localTransformsReport.message}",
KEYS.step
),
input.model
)
} catch (ex: Exception) {
log.error(ex.localizedMessage)
Pair(
Report(
key,
ReportStatus.fatal,
"${ex.javaClass.name}: ${ex.localizedMessage}\n${input.localTransformsReport.message}",
KEYS.step
),
input.model
)
}
}
}
var reportMessage = if (results.isEmpty()) "" else results.reduce { acc, s -> acc + "\n" + s }
reportMessage += input.localTransformsReport.message
reportMessage = reportMessage.trim()
return Pair(
Report(
key,
if (results.isEmpty()) ReportStatus.success else ReportStatus.warning,
if (results.isEmpty()) "Normalization was successful." else results.reduce { acc, s -> acc + "\n" + s }
.trim(),
Service.step
reportMessage,
KEYS.step
),
input.model
)
......@@ -173,7 +184,7 @@ class KafkaTopology(
return out.toString().trim()
}
private fun parseConfig(data: ByteArray): LocalTransformsLoader {
return LocalTransformsLoader(data)
private fun parseConfig(data: ByteArray): ByteArray {
return data
}
}
package org.memobase
import ch.memobase.reporting.Report
import ch.memobase.settings.HeaderMetadata
import ch.memobase.transform.ITransformer
/**
 * Intermediate value passed through the Kafka topology after the local transform
 * configuration has been parsed and before the RDF model has been built.
 *
 * @property data the record payload as text; `createModel` reads it as N-Triples.
 * @property localTransforms transformers obtained from `LocalTransformsLoader.get()`.
 * @property localTransformsReport report returned by `LocalTransformsLoader.parse(key)`;
 * records whose report status is fatal are sent to the report topic instead of being processed.
 * @property headerMetadata metadata produced by the header extraction step
 * (presumably read from the Kafka record headers - confirm against `HeaderExtractionTransformSupplier`).
 */
data class PipelineWrapper(
val data: String,
val localTransforms: List<ITransformer>,
val localTransformsReport: Report,
val headerMetadata: HeaderMetadata
)
package org.memobase
import ch.memobase.configs.LocalTransformsLoader
import ch.memobase.rdf.MemobaseModel
import ch.memobase.reporting.Report
import ch.memobase.settings.HeaderMetadata
import ch.memobase.transform.ITransformer
data class PreparedInput(
data class PipelineWrapper2(
val model: MemobaseModel,
val localTransforms: LocalTransformsLoader,
val localTransforms: List<ITransformer>,
val localTransformsReport: Report,
val metadata: HeaderMetadata
)
......@@ -21,9 +21,6 @@ import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
class Service(file: String = "app.yml") {
companion object {
const val step = "normalization-service"
}
private val log = LogManager.getLogger("NormalizationService")
val settings = SettingsLoader(
......
......@@ -17,6 +17,7 @@
*/
package org.memobase
import ch.memobase.helpers.KEYS
import ch.memobase.rdf.MemobaseModel
import ch.memobase.rdf.NS
import ch.memobase.reporting.Report
......@@ -189,7 +190,7 @@ class TestIntegration {
KafkaTestParams(
5,
"placeholder",
Report("placeholder", "SUCCESS", "Normalization was successful.", Service.step)
Report("placeholder", "SUCCESS", "", KEYS.step)
)/*,
KafkaTestParams(
6,
......
......@@ -22,6 +22,22 @@
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix dc: <http://purl.org/dc/elements/1.1/> .
_:b0 a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Genre" ;
skos:prefLabel "Filmwochenschau"@de , "Filmwochenschau"@fr , "Filmwochenschau"@it ;
rico:resultsFrom _:b1 .
_:b2 a rico:Activity ;
rico:affects _:b3 ;
rico:beginningDate "2021-57-09T12:57:13+0000"^^xsd:dateTime ;
rico:endDate "2021-57-09T12:57:13+0000"^^xsd:dateTime ;
rico:performedBy [ a rico:Mechanism ;
rico:name "LanguagesNormalizer" ;
rico:performs _:b2
] ;
rico:resultsIn _:b4 ;
rico:type "enrichment" .
mbr:sfw-SFW_1376-2 a rico:Record ;
dcterms:abstract "Abstract: <br> Schweiz, Seedorf, UR: Moderner Landwirtschaftsbetrieb der Nonnen des Benediktinerinnen-Klosters <br> <br> Sequenzbeschrieb: <br> Seedorf, UR (Schweiz) – Schwenk AA Kloster <br> Seedorf, UR (Schweiz) – Benediktinerinnen bei Gartenarbeit <br> Seedorf, UR (Schweiz) – Nonne bei Feldarbeit auf dem Traktor, Ordensschwestern Heu wendend <br> Seedorf, UR (Schweiz) – Aufladen von Heu mit Ladewagen <br> Seedorf, UR (Schweiz) – Einfahrt Traktor mit vollem Heuwagen in Scheune <br> <br> Communiqué: <br> In Seedorf, Kanton Uri, gehören seit vielen hundert Jahren 6 Stunden Feldarbeit zum Tagesprogramm des dortigen Benediktinerinnen-Klosters. Heute verfügt es auch über die modernsten Erntemaschinen. <br> <br> Begleittext: <br> Die frommen Klosterfrauen von Seedorf im Reusstal sind bekannt dafür, dass sie mindestens 6 Stunden ihres Tagesprogramms der Garten-und Feldarbeit widmen. / Modernste Landwirtschaftsmaschinen erlauben den Benediktinerinnen sowohl sich selbst als auch eine dem Kloster angeschlossene Haushaltungsschule mit 30 jungen Töchtern reichhaltig zu ernähren. / Zur Zeit der Kreuzzüge stand hier ein Siechenhaus, das dem heiligen Lazarus geweiht war und schon damals viele hilfreiche weibliche Geister aus der ganzen Innerschweiz anzog. / Im 16. Jahrhundert übergab Papst Paul IV. das vom Kanton renovierte Kloster den Benediktinerinnen, die es seither geradezu mustergültig betreut haben. / Wenn der viele Regen das Gras nicht trocknen lässt, dann betreiben die fleissigen Nonnen das Emden in zwei Gruppen. Während die eine das Gras schneidet, fährt die andere mit der Maschine direkt hintendrein und lädt alles auf. / Das nasse Gras wird dann ins Kloster gefahren und dort in einer Spezialanlage getrocknet. Bei den Seedorfer Klosterfrauen ist es Tradition, weitherum im Lande als besonders aufgeschlossen und fortschrittlich zu gelten. 
<br> <br> <![CDATA[<a href=\"https://media.zem.ch/01WS/1969/Communique_1376.pdf\" target=\"_blank\">Communiqué_1376.pdf</a>]]>"@de ;
dcterms:issued [ a rico:DateRange ;
......@@ -38,13 +54,13 @@ mbr:sfw-SFW_1376-2 a rico:Record ;
rico:name "Seedorf, UR (Schweiz)"@de
] ;
schema:sameAs "https://www.recherche.bar.admin.ch/recherche/#/de/suche/archivplan/21689176" ;
ebucore:hasGenre _:b0 , _:b1 , _:b2 , _:b3 , _:b4 , _:b5 ;
ebucore:hasGenre _:b5 , _:b0 , _:b6 , _:b7 , _:b8 , _:b9 ;
rico:descriptiveNote "Quellenangabe für Zitate und Publikationen: <br> Schweizerisches Bundesarchiv, J2.143#1996/386#1376-1#2*, Schweizer Filmwochenschau vom 19.09.1969, Klosterfrauen treiben Agrarkultur (1376-2)"@de ;
rico:hasInstantiation <https://memobase.ch/instantiation/digital/sfw-SFW_1376-2-1> , <https://memobase.ch/instantiation/physical/sfw-SFW_1376-2-0> ;
rico:hasLanguage _:b6 , _:b7 ;
rico:hasLanguage _:b3 , _:b4 ;
rico:hasSubject [ a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Deskriptoren" ;
skos:prefLabel "CHRISTLICHE KIRCHE"@de
skos:prefLabel "LANDWIRTSCHAFT"@de
] ;
rico:hasSubject [ a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Deskriptoren" ;
......@@ -52,43 +68,43 @@ mbr:sfw-SFW_1376-2 a rico:Record ;
] ;
rico:hasSubject [ a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Deskriptoren" ;
skos:prefLabel "LANDWIRTSCHAFT"@de
skos:prefLabel "RELIGION"@de
] ;
rico:hasSubject [ a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Deskriptoren" ;
skos:prefLabel "ORDENSGEMEINSCHAFT"@de
skos:prefLabel "CHRISTLICHE KIRCHE"@de
] ;
rico:hasSubject [ a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Deskriptoren" ;
skos:prefLabel "RELIGION"@de
skos:prefLabel "ORDENSGEMEINSCHAFT"@de
] ;
rico:hasTitle [ a rico:Title ;
rico:title "Schweizer Filmwochenschau"@de ;
rico:type "series"
rico:title "Schweizer Filmwochenschau vom 19.09.1969"@de ;
rico:type "broadcast"
] ;
rico:hasTitle [ a rico:Title ;
rico:title "Klosterfrauen treiben Agrarkultur (1376-2)"@de ;
rico:type "main"
] ;
rico:hasTitle [ a rico:Title ;
rico:title "Schweizer Filmwochenschau vom 19.09.1969"@de ;
rico:type "broadcast"
rico:title "Schweizer Filmwochenschau"@de ;
rico:type "series"
] ;
rico:heldBy mbcb:sfw ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "SFW_1376-2" ;
rico:type "original"
] ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "https://memobase.ch/record/sfw-SFW_1376-2" ;
rico:type "main"
] ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "SFW_1376-2" ;
rico:type "original"
] ;
rico:isPartOf mbrs:fbfws ;
rico:publishedBy [ a rico:CorporateBody ;
rico:name "SFW"@de
] ;
rico:recordResourceOrInstantiationIsSourceOfCreationRelation
_:b8 ;
_:b10 ;
rico:regulatedBy [ a rico:Rule ;
rico:name "Cinémathèque suisse, Schweizerisches Bundesarchiv" ;
rico:regulates mbr:sfw-SFW_1376-2 ;
......@@ -97,42 +113,29 @@ mbr:sfw-SFW_1376-2 a rico:Record ;
rico:title "Klosterfrauen treiben Agrarkultur (1376-2)"@de ;
rico:type "Film" .
_:b9 a rico:Activity ;
rico:affects _:b0 ;
rico:beginningDate "2021-42-22T08:42:51+0000"^^xsd:dateTime ;
rico:endDate "2021-42-22T08:42:51+0000"^^xsd:dateTime ;
rico:performedBy [ a rico:Mechanism ;
rico:name "GenreNormalizer" ;
rico:performs _:b9
] ;
rico:resultsIn _:b5 ;
rico:type "enrichment" .
_:b10 a rico:Activity ;
rico:affects _:b1 ;
rico:beginningDate "2021-42-22T08:42:51+0000"^^xsd:dateTime ;
rico:endDate "2021-42-22T08:42:51+0000"^^xsd:dateTime ;
_:b11 a rico:Activity ;
rico:affects _:b5 ;
rico:beginningDate "2021-57-09T12:57:13+0000"^^xsd:dateTime ;
rico:endDate "2021-57-09T12:57:13+0000"^^xsd:dateTime ;
rico:performedBy [ a rico:Mechanism ;
rico:name "GenreNormalizer" ;
rico:performs _:b10
rico:performs _:b11
] ;
rico:resultsIn _:b3 ;
rico:resultsIn _:b8 ;
rico:type "enrichment" .
_:b4 a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Genre" ;
skos:prefLabel "Filmwochenschau"@de , "Filmwochenschau"@fr , "Filmwochenschau"@it ;
rico:resultsFrom _:b11 .
_:b12 a rico:CarrierType ;
schema:sameAs "http://www.wikidata.org/entity/Q226528" ;
rico:name "35-mm-Film"@de , "format 35 mm"@fr , "35 millimetri"@it ;
rico:resultsFrom _:b13 .
_:b7 a rico:Language ;
schema:sameAs "http://www.wikidata.org/entity/Q188" ;
rico:name "Deutsch"@de , "allemand"@fr , "tedesco"@it ;
rico:resultsFrom _:b12 ;
rico:type "content" .
_:b3 a rico:Language ;
rico:name "dt"@de ;
rico:type "content" .
_:b1 a skos:Concept ;
_:b7 a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Genre" ;
skos:prefLabel "Ballett"@de .
skos:prefLabel "Filmwochenschau"@de .
<https://memobase.ch/instantiation/digital/sfw-SFW_1376-2-1>
a rico:Instantiation ;
......@@ -157,26 +160,11 @@ _:b1 a skos:Concept ;
] ;
rico:type "digitalObject" .
_:b6 a rico:Language ;
rico:name "dt"@de ;
rico:type "content" .
_:b13 a rico:Activity ;
rico:affects _:b14 ;
rico:beginningDate "2021-42-22T08:42:51+0000"^^xsd:dateTime ;
rico:endDate "2021-42-22T08:42:51+0000"^^xsd:dateTime ;
rico:performedBy [ a rico:Mechanism ;
rico:name "CarrierTypeNormalizer" ;
rico:performs _:b13
] ;
rico:resultsIn _:b15 ;
rico:type "enrichment" .
<https://memobase.ch/instantiation/physical/sfw-SFW_1376-2-0>
a rico:Instantiation ;
rdau:P60558 "SW"@de ;
ebucore:duration "78" ;
rico:hasCarrierType _:b15 , _:b14 ;
rico:hasCarrierType _:b14 , _:b12 ;
rico:hasDerivedInstantiation <https://memobase.ch/instantiation/digital/sfw-SFW_1376-2-1> ;
rico:identifiedBy [ a rico:Identifier ;
rico:identifier "https://memobase.ch/instantiation/physical/sfw-SFW_1376-2-0" ;
......@@ -189,60 +177,72 @@ _:b13 a rico:Activity ;
rico:instantiates mbr:sfw-SFW_1376-2 ;
rico:type "physicalObject" .
_:b2 a skos:Concept ;
_:b9 a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Genre" ;
skos:prefLabel "Filmwochenschau"@de .
skos:prefLabel "Musik"@de , "Musique"@fr , "Musica"@it ;
rico:resultsFrom _:b15 .
_:b0 a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Genre" ;
skos:prefLabel "Canzone"@de .
_:b15 a rico:Activity ;
rico:affects _:b6 ;
rico:beginningDate "2021-57-09T12:57:13+0000"^^xsd:dateTime ;
rico:endDate "2021-57-09T12:57:13+0000"^^xsd:dateTime ;
rico:performedBy [ a rico:Mechanism ;
rico:name "GenreNormalizer" ;
rico:performs _:b15
] ;
rico:resultsIn _:b9 ;
rico:type "enrichment" .
_:b15 a rico:CarrierType ;
schema:sameAs "http://www.wikidata.org/entity/Q226528" ;
rico:name "35-mm-Film"@de , "format 35 mm"@fr , "35 millimetri"@it ;
rico:resultsFrom _:b13 .
_:b4 a rico:Language ;
schema:sameAs "http://www.wikidata.org/entity/Q188" ;
rico:name "Deutsch"@de , "allemand"@fr , "tedesco"@it ;
rico:resultsFrom _:b2 ;
rico:type "content" .
_:b3 a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Genre" ;
skos:prefLabel "Performance"@de , "Performance"@fr , "Prestazioni"@it ;
rico:resultsFrom _:b10 .
_:b10 a rico:CreationRelation ;
rico:creationRelationHasSource mbr:sfw-SFW_1376-2 ;
rico:creationRelationHasTarget [ a rico:CorporateBody ;
rico:agentIsTargetOfCreationRelation
_:b10 ;
rico:name "Schweizer Filmwochenschau (SFW)"@de
] ;
rico:name "Autor" ;
rico:type "creator" .
_:b14 a rico:CarrierType ;
rico:name "35mm Zelluloseazetat"@de .
_:b11 a rico:Activity ;
rico:affects _:b2 ;
rico:beginningDate "2021-42-22T08:42:51+0000"^^xsd:dateTime ;
rico:endDate "2021-42-22T08:42:51+0000"^^xsd:dateTime ;
_:b1 a rico:Activity ;
rico:affects _:b7 ;
rico:beginningDate "2021-57-09T12:57:13+0000"^^xsd:dateTime ;
rico:endDate "2021-57-09T12:57:13+0000"^^xsd:dateTime ;
rico:performedBy [ a rico:Mechanism ;
rico:name "GenreNormalizer" ;
rico:performs _:b11
rico:performs _:b1
] ;
rico:resultsIn _:b4 ;
rico:resultsIn _:b0 ;
rico:type "enrichment" .
_:b12 a rico:Activity ;
rico:affects _:b6 ;
rico:beginningDate "2021-42-22T08:42:51+0000"^^xsd:dateTime ;
rico:endDate "2021-42-22T08:42:51+0000"^^xsd:dateTime ;
_:b5 a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Genre" ;
skos:prefLabel "Ballett"@de .
_:b6 a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Genre" ;
skos:prefLabel "Canzone"@de .
_:b13 a rico:Activity ;
rico:affects _:b14 ;
rico:beginningDate "2021-57-09T12:57:13+0000"^^xsd:dateTime ;
rico:endDate "2021-57-09T12:57:13+0000"^^xsd:dateTime ;
rico:performedBy [ a rico:Mechanism ;
rico:name "LanguagesNormalizer" ;
rico:performs _:b12
rico:name "CarrierTypeNormalizer" ;
rico:performs _:b13
] ;
rico:resultsIn _:b7 ;
rico:resultsIn _:b12 ;
rico:type "enrichment" .
_:b5 a skos:Concept ;
_:b8 a skos:Concept ;
skos:editorialNote "Ursprungsfeld: Genre" ;
skos:prefLabel "Musik"@de , "Musique"@fr , "Musica"@it ;
rico:resultsFrom _:b9 .
_:b8 a rico:CreationRelation ;
rico:creationRelationHasSource mbr:sfw-SFW_1376-2 ;
rico:creationRelationHasTarget [ a rico:CorporateBody ;
rico:agentIsTargetOfCreationRelation
_:b8 ;
rico:name "Schweizer Filmwochenschau (SFW)"@de
] ;
rico:name "Autor" ;
rico:type "creator" .
skos:prefLabel "Performance"@de , "Performance"@fr , "Prestazioni"@it ;
rico:resultsFrom _:b11 .
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment