Commit 820406ca authored by Matthias's avatar Matthias
Browse files

Merge branch 'addHeaderMetadata' into 'master'

Add header metadata

See merge request !2
parents 0ff0bfa4 a5580f25
Pipeline #19572 passed with stages
in 4 minutes and 14 seconds
......@@ -5,79 +5,10 @@ stages:
- test
- publish
test:
stage: test
image: gradle:6.3-jdk8
tags:
- mbr
script:
- gradle --no-daemon --no-scan --no-build-cache test --fail-fast --tests "org.memobase.Tests"
.build-image:
stage: publish
image: docker:stable
services:
- docker:dind
script:
- docker login -u "$REGISTRY_USER" -p "$REGISTRY_PASSWORD" "$REGISTRY"
- docker build --pull -t "$IMAGE_TAG" -f "$DOCKERFILE" .
- docker push "$IMAGE_TAG"
- docker logout
build-tagged-image:
extends: .build-image
variables:
IMAGE_TAG: "$CI_REGISTRY_IMAGE:$CI_COMMIT_TAG"
REGISTRY_PASSWORD: "$CI_REGISTRY_PASSWORD"
REGISTRY_USER: "$CI_REGISTRY_USER"
REGISTRY: "$CI_REGISTRY"
DOCKERFILE: "Dockerfile"
only:
- tags
build-latest-image:
extends: .build-image
variables:
IMAGE_TAG: "$CI_REGISTRY_IMAGE:latest"
REGISTRY_PASSWORD: "$CI_REGISTRY_PASSWORD"
REGISTRY_USER: "$CI_REGISTRY_USER"
REGISTRY: "$CI_REGISTRY"
DOCKERFILE: "Dockerfile"
only:
- master
build-feature-branch-image:
extends: .build-image
variables:
IMAGE_TAG: "$CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME"
REGISTRY_PASSWORD: "$CI_REGISTRY_PASSWORD"
REGISTRY_USER: "$CI_REGISTRY_USER"
REGISTRY: "$CI_REGISTRY"
DOCKERFILE: "Dockerfile"
except:
- master
- tags
test-chart:
stage: test
image: dtzar/helm-kubectl:3.2.0
tags:
- mbr
script:
- helm lint helm-charts/
publish-chart:
stage: publish
image: dtzar/helm-kubectl:3.2.0
tags:
- mbr
script:
- export HELM_EXPERIMENTAL_OCI=1
- helm registry login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" "$CI_REGISTRY"
- ./insert_chart_versions.sh
- helm chart save helm-charts/ "$CI_REGISTRY/$CI_PROJECT_PATH:$CI_COMMIT_TAG-chart"
- helm chart push "$CI_REGISTRY/$CI_PROJECT_PATH:$CI_COMMIT_TAG-chart"
- helm registry logout "$CI_REGISTRY"
only:
- tags
\ No newline at end of file
include:
- project: 'memoriav/memobase-2020/utilities/ci-templates'
file: 'gradle-test/gradle-test.yml'
- project: 'memoriav/memobase-2020/utilities/ci-templates'
file: 'docker-image/docker-image.yml'
- project: 'memoriav/memobase-2020/utilities/ci-templates'
file: 'helm-chart/helm-chart.yml'
......@@ -32,7 +32,7 @@ ext {
}
dependencies {
implementation 'org.memobase:memobase-service-utilities:1.7.0'
implementation 'org.memobase:memobase-service-utilities:2.0.2'
implementation 'org.memobase:fedora-client:0.4.1'
// Logging Framework
......
#!/usr/bin/env sh
#
# Copyright 2015 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
##
## Gradle start up script for UN*X
......@@ -28,7 +44,7 @@ APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m"'
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
......@@ -66,6 +82,7 @@ esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
......@@ -109,10 +126,11 @@ if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin, switch paths to Windows format before running java
if $cygwin ; then
# For Cygwin or MSYS, switch paths to Windows format before running java
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
......@@ -138,19 +156,19 @@ if $cygwin ; then
else
eval `echo args$i`="\"$arg\""
fi
i=$((i+1))
i=`expr $i + 1`
done
case $i in
(0) set -- ;;
(1) set -- "$args0" ;;
(2) set -- "$args0" "$args1" ;;
(3) set -- "$args0" "$args1" "$args2" ;;
(4) set -- "$args0" "$args1" "$args2" "$args3" ;;
(5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
(6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
(7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
(8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
(9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
0) set -- ;;
1) set -- "$args0" ;;
2) set -- "$args0" "$args1" ;;
3) set -- "$args0" "$args1" "$args2" ;;
4) set -- "$args0" "$args1" "$args2" "$args3" ;;
5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi
......@@ -159,14 +177,9 @@ save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
APP_ARGS=$(save "$@")
APP_ARGS=`save "$@"`
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
cd "$(dirname "$0")"
fi
exec "$JAVACMD" "$@"
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
......@@ -13,8 +29,11 @@ if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m"
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
......@@ -65,6 +84,7 @@ set CMD_LINE_ARGS=%*
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
......
......@@ -5,7 +5,7 @@
#image values
registry: "cr.gitlab.switch.ch"
image: "memoriav/memobase-2020/services/postprocessing/fedora-metadata-extractor"
tag: "latest"
tag: "0.0.11"
deploymentName: fedora-metadata-extractor
kafkaConfigs: prod-kafka-bootstrap-servers
......
......@@ -18,31 +18,41 @@
package org.memobase
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.util.StdDateFormat
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.JsonMappingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Property
import org.apache.jena.rdf.model.Resource
import org.apache.jena.rdf.model.impl.SelectorImpl
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Predicate
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClientImpl
import org.memobase.fedora.RdfResponseTypes
import org.memobase.model.EventMessage
import org.memobase.rdf.*
import org.memobase.settings.SettingsLoader
import rdf.MB
import java.io.StringWriter
import ch.memobase.rdf.*
import ch.memobase.settings.SettingsLoader
import org.apache.jena.rdf.model.*
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.header.internals.RecordHeader
import java.net.URI
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.*
import java.lang.*
import kotlin.collections.HashMap
class KafkaTopology(
private val settings: SettingsLoader
......@@ -54,47 +64,139 @@ class KafkaTopology(
.properties(settings.appSettings, "fedora")
.build()
fun build(): Topology {
val builder = StreamsBuilder()
fun run(): Topology {
val jsonMapper = ObjectMapper().apply {
registerKotlinModule()
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
setDateFormat(StdDateFormat())
}
val stream = builder.stream<String, String>(settings.inputTopic)
lateinit var consumer: Consumer<String, String>
lateinit var producer: Producer<String, String>
try {
consumer = createConsumer()
producer = createProducer()
consumer.subscribe(listOf(settings.inputTopic))
} catch (ex: Exception) {
log.error("Exception during ceating kafka consumer/producer: " + ex.toString())
}
val objectBranches = stream
.flatMapValues { value -> parseMessage(value) }
.branch(
// TODO: Add actual values.
Predicate { _, value -> value.objectType == "rico:Record" },
Predicate { _, value -> value.objectType == "rico:CorporateBody" },
Predicate { _, value -> value.objectType == "rico:RecordSet" }
)
while (true) {
try {
val records = consumer.poll(Duration.ofSeconds(1))
records.iterator().forEach {
var eventJson = it.value()
var event = jsonMapper.readValue(eventJson, org.memobase.model.EventMessage::class.java)
var eventRdf: List<Pair<EventMessage, String>> = requestPrimaryResource(event)
var eventModel: Pair<EventMessage, Model> = parseModel(eventRdf[0])
val valueWithHeader: Triple<EventMessage, Model, HashMap<String, String>> = extractHeaderMetadata(eventModel)
val headerData: HashMap<String, String> = valueWithHeader.third
var model: Model = addEventType(eventModel)
if (headerData["objectType"] == "rico:Record") {
model = requestAdditionalRecordResources(model)
}
model = filterFedoraProperties(model)
val modelString: String = Functions.writeModel(model)
objectBranches[0]
.flatMapValues { value -> requestPrimaryResource(value) }
.mapValues { value -> parseModel(value) }
.mapValues { value -> addEventType(value) }
.mapValues { value -> requestAdditionalRecordResources(value) }
.mapValues { value -> filterFedoraProperties(value) }
.mapValues { value -> Functions.writeModel(value) }
.to(settings.outputTopic + "-json-records")
objectBranches[1]
.flatMapValues { value -> requestPrimaryResource(value) }
.mapValues { value -> parseModel(value) }
.mapValues { value -> addEventType(value) }
.mapValues { value -> filterFedoraProperties(value) }
.mapValues { value -> Functions.writeModel(value) }
.to(settings.outputTopic + "-json-institutions")
objectBranches[2]
.flatMapValues { value -> requestPrimaryResource(value) }
.mapValues { value -> parseModel(value) }
.mapValues { value -> addEventType(value) }
.mapValues { value -> filterFedoraProperties(value) }
.mapValues { value -> Functions.writeModel(value) }
.to(settings.outputTopic + "-json-record-sets")
return builder.build()
var outputTopic = settings.outputTopic + when (event.objectType) {
"rico:Record" -> "-json-records"
"rico:RecordSet" -> "-json-record-sets"
"rico:CorporateBody" -> "-json-institutions"
else -> "-XXX"
}
var producerRecord: ProducerRecord<String, String> =
ProducerRecord(outputTopic, event.eventId, modelString)
producerRecord.headers().add("institutionId", headerData["institutionId"]?.toByteArray())
producerRecord.headers().add("recordSetId", headerData["recordSetId"]?.toByteArray())
producerRecord.headers().add("recordId", headerData["recordId"]?.toByteArray())
producerRecord.headers().add("isPublished", headerData["isPublished"]?.toByteArray())
producer.send(producerRecord)
log.debug("FedoraEvent " + event.eventId +
" written to topic '" + outputTopic + "'.")
}
} catch (ex: Exception) {
log.error("Exception during processing eventMessages: " + ex.toString())
}
}
}
/**
 * Derives the Kafka header values (record / record set / institution ids and the
 * isPublished flag) for an event from its RDF model.
 *
 * Returns the incoming event and model unchanged, plus a map of the extracted
 * header values. On any extraction failure the error is logged and a (possibly
 * partial) map is returned, so one malformed resource does not stop the poll loop.
 */
private fun extractHeaderMetadata(value: Pair<EventMessage, Model>): Triple<EventMessage, Model, HashMap<String, String>> {
    val eventMsg = value.first
    val rdfModel = value.second
    val headerData = HashMap<String, String>()
    try {
        val subjectUri = eventMsg.objectPath
        // TODO: use MB.isPublished as soon as http VS https base URIs are unified.
        val isPublishedProp =
            ResourceFactory.createProperty("http://memobase.ch/internal/", "isPublished")
        // The bare id is the last path segment of a URI.
        fun idOf(uri: String): String = uri.substring(uri.lastIndexOf('/') + 1)
        when (eventMsg.objectType) {
            "rico:Record" -> {
                headerData["recordId"] = idOf(subjectUri)
                headerData["recordSetId"] = idOf(getPropertyAsString(subjectUri, rdfModel, RICO.isPartOf))
                headerData["institutionId"] = idOf(getPropertyAsString(subjectUri, rdfModel, RICO.heldBy))
                headerData["isPublished"] = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
            }
            "rico:RecordSet" -> {
                // Key fixed to "recordSetId": the producer reads headerData["recordSetId"]
                // when building the record headers, but this branch previously stored the
                // value under "recordsetId", so RecordSet events got a null header.
                headerData["recordSetId"] = idOf(subjectUri)
                headerData["institutionId"] = idOf(getPropertyAsString(subjectUri, rdfModel, RICO.heldBy))
                headerData["isPublished"] = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
            }
            // NOTE(review): the output-topic dispatch branches on "rico:CorporateBody",
            // but this branch matches "rico:Institution" — confirm which type Fedora emits.
            "rico:Institution" -> {
                headerData["institutionId"] = idOf(subjectUri)
                val recordSetIds = getPropertyAsString(subjectUri, rdfModel, RICO.isHolderOf)
                // Fixed: the substring offset was previously computed against
                // eventMsg.objectPath instead of the isHolderOf value itself.
                // NOTE(review): key kept as "recordsetIds" (casing differs from the other
                // keys) — no consumer of this key is visible here; confirm before renaming.
                headerData["recordsetIds"] = idOf(recordSetIds)
                headerData["isPublished"] = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
            }
        }
    } catch (ex: Exception) {
        // Best-effort: log and return whatever was collected so far.
        log.error("Exception while extracting HeaderData for " + eventMsg.eventId + ": " + ex.message)
    }
    return Triple(eventMsg, rdfModel, headerData)
}
/** Looks up (or creates a reference to) the resource identified by [uri] in [model]. */
private fun getResourceFromRdf(model: Model, uri: String): Resource = model.getResource(uri)
/**
 * Reads exactly one value of [property] on the resource [subjectUri] in [model]
 * and returns it as a string: the URI for resource objects, the lexical value
 * for literals, the node's toString otherwise.
 *
 * @throws IllegalStateException if the property is absent or has more than one value.
 */
private fun getPropertyAsString(subjectUri: String, model: Model, property: Property): String {
    val subject = model.getResource(subjectUri)
    val objects = subject.listProperties(property).mapWith {
        val o = it.`object`
        when {
            o.isURIResource -> o.asResource().uri
            o.isLiteral -> o.asLiteral().value.toString()
            else -> o.toString()
        }
    }.toList()
    // error() throws IllegalStateException (a subclass of Exception, so existing
    // broad catch blocks in the callers still handle it); the redundant
    // ${property.toString()} in the messages is simplified to $property.
    return when (objects.size) {
        0 -> error("No property $property found.")
        1 -> objects[0]
        else -> error("More than 1 property $property found")
    }
}
private fun parseMessage(data: String): List<EventMessage> {
......@@ -200,4 +302,35 @@ class KafkaTopology(
}
return model
}
/**
 * Builds the String/String Kafka consumer for the input topic.
 *
 * Connection and identity values come from the service's consumer settings;
 * the remaining properties are fixed for this service.
 */
private fun createConsumer(): Consumer<String, String> {
    // listOf(x)[0] in the original was a no-op wrapper around the settings object.
    val consumerSettings = settings.kafkaConsumerSettings
    val props = Properties()
    props["bootstrap.servers"] = consumerSettings["bootstrap.servers"].toString()
    props["group.id"] = consumerSettings["group.id"].toString()
    props["client.id"] = consumerSettings["client.id"].toString()
    props["key.deserializer"] = StringDeserializer::class.java
    props["value.deserializer"] = StringDeserializer::class.java
    // NOTE(review): auto-commit is disabled but the poll loop never commits offsets
    // explicitly, so with auto.offset.reset=earliest the group re-reads everything
    // on restart — confirm this is intended.
    props["enable.auto.commit"] = false
    props["max.poll.records"] = "10"
    // 1h poll interval: each record triggers synchronous Fedora requests.
    // (The original set this property twice; the duplicate is removed.)
    props["max.poll.interval.ms"] = "3600000"
    props["auto.offset.reset"] = "earliest"
    props["allow.auto.create.topics"] = false
    // The original caught the constructor's exception and retried the identical call
    // once, which only delayed the same failure; construct directly and let errors
    // propagate to the caller's handler.
    return KafkaConsumer(props)
}
/** Builds the String/String Kafka producer from the service's producer settings. */
private fun createProducer(): Producer<String, String> {
    val producerProps = Properties().apply {
        // listOf(x)[0] in the original was equivalent to accessing the settings directly.
        this["bootstrap.servers"] = settings.kafkaProducerSettings["bootstrap.servers"].toString()
        this["key.serializer"] = StringSerializer::class.java
        this["value.serializer"] = StringSerializer::class.java
    }
    return KafkaProducer(producerProps)
}
}
......@@ -20,7 +20,7 @@ package org.memobase
import kotlin.system.exitProcess
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
import ch.memobase.settings.SettingsLoader
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("FedoraMetadataExtractionService")
......@@ -33,21 +33,13 @@ class Service(file: String = "app.yml") {
"fedora.externalBaseUrl"
),
file,
useStreamsConfig = true
useStreamsConfig = false,
useConsumerConfig = true,
useProducerConfig = true
)
val topology = KafkaTopology(settings).build()
private val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)
val topology = KafkaTopology(settings)
fun run() {
stream.use {
it.start()
while (stream.state().isRunning) {
log.info("Service is running.")
Thread.sleep(10_000L)
}
log.info("Shutting application down now...")
exitProcess(0)
}
topology.run()
}
}
......@@ -4,7 +4,7 @@ data class EventMessage(
val eventId: String,
val eventTimestamp: String,
val eventType: String,
val objectPath: String,
var objectPath: String,
val objectType: String,
val objectVersion: String? = null
)
\ No newline at end of file
......@@ -5,6 +5,10 @@ app:
internalBaseUrl: ${FEDORA_INTERNAL_BASE_URL:?system}
externalBaseUrl: ${FEDORA_EXTERNAL_BASE_URL:?system}
kafka:
producer:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
consumer:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
application.id: ${APPLICATION_ID:?system}
......
......@@ -43,9 +43,11 @@ class Tests {
private fun readFile(fileName: String): String {
return File("$resourcePath/$fileName").readText(Charset.defaultCharset())
}
private fun createInputStream(fileName: String): InputStream {
return File("$resourcePath/$fileName").inputStream()
}
private val regex = Regex("(_:B[A-Za-z0-9]+)")
@Test
......@@ -62,9 +64,13 @@ class Tests {
@ParameterizedTest
@MethodSource("kafkaTests")
fun `test kafka topology`(params: KafkaTestParams) {
assert(true)
/*
/*
val service = Service(params.settingsFileName)
service.run()
assert(true)
*/
/*
val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(StringSerializer(), StringSerializer())
val input = readFile("${params.count}/input.json")
......@@ -88,7 +94,8 @@ class Tests {
replacedString
}.sorted().reduce { acc, s -> acc + "\n" + s }
assertThat(sortedResult)
.isEqualTo(output)*/
.isEqualTo(output)
*/
}