Commit 1a3f5d32 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Implement kafka topology

parent 28f50d76
Pipeline #8684 failed with stages
in 2 minutes and 37 seconds
......@@ -31,8 +31,9 @@ class App {
try {
val settings = SettingsLoader(
listOf(
"institution.id",
"recordSet.id"
"institutionId",
"recordSetId",
"configs"
),
useStreamsConfig = true
)
......
......@@ -18,26 +18,56 @@
package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager
import org.memobase.builder.ResourceBuilder
import org.memobase.mapping.MappingConfig
import org.memobase.settings.SettingsLoader
class KafkaTopology(private val settings: SettingsLoader
) {
private val log = LogManager.getLogger("KafkaTopology")
private val config = MappingConfig(settings.appSettings.getProperty("configs"))
fun build(): Topology {
val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic)
stream.flatMapValues { value -> parseJsonFile(value) }
.mapValues { value -> buildResources(value) }
.flatMap { _, value -> writeResource(value) }
.to(settings.outputTopic)
return builder.build()
}
private fun parseJsonFile(message: String): List<Map<String, String>> {
Klaxon().parse<Map<String, String>>(message).let {
return if (it != null) {
listOf(it)
} else {
emptyList()
}
}
}
private fun writeTriples() {
private fun buildResources(value: Map<String, String>): ResourceBuilder {
return ResourceBuilder(
config.uriField,
config.recordType,
config.recordFieldMappers,
config.physicalObjectFieldMappers,
value,
settings.appSettings.getProperty("institutionId"),
settings.appSettings.getProperty("recordSetId")
)
}
private fun writeResource(value: ResourceBuilder): List<KeyValue<String, String>> {
return value.write().map { return@map KeyValue(it.first, it.second) }
}
}
\ No newline at end of file
......@@ -59,11 +59,11 @@ class ResourceBuilder(
resources = listOf(record, physicalObject)
}
fun write(): List<String> {
fun write(): List<Pair<String, String>> {
return resources.map { resource ->
StringWriter().use { writer ->
RDFDataMgr.write(writer, resource.model, RDFFormat.JSONLD_COMPACT_FLAT)
return@map writer.toString()
RDFDataMgr.write(writer, resource.model, RDFFormat.NTRIPLES_UTF8)
return@map Pair(resource.resource.uri, writer.toString().trim())
}
}
}
......
id: ${JOB_ID:?system}
app:
institutionId: ${INSTITUTION_ID:?system}
recordSetId: ${INSTITUTION_ID:?system}
configs: /configs/mappings
kafka:
stream:
bootstrap.servers: localhost:9092
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
application.id: ${APPLICATION_ID:?system}
topic:
in: ${TOPIC_IN:?system}
out: ${TOPIC_OUT:?system}
\ No newline at end of file
out: ${TOPIC_OUT:?system}
process: ${TOPIC_PROCESS:?system}
\ No newline at end of file
......@@ -18,26 +18,48 @@
package org.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.builder.ResourceBuilder
import org.memobase.mapping.MappingConfig
import org.memobase.settings.SettingsLoader
import java.io.File
import java.io.FileOutputStream
import java.nio.charset.Charset
import java.nio.file.Files
import java.nio.file.Paths
import java.util.stream.Stream
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class Tests {
private val log = LogManager.getLogger("TestLogger")
private val resourcePath = "src/test/resources/data"
private val resourcePath = "src/test/resources"
private fun readFile(fileName: String): String {
return File("$resourcePath/$fileName").readText(Charset.defaultCharset())
}
@Test
fun `test mapping config validation`() {
val config = MappingConfig(configTestBasePath + "minimalValid")
assertThat(config).isNotNull
.hasFieldOrPropertyWithValue("uriField", "TestField")
.hasFieldOrPropertyWithValue("recordType", "Foto")
}
private val configTestBasePath = "src/test/resources/configTests/"
@Test
fun `test mapping config parser`() {
val config =
......@@ -61,10 +83,53 @@ class Tests {
string.forEach { s ->
count += 1
FileOutputStream("src/test/resources/output/data$count.json").bufferedWriter(Charset.defaultCharset()).use {
it.write(s)
it.write(s.second)
}
}
}
}
@Test
fun `test kafka topology`() {
val settings = SettingsLoader(
listOf(
"institutionId",
"recordSetId",
"configs"
),
fileName = "kafkaTest1.yml",
useStreamsConfig = true
)
val topology = KafkaTopology(settings).build()
val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
val testDriver = TopologyTestDriver(KafkaTopology(settings).build(), settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
testDriver.pipeInput(
factory.create(
settings.inputTopic, "", readFile("kafkaTests/1/input.json")
)
)
val record = testDriver.readOutput(
settings.outputTopic,
StringDeserializer(),
StringDeserializer()
)
val sortedResult = record.value().lines().sorted().reduce { acc, s -> acc + "\n" + s }
assertThat(record)
.isNotNull
.hasFieldOrPropertyWithValue("key", "https://memobase.ch/record/BAZ-MEI_49884")
// TODO: find a way to test output. the problem is, that the generated blank node ids are different each time.
// one possibilty would be to use and sort a json document as there the keys are deterministic. maybe. or maybe not...
//assertThat(sortedResult)
// .isEqualTo(readFile("kafkaTests/1/output.nt"))
}
......
record:
uri: TestField
type: Foto
\ No newline at end of file
id: jobXYZ
app:
institutionId: "BAZ"
recordSetId: "BAZ-B_MEI"
configs: src/test/resources/kafkaTests/1/config
kafka:
streams:
bootstrap.servers: localhost:12345
application.id: test-clinet-1234
topic:
in: test-topic-in
out: test-topic-out
process: test-topic-process
\ No newline at end of file
physical:
medium: # ebucore:hasMedium
de:
- medium
physicalCharacteristics: # rico:physicalCharacteristics
de:
- format
- orientation
- zustand
colour:
- farbe # rdau:P60558 has colour content (provisional field)
descriptiveNote: #rico:descriptiveNote
de:
- bemerkungen
identifiers: # blank node rico:Identifier
original: original_id
\ No newline at end of file
record:
uri: original_id
type: Foto # is always a constant!
title:
de: titel
descriptiveNote: # rico:descriptiveNote
de: beschreibung
scopeAndContent: # rico:scopeAndContent
de:
- entstehungsumstände
identifiers:
callNumber: original_id
titles: # blank node rico:Title
main: # serial, broadcast
de: titel
genre:
- prefLabel:
de:
- genre
editorialNote:
const: "Ursprungsfeld: Genre"
subject: # blank node skos:Concept with rico:hasSubject
- prefLabel:
de:
- baufunktion
editorialNote:
const: "Ursprungsfeld: Baufunktion"
placeOfCapture: # blank node rico:Place with relation rdau:P60556 "has place of capture"
name:
de: aufnahmeort
creators: # a list of creators (with creationRelation rico:type)
- creatorType:
const: "Fotograf"
corporateBody:
name:
de: AutorIn
- creatorType:
const: "Auftraggeber"
agent:
name:
de: auftraggeber
- creatorType:
const: "Architekt"
agent:
name:
de: architekt
hasFindingAid: verwandte_dokumente # rdau:P60262 "has finding aid" (provisional field)
creationDate: erstellung # blank node rico:DateSet with rico:expressedDate
rights: # blank nodes rico:Rule with type "x"
holder: rechtinhaber
usage: nutzungsrecht
access:
const: public
relatedPlaces: # blank node rico:Place with relation dcterms:spatial
- name:
de: quartier
- name:
de:
- strasse1
- hausnummer1
- name:
de:
- strasse2
- hausnummer2
- name:
de: bezeichnung_bauobjekt
hasSponsoringAgent: # rdau:P60451 with value "Memoriav"
const: Memoriav
\ No newline at end of file
{
"titel": "«Villa Siegel», Zürich",
"beschreibung": "Villa mit Garten und Brunnen im Vordergrund. Vermutlich von Architekt Walz",
"genre": "Bauwerk",
"aufnahmeort": "Zürich",
"autorin": "Atelier Meiner",
"auftraggeber": "Walz",
"verwandte_dokumente": "Auftragsregister Bd. 6; Bildverzeichnis Bd. 7",
"erstellung": "19210914",
"rechteinhaber": "BAZ ",
"nutzungsrecht": "nach Absprache",
"medium": "Negativ Nitrat (NN)",
"format": "18x24",
"farbe": "sw",
"orientation": "Querformat",
"zustand": "Nitratschaden",
"konservatorische_bewertung": "B",
"original_id": "MEI_49884",
"alte_signatur": "49884",
"signatur_digitalisat": "MEI_49884",
"bearbeitungskontext": "Pilot Meiner"
}
\ No newline at end of file
<https://memobase.ch/record/BAZ-MEI_49884> <http://purl.org/dc/terms/created> _:B822e9b36X2Db650X2D40f3X2Db910X2D713f0d193878 .
<https://memobase.ch/record/BAZ-MEI_49884> <http://rdaregistry.info/Elements/u/P60262> "Auftragsregister Bd. 6; Bildverzeichnis Bd. 7" .
<https://memobase.ch/record/BAZ-MEI_49884> <http://rdaregistry.info/Elements/u/P60451> "Memoriav" .
<https://memobase.ch/record/BAZ-MEI_49884> <http://rdaregistry.info/Elements/u/P60556> _:B16bf53adX2De0bcX2D4ab2X2D9474X2Db06e2e0b78b9 .
<https://memobase.ch/record/BAZ-MEI_49884> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Record> .
<https://memobase.ch/record/BAZ-MEI_49884> <https://www.ica.org/standards/RiC/ontology#descriptiveNote> "Villa mit Garten und Brunnen im Vordergrund. Vermutlich von Architekt Walz"@de .
<https://memobase.ch/record/BAZ-MEI_49884> <https://www.ica.org/standards/RiC/ontology#hasInstantiation> <https://memobase.ch/instatiation/physical/BAZ-MEI_49884-0> .
<https://memobase.ch/record/BAZ-MEI_49884> <https://www.ica.org/standards/RiC/ontology#hasProvenance> <https://memobase.ch/institution/BAZ> .
<https://memobase.ch/record/BAZ-MEI_49884> <https://www.ica.org/standards/RiC/ontology#hasTitle> _:B1ac133b7X2D5a94X2D4873X2D9d62X2D1a2c25cd849f .
<https://memobase.ch/record/BAZ-MEI_49884> <https://www.ica.org/standards/RiC/ontology#heldBy> <https://memobase.ch/institution/BAZ> .
<https://memobase.ch/record/BAZ-MEI_49884> <https://www.ica.org/standards/RiC/ontology#identifiedBy> _:B85880060X2D648dX2D4d18X2Db313X2D0db3f3185f25 .
<https://memobase.ch/record/BAZ-MEI_49884> <https://www.ica.org/standards/RiC/ontology#identifiedBy> _:B90b39583X2D9977X2D4312X2Db95dX2Dec0250c89599 .
<https://memobase.ch/record/BAZ-MEI_49884> <https://www.ica.org/standards/RiC/ontology#isPartOf> <https://memobase.ch/recordSet/BAZ-B_MEI> .
<https://memobase.ch/record/BAZ-MEI_49884> <https://www.ica.org/standards/RiC/ontology#recordResourceOrInstantiationIsSourceOfCreationRelation> _:Bcc33847aX2D48c6X2D450aX2Dafc6X2D3808bd8977a6 .
<https://memobase.ch/record/BAZ-MEI_49884> <https://www.ica.org/standards/RiC/ontology#title> "«Villa Siegel», Zürich"@de .
<https://memobase.ch/record/BAZ-MEI_49884> <https://www.ica.org/standards/RiC/ontology#type> "Foto" .
_:B16bf53adX2De0bcX2D4ab2X2D9474X2Db06e2e0b78b9 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Place> .
_:B16bf53adX2De0bcX2D4ab2X2D9474X2Db06e2e0b78b9 <https://www.ica.org/standards/RiC/ontology#hasProvenance> <https://memobase.ch/institution/BAZ> .
_:B16bf53adX2De0bcX2D4ab2X2D9474X2Db06e2e0b78b9 <https://www.ica.org/standards/RiC/ontology#name> "Zürich"@de .
_:B1ac133b7X2D5a94X2D4873X2D9d62X2D1a2c25cd849f <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Title> .
_:B1ac133b7X2D5a94X2D4873X2D9d62X2D1a2c25cd849f <https://www.ica.org/standards/RiC/ontology#hasProvenance> <https://memobase.ch/institution/BAZ> .
_:B1ac133b7X2D5a94X2D4873X2D9d62X2D1a2c25cd849f <https://www.ica.org/standards/RiC/ontology#isTitleOf> <https://memobase.ch/record/BAZ-MEI_49884> .
_:B1ac133b7X2D5a94X2D4873X2D9d62X2D1a2c25cd849f <https://www.ica.org/standards/RiC/ontology#title> "«Villa Siegel», Zürich"@de .
_:B1ac133b7X2D5a94X2D4873X2D9d62X2D1a2c25cd849f <https://www.ica.org/standards/RiC/ontology#type> "main" .
_:B822e9b36X2Db650X2D40f3X2Db910X2D713f0d193878 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#DateSet> .
_:B822e9b36X2Db650X2D40f3X2Db910X2D713f0d193878 <https://www.ica.org/standards/RiC/ontology#expressedDate> "19210914" .
_:B85880060X2D648dX2D4d18X2Db313X2D0db3f3185f25 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Identifier> .
_:B85880060X2D648dX2D4d18X2Db313X2D0db3f3185f25 <https://www.ica.org/standards/RiC/ontology#hasProvenance> <https://memobase.ch/institution/Memoriav> .
_:B85880060X2D648dX2D4d18X2Db313X2D0db3f3185f25 <https://www.ica.org/standards/RiC/ontology#identifier> "https://memobase.ch/record/BAZ-MEI_49884" .
_:B85880060X2D648dX2D4d18X2Db313X2D0db3f3185f25 <https://www.ica.org/standards/RiC/ontology#type> "main" .
_:B90b39583X2D9977X2D4312X2Db95dX2Dec0250c89599 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Identifier> .
_:B90b39583X2D9977X2D4312X2Db95dX2Dec0250c89599 <https://www.ica.org/standards/RiC/ontology#hasProvenance> <https://memobase.ch/institution/BAZ> .
_:B90b39583X2D9977X2D4312X2Db95dX2Dec0250c89599 <https://www.ica.org/standards/RiC/ontology#identifier> "MEI_49884" .
_:B90b39583X2D9977X2D4312X2Db95dX2Dec0250c89599 <https://www.ica.org/standards/RiC/ontology#type> "callNumber" .
_:Baa76f150X2D4728X2D489aX2Dbef7X2D147a0f9086e9 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#Agent> .
_:Baa76f150X2D4728X2D489aX2Dbef7X2D147a0f9086e9 <https://www.ica.org/standards/RiC/ontology#hasProvenance> <https://memobase.ch/institution/BAZ> .
_:Baa76f150X2D4728X2D489aX2Dbef7X2D147a0f9086e9 <https://www.ica.org/standards/RiC/ontology#name> "Walz"@de .
_:Bcc33847aX2D48c6X2D450aX2Dafc6X2D3808bd8977a6 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.ica.org/standards/RiC/ontology#CreationRelation> .
_:Bcc33847aX2D48c6X2D450aX2Dafc6X2D3808bd8977a6 <https://www.ica.org/standards/RiC/ontology#creationRelationHasSource> <https://memobase.ch/record/BAZ-MEI_49884> .
_:Bcc33847aX2D48c6X2D450aX2Dafc6X2D3808bd8977a6 <https://www.ica.org/standards/RiC/ontology#creationRelationHasTarget> _:Baa76f150X2D4728X2D489aX2Dbef7X2D147a0f9086e9 .
_:Bcc33847aX2D48c6X2D450aX2Dafc6X2D3808bd8977a6 <https://www.ica.org/standards/RiC/ontology#hasProvenance> <https://memobase.ch/institution/BAZ> .
_:Bcc33847aX2D48c6X2D450aX2Dafc6X2D3808bd8977a6 <https://www.ica.org/standards/RiC/ontology#type> "Auftraggeber" .
\ No newline at end of file
sftp:
host: sb-uingest1.swissbib.unibas.ch
port: 22
user: mb_sftp
password: ${SFTP_PASSWORD:?env}
fingerprint: ${HOST_KEY_VERIFIER:?env}
app:
csv:
header:
count: 3
line: true
index: 2
identifier: 0
kafka:
producer:
bootstrap.servers: localhost:12345
client.id: test-client-1234
stream:
bootstrap.servers: localhost:12345
application.id: test-clinet-1234
topic:
in: test-topic-in
out: test-topic-out
\ No newline at end of file
sftp:
host: sb-uingest1.swissbib.unibas.ch
port: 22
user: mb_sftp
password: ${SFTP_PASSWORD:?env}
fingerprint: ${HOST_KEY_VERIFIER:?env}
app:
csv:
header:
count: 3
line: false
index: 0
identifier: 0
kafka:
producer:
bootstrap.servers: localhost:12345
client.id: test-client-1234
stream:
bootstrap.servers: localhost:12345
application.id: test-clinet-1234
topic:
in: test-topic-in
out: test-topic-out
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment