Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
memoriav
Memobase 2020
services
Import Process
Mapper Service
Commits
bcd52365
Commit
bcd52365
authored
Dec 07, 2020
by
Jonas Waeber
Browse files
Update dependencies
parent
b2786972
Pipeline
#18481
passed with stages
in 3 minutes and 46 seconds
Changes
9
Pipelines
2
Show whitespace changes
Inline
Side-by-side
build.gradle
View file @
bcd52365
...
...
@@ -32,8 +32,8 @@ ext {
dependencies
{
implementation
'ch.memobase:memobase-kafka-utils:0.2.3'
implementation
'org.memobase:memobase-service-utilities:
1.12
.2'
implementation
'ch.memobase:mapper-service-configuration:0.3.
4
'
implementation
'org.memobase:memobase-service-utilities:
2.0
.2'
implementation
'ch.memobase:mapper-service-configuration:0.3.
6
'
// Logging Framework
implementation
"org.apache.logging.log4j:log4j-api:${log4jV}"
implementation
"org.apache.logging.log4j:log4j-core:${log4jV}"
...
...
src/main/kotlin/KafkaTopology.kt
View file @
bcd52365
...
...
@@ -23,25 +23,27 @@ import ch.memobase.kafka.utils.ConfigJoiner
import
ch.memobase.kafka.utils.models.ImportService
import
ch.memobase.kafka.utils.models.JoinedValues
import
ch.memobase.kafka.utils.models.ValueWithException
import
ch.memobase.mapping.MapperConfiguration
import
ch.memobase.mapping.MappingConfigurationParser
import
ch.memobase.mapping.exceptions.InvalidMappingException
import
ch.memobase.reporting.Report
import
ch.memobase.reporting.ReportStatus
import
ch.memobase.settings.HeaderExtractionTransformSupplier
import
ch.memobase.settings.HeaderMetadata
import
ch.memobase.settings.SettingsLoader
import
com.beust.klaxon.Klaxon
import
com.beust.klaxon.KlaxonException
import
mapping.MapperConfiguration
import
org.apache.kafka.common.serialization.Serdes
import
org.apache.kafka.streams.KeyValue
import
org.apache.kafka.streams.StreamsBuilder
import
org.apache.kafka.streams.kstream.KStream
import
org.apache.kafka.streams.kstream.Predicate
import
org.apache.logging.log4j.LogManager
import
org.memobase.helpers.ReportStatus
import
org.memobase.settings.SettingsLoader
import
settings.HeaderExtractionTransformSupplier
import
settings.HeaderMetadata
class
KafkaTopology
(
private
val
settings
:
SettingsLoader
)
{
private
val
log
=
LogManager
.
getLogger
(
"KafkaTopology"
)
private
val
reportTopic
=
settings
.
processReportTopic
private
val
klaxon
=
Klaxon
()
...
...
@@ -80,7 +82,7 @@ class KafkaTopology(
handledStream
[
0
]
.
mapValues
{
readOnlyKey
,
value
->
Report
(
readOnlyKey
,
ReportStatus
.
fa
ilure
,
value
.
third
).
toJson
()
Report
(
readOnlyKey
,
ReportStatus
.
fa
tal
,
value
.
third
,
Service
.
step
).
toJson
()
}
.
to
(
reportTopic
)
...
...
@@ -98,8 +100,9 @@ class KafkaTopology(
.
mapValues
{
key
,
value
->
value
.
third
?.
toJson
()
?:
Report
(
key
,
ReportStatus
.
failure
,
"Caught an error, but not report was created."
ReportStatus
.
fatal
,
"Caught an error, but not report was created."
,
Service
.
step
)
}
.
to
(
reportTopic
)
...
...
@@ -169,7 +172,7 @@ class KafkaTopology(
val
noRecordId
=
hasRecordId
[
1
]
noRecordId
.
mapValues
{
key
,
_
->
Report
(
key
,
ReportStatus
.
fa
ilure
,
"No record id found for record $key."
).
toJson
()
}
.
mapValues
{
key
,
_
->
Report
(
key
,
ReportStatus
.
fa
tal
,
"No record id found for record $key."
,
Service
.
step
).
toJson
()
}
.
to
(
reportTopic
)
return
hasRecordId
[
0
]
...
...
@@ -189,8 +192,9 @@ class KafkaTopology(
.
mapValues
{
key
,
_
->
Report
(
key
,
ReportStatus
.
failure
,
"No correct record type found for record $key."
ReportStatus
.
fatal
,
"No correct record type found for record $key."
,
Service
.
step
).
toJson
()
}
.
to
(
reportTopic
)
...
...
@@ -204,7 +208,7 @@ class KafkaTopology(
result
.
first
,
Pair
(
result
.
second
,
Report
(
result
.
first
,
ReportStatus
.
success
,
"Successfully mapped record with id ${result.first}."
)
Report
(
result
.
first
,
ReportStatus
.
success
,
"Successfully mapped record with id ${result.first}."
,
Service
.
step
)
)
)
}
...
...
@@ -225,20 +229,20 @@ class KafkaTopology(
Triple
(
parsedSource
,
mapperConfiguration
.
get
(),
null
)
}
else
{
log
.
error
(
"Parsed source is empty: ${value.first}."
)
Triple
(
null
,
null
,
Report
(
key
,
ReportStatus
.
fa
ilure
,
"Found empty source document."
))
Triple
(
null
,
null
,
Report
(
key
,
ReportStatus
.
fa
tal
,
"Found empty source document."
,
Service
.
step
))
}
}
catch
(
ex
:
InvalidMappingException
)
{
log
.
error
(
ex
.
localizedMessage
)
Triple
(
null
,
null
,
Report
(
key
,
ReportStatus
.
fa
ilure
,
ex
.
localizedMessage
))
Triple
(
null
,
null
,
Report
(
key
,
ReportStatus
.
fa
tal
,
ex
.
localizedMessage
,
Service
.
step
))
}
catch
(
ex
:
KlaxonException
)
{
log
.
error
(
ex
.
localizedMessage
)
Triple
(
null
,
null
,
Report
(
key
,
ReportStatus
.
fa
ilure
,
ex
.
localizedMessage
))
Triple
(
null
,
null
,
Report
(
key
,
ReportStatus
.
fa
tal
,
ex
.
localizedMessage
,
Service
.
step
))
}
catch
(
ex
:
NullPointerException
)
{
log
.
error
(
ex
.
localizedMessage
)
Triple
(
null
,
null
,
Report
(
key
,
ReportStatus
.
fa
ilure
,
"There's no data to be processed: ${ex.localizedMessage}"
)
Report
(
key
,
ReportStatus
.
fa
tal
,
"There's no data to be processed: ${ex.localizedMessage}"
,
Service
.
step
)
)
}
}
...
...
src/main/kotlin/Report.kt
deleted
100644 → 0
View file @
b2786972
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package
org.memobase
import
com.beust.klaxon.Klaxon
import
java.time.LocalDateTime
data class
Report
(
val
id
:
String
,
val
status
:
String
,
val
message
:
String
,
val
step
:
String
=
"mapper-service"
,
val
timestamp
:
String
=
LocalDateTime
.
now
().
toString
()
)
{
fun
toJson
():
String
{
return
Klaxon
().
toJsonString
(
this
)
}
override
fun
equals
(
other
:
Any
?):
Boolean
{
if
(
this
===
other
)
return
true
if
(
javaClass
!=
other
?.
javaClass
)
return
false
other
as
Report
if
(
id
!=
other
.
id
)
return
false
if
(
status
!=
other
.
status
)
return
false
if
(
message
!=
other
.
message
)
return
false
if
(
step
!=
other
.
step
)
return
false
return
true
}
override
fun
hashCode
():
Int
{
var
result
=
id
.
hashCode
()
result
=
31
*
result
+
status
.
hashCode
()
result
=
31
*
result
+
message
.
hashCode
()
result
=
31
*
result
+
step
.
hashCode
()
return
result
}
}
src/main/kotlin/Service.kt
View file @
bcd52365
...
...
@@ -20,9 +20,13 @@ package org.memobase
import
org.apache.kafka.streams.KafkaStreams
import
org.apache.logging.log4j.LogManager
import
org
.memobase.settings.SettingsLoader
import
ch
.memobase.settings.SettingsLoader
class
Service
(
file
:
String
=
"app.yml"
)
{
companion
object
{
const
val
step
=
"mapper-service"
}
private
val
log
=
LogManager
.
getLogger
(
"MapperService"
)
val
settings
=
SettingsLoader
(
...
...
src/main/kotlin/helpers/ReportStatus.kt
deleted
100644 → 0
View file @
b2786972
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package
org.memobase.helpers
object
ReportStatus
{
const
val
success
=
"SUCCESS"
const
val
failure
=
"FAILURE"
}
src/test/kotlin/IntegrationTests.kt
View file @
bcd52365
...
...
@@ -16,6 +16,8 @@
*/
package
org.memobase
import
ch.memobase.rdf.NS
import
ch.memobase.reporting.Report
import
com.beust.klaxon.Klaxon
import
org.apache.jena.rdf.model.ModelFactory
import
org.apache.jena.riot.Lang
...
...
@@ -33,7 +35,6 @@ import org.junit.jupiter.api.assertAll
import
org.junit.jupiter.params.ParameterizedTest
import
org.junit.jupiter.params.provider.MethodSource
import
org.memobase.params.IntegrationTestParams
import
org.memobase.rdf.NS
import
java.io.File
import
java.io.FileOutputStream
import
java.nio.charset.Charset
...
...
@@ -194,7 +195,8 @@ class IntegrationTests {
Report
(
"https://memobase.ch/record/test-record-set-id-ID_1"
,
"SUCCESS"
,
"Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
"Successfully mapped record with id https://memobase.ch/record/test-record-set-id-ID_1."
,
Service
.
step
)
)
/*,
IntegrationTestParams(
...
...
src/test/kotlin/params/IntegrationTestParams.kt
View file @
bcd52365
...
...
@@ -18,7 +18,7 @@
package
org.memobase.params
import
org
.memobase.Report
import
ch
.memobase.
reporting.
Report
data class
IntegrationTestParams
(
val
count
:
Int
,
...
...
src/test/resources/kafkaTests/5/output.nt
View file @
bcd52365
<https://memobase.ch/record/test-record-set-id-ID_1> <http://memobase.ch/internal/isPublished> "false"^^<http://www.w3.org/2001/XMLSchema#boolean> .
<https://memobase.ch/record/test-record-set-id-ID_1> <http://www.ebu.ch/metadata/ontologies/ebucore/ebucore#hasGenre> _:B .
<https://memobase.ch/record/test-record-set-id-ID_1> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Record> .
<https://memobase.ch/record/test-record-set-id-ID_1> <https://memobase.ch/internal/isPublished> "false"^^<http://www.w3.org/2001/XMLSchema#boolean> .
<https://memobase.ch/record/test-record-set-id-ID_1> <https://www.ica.org/standards/RiC/ontology#hasSubject> _:B .
<https://memobase.ch/record/test-record-set-id-ID_1> <https://www.ica.org/standards/RiC/ontology#heldBy> <https://memobase.ch/institution/test-institution-id> .
<https://memobase.ch/record/test-record-set-id-ID_1> <https://www.ica.org/standards/RiC/ontology#identifiedBy> _:B .
...
...
src/test/resources/kafkaTests/5/turtle-output.ttl
View file @
bcd52365
@prefix
schema:
<http://schema.org/>
.
@prefix
internal:
<http://memobase.ch/internal/>
.
@prefix
internal:
<http
s
://memobase.ch/internal/>
.
@prefix
mbrs:
<https://memobase.ch/recordSet/>
.
@prefix
owl:
<http://www.w3.org/2002/07/owl#>
.
@prefix
wdt:
<http://www.wikidata.org/prop/direct/>
.
...
...
@@ -24,11 +24,11 @@
mbr:
test-record-set-id-ID_1
a
rico:
Record
;
internal:
isPublished
false
;
ebucore:
hasGenre
[
a
skos:
Concept
;
skos:
editorialNote
"Ursprungsfeld: Genre"
;
skos:
prefLabel
"Genre 1, Genre 2"
@fr
]
;
internal:
isPublished
false
;
rico:
hasSubject
[
a
skos:
Concept
;
skos:
editorialNote
"Ursprungsfeld: Mots clés"
;
skos:
prefLabel
"Schlagwort 1, Schlagwort 2"
@fr
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment