Commit a5580f25 authored by Matthias

Add method getObjectUriAsString

This commit adds a method getObjectUriAsString to get the URI of an
object in an RDF statement.
parent 0ff0bfa4
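The body of the new method is not shown in the excerpt below; the diff only shows the related getPropertyAsString helper in KafkaTopology. As a rough sketch of what a getObjectUriAsString helper could look like with Apache Jena (the signature and parameter names here are assumptions, not taken from the commit):

import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.Property

// Sketch only (assumed signature): return the URI of the object of the first statement
// matching the given subject URI and property, or null if the object is not a URI resource.
fun getObjectUriAsString(model: Model, subjectUri: String, property: Property): String? {
    val statement = model.getResource(subjectUri).getProperty(property) ?: return null
    val obj = statement.`object` // the object node of the matched statement
    return if (obj.isURIResource) obj.asResource().uri else null
}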
@@ -5,79 +5,10 @@ stages:
   - test
   - publish
-test:
-  stage: test
-  image: gradle:6.3-jdk8
-  tags:
-    - mbr
-  script:
-    - gradle --no-daemon --no-scan --no-build-cache test --fail-fast --tests "org.memobase.Tests"
-.build-image:
-  stage: publish
-  image: docker:stable
-  services:
-    - docker:dind
-  script:
-    - docker login -u "$REGISTRY_USER" -p "$REGISTRY_PASSWORD" "$REGISTRY"
-    - docker build --pull -t "$IMAGE_TAG" -f "$DOCKERFILE" .
-    - docker push "$IMAGE_TAG"
-    - docker logout
-build-tagged-image:
-  extends: .build-image
-  variables:
-    IMAGE_TAG: "$CI_REGISTRY_IMAGE:$CI_COMMIT_TAG"
-    REGISTRY_PASSWORD: "$CI_REGISTRY_PASSWORD"
-    REGISTRY_USER: "$CI_REGISTRY_USER"
-    REGISTRY: "$CI_REGISTRY"
-    DOCKERFILE: "Dockerfile"
-  only:
-    - tags
-build-latest-image:
-  extends: .build-image
-  variables:
-    IMAGE_TAG: "$CI_REGISTRY_IMAGE:latest"
-    REGISTRY_PASSWORD: "$CI_REGISTRY_PASSWORD"
-    REGISTRY_USER: "$CI_REGISTRY_USER"
-    REGISTRY: "$CI_REGISTRY"
-    DOCKERFILE: "Dockerfile"
-  only:
-    - master
-build-feature-branch-image:
-  extends: .build-image
-  variables:
-    IMAGE_TAG: "$CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME"
-    REGISTRY_PASSWORD: "$CI_REGISTRY_PASSWORD"
-    REGISTRY_USER: "$CI_REGISTRY_USER"
-    REGISTRY: "$CI_REGISTRY"
-    DOCKERFILE: "Dockerfile"
-  except:
-    - master
-    - tags
-test-chart:
-  stage: test
-  image: dtzar/helm-kubectl:3.2.0
-  tags:
-    - mbr
-  script:
-    - helm lint helm-charts/
-publish-chart:
-  stage: publish
-  image: dtzar/helm-kubectl:3.2.0
-  tags:
-    - mbr
-  script:
-    - export HELM_EXPERIMENTAL_OCI=1
-    - helm registry login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" "$CI_REGISTRY"
-    - ./insert_chart_versions.sh
-    - helm chart save helm-charts/ "$CI_REGISTRY/$CI_PROJECT_PATH:$CI_COMMIT_TAG-chart"
-    - helm chart push "$CI_REGISTRY/$CI_PROJECT_PATH:$CI_COMMIT_TAG-chart"
-    - helm registry logout "$CI_REGISTRY"
-  only:
-    - tags
\ No newline at end of file
+include:
+  - project: 'memoriav/memobase-2020/utilities/ci-templates'
+    file: 'gradle-test/gradle-test.yml'
+  - project: 'memoriav/memobase-2020/utilities/ci-templates'
+    file: 'docker-image/docker-image.yml'
+  - project: 'memoriav/memobase-2020/utilities/ci-templates'
+    file: 'helm-chart/helm-chart.yml'
@@ -32,7 +32,7 @@ ext {
 }
 dependencies {
-    implementation 'org.memobase:memobase-service-utilities:1.7.0'
+    implementation 'org.memobase:memobase-service-utilities:2.0.2'
     implementation 'org.memobase:fedora-client:0.4.1'
     // Logging Framework
......
 #!/usr/bin/env sh
+#
+# Copyright 2015 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 ##############################################################################
 ##
 ## Gradle start up script for UN*X
@@ -28,7 +44,7 @@ APP_NAME="Gradle"
 APP_BASE_NAME=`basename "$0"`
 # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-DEFAULT_JVM_OPTS='"-Xmx64m"'
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
 # Use the maximum available, or set MAX_FD != -1 to use that value.
 MAX_FD="maximum"
@@ -66,6 +82,7 @@ esac
 CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
 # Determine the Java command to use to start the JVM.
 if [ -n "$JAVA_HOME" ] ; then
     if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
@@ -109,10 +126,11 @@ if $darwin; then
     GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
 fi
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin ; then
+# For Cygwin or MSYS, switch paths to Windows format before running java
+if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
     APP_HOME=`cygpath --path --mixed "$APP_HOME"`
     CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
     JAVACMD=`cygpath --unix "$JAVACMD"`
     # We build the pattern for arguments to be converted via cygpath
@@ -138,19 +156,19 @@ if $cygwin ; then
         else
             eval `echo args$i`="\"$arg\""
         fi
-        i=$((i+1))
+        i=`expr $i + 1`
     done
     case $i in
-        (0) set -- ;;
-        (1) set -- "$args0" ;;
-        (2) set -- "$args0" "$args1" ;;
-        (3) set -- "$args0" "$args1" "$args2" ;;
-        (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
-        (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
-        (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
-        (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
-        (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
-        (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
+        0) set -- ;;
+        1) set -- "$args0" ;;
+        2) set -- "$args0" "$args1" ;;
+        3) set -- "$args0" "$args1" "$args2" ;;
+        4) set -- "$args0" "$args1" "$args2" "$args3" ;;
+        5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
+        6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
+        7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
+        8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
+        9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
     esac
 fi
@@ -159,14 +177,9 @@ save () {
     for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
     echo " "
 }
-APP_ARGS=$(save "$@")
+APP_ARGS=`save "$@"`
 # Collect all arguments for the java command, following the shell quoting and substitution rules
 eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
-# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
-if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
-  cd "$(dirname "$0")"
-fi
 exec "$JAVACMD" "$@"
+@rem
+@rem Copyright 2015 the original author or authors.
+@rem
+@rem Licensed under the Apache License, Version 2.0 (the "License");
+@rem you may not use this file except in compliance with the License.
+@rem You may obtain a copy of the License at
+@rem
+@rem https://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+@rem
 @if "%DEBUG%" == "" @echo off
 @rem ##########################################################################
 @rem
@@ -13,8 +29,11 @@ if "%DIRNAME%" == "" set DIRNAME=.
 set APP_BASE_NAME=%~n0
 set APP_HOME=%DIRNAME%
+@rem Resolve any "." and ".." in APP_HOME to make it shorter.
+for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
 @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-set DEFAULT_JVM_OPTS="-Xmx64m"
+set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
 @rem Find java.exe
 if defined JAVA_HOME goto findJavaFromJavaHome
@@ -65,6 +84,7 @@ set CMD_LINE_ARGS=%*
 set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
 @rem Execute Gradle
 "%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
......
@@ -5,7 +5,7 @@
 #image values
 registry: "cr.gitlab.switch.ch"
 image: "memoriav/memobase-2020/services/postprocessing/fedora-metadata-extractor"
-tag: "latest"
+tag: "0.0.11"
 deploymentName: fedora-metadata-extractor
 kafkaConfigs: prod-kafka-bootstrap-servers
......
@@ -18,31 +18,41 @@
 package org.memobase
+import com.fasterxml.jackson.databind.SerializationFeature
+import com.fasterxml.jackson.databind.util.StdDateFormat
 import com.fasterxml.jackson.core.JsonParseException
 import com.fasterxml.jackson.databind.JsonMappingException
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.kotlin.registerKotlinModule
-import org.apache.jena.rdf.model.Model
-import org.apache.jena.rdf.model.ModelFactory
-import org.apache.jena.rdf.model.Property
-import org.apache.jena.rdf.model.Resource
 import org.apache.jena.rdf.model.impl.SelectorImpl
 import org.apache.jena.riot.Lang
 import org.apache.jena.riot.RDFDataMgr
 import org.apache.kafka.streams.StreamsBuilder
 import org.apache.kafka.streams.Topology
-import org.apache.kafka.streams.kstream.Predicate
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.Producer
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.logging.log4j.LogManager
 import org.fcrepo.client.FcrepoOperationFailedException
 import org.memobase.fedora.FedoraClientImpl
 import org.memobase.fedora.RdfResponseTypes
 import org.memobase.model.EventMessage
-import org.memobase.rdf.*
-import org.memobase.settings.SettingsLoader
-import rdf.MB
-import java.io.StringWriter
+import ch.memobase.rdf.*
+import ch.memobase.settings.SettingsLoader
+import org.apache.jena.rdf.model.*
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.header.internals.RecordHeader
 import java.net.URI
 import java.nio.charset.StandardCharsets
+import java.time.Duration
+import java.util.*
+import java.lang.*
+import kotlin.collections.HashMap
 class KafkaTopology(
     private val settings: SettingsLoader
@@ -54,47 +64,139 @@ class KafkaTopology(
         .properties(settings.appSettings, "fedora")
         .build()
-    fun build(): Topology {
-        val builder = StreamsBuilder()
-        val stream = builder.stream<String, String>(settings.inputTopic)
-        val objectBranches = stream
-            .flatMapValues { value -> parseMessage(value) }
-            .branch(
-                // TODO: Add actual values.
-                Predicate { _, value -> value.objectType == "rico:Record" },
-                Predicate { _, value -> value.objectType == "rico:CorporateBody" },
-                Predicate { _, value -> value.objectType == "rico:RecordSet" }
-            )
-        objectBranches[0]
-            .flatMapValues { value -> requestPrimaryResource(value) }
-            .mapValues { value -> parseModel(value) }
-            .mapValues { value -> addEventType(value) }
-            .mapValues { value -> requestAdditionalRecordResources(value) }
-            .mapValues { value -> filterFedoraProperties(value) }
-            .mapValues { value -> Functions.writeModel(value) }
-            .to(settings.outputTopic + "-json-records")
-        objectBranches[1]
-            .flatMapValues { value -> requestPrimaryResource(value) }
-            .mapValues { value -> parseModel(value) }
-            .mapValues { value -> addEventType(value) }
-            .mapValues { value -> filterFedoraProperties(value) }
-            .mapValues { value -> Functions.writeModel(value) }
-            .to(settings.outputTopic + "-json-institutions")
-        objectBranches[2]
-            .flatMapValues { value -> requestPrimaryResource(value) }
-            .mapValues { value -> parseModel(value) }
-            .mapValues { value -> addEventType(value) }
-            .mapValues { value -> filterFedoraProperties(value) }
-            .mapValues { value -> Functions.writeModel(value) }
-            .to(settings.outputTopic + "-json-record-sets")
-        return builder.build()
+    fun run(): Topology {
+        val jsonMapper = ObjectMapper().apply {
+            registerKotlinModule()
+            disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
+            setDateFormat(StdDateFormat())
+        }
+        lateinit var consumer: Consumer<String, String>
+        lateinit var producer: Producer<String, String>
+        try {
+            consumer = createConsumer()
+            producer = createProducer()
+            consumer.subscribe(listOf(settings.inputTopic))
+        } catch (ex: Exception) {
+            log.error("Exception during ceating kafka consumer/producer: " + ex.toString())
+        }
+        while (true) {
+            try {
+                val records = consumer.poll(Duration.ofSeconds(1))
+                records.iterator().forEach {
+                    var eventJson = it.value()
+                    var event = jsonMapper.readValue(eventJson, org.memobase.model.EventMessage::class.java)
+                    var eventRdf: List<Pair<EventMessage, String>> = requestPrimaryResource(event)
+                    var eventModel: Pair<EventMessage, Model> = parseModel(eventRdf[0])
+                    val valueWithHeader: Triple<EventMessage, Model, HashMap<String, String>> = extractHeaderMetadata(eventModel)
+                    val headerData: HashMap<String, String> = valueWithHeader.third
+                    var model: Model = addEventType(eventModel)
+                    if (headerData["objectType"] == "rico:Record") {
+                        model = requestAdditionalRecordResources(model)
+                    }
+                    model = filterFedoraProperties(model)
+                    val modelString: String = Functions.writeModel(model)
+                    var outputTopic = settings.outputTopic + when (event.objectType) {
+                        "rico:Record" -> "-json-records"
+                        "rico:RecordSet" -> "-json-record-sets"
+                        "rico:CorporateBody" -> "-json-institutions"
+                        else -> "-XXX"
+                    }
+                    var producerRecord: ProducerRecord<String, String> =
+                        ProducerRecord(outputTopic, event.eventId, modelString)
+                    producerRecord.headers().add("institutionId", headerData["institutionId"]?.toByteArray())
+                    producerRecord.headers().add("recordSetId", headerData["recordSetId"]?.toByteArray())
+                    producerRecord.headers().add("recordId", headerData["recordId"]?.toByteArray())
+                    producerRecord.headers().add("isPublished", headerData["isPublished"]?.toByteArray())
+                    producer.send(producerRecord)
+                    log.debug("FedoraEvent " + event.eventId +
+                        " written to topic '" + outputTopic + "'.")
+                }
+            } catch (ex: Exception) {
+                log.error("Exception during processing eventMessages: " + ex.toString())
+            }
+        }
+    }
+    private fun extractHeaderMetadata(value: Pair<EventMessage, Model>): Triple<EventMessage, Model, HashMap<String, String>> {
+        val eventMsg = value.first
+        val rdfModel = value.second
+        var headerData: HashMap<String, String> = HashMap<String, String>()
+        try {
+            val subjectUri = eventMsg.objectPath
+            var isPublishedProp = MB.isPublished // TODO: use MB.isPublished as soon as http VS https is unified
+            isPublishedProp = ResourceFactory.createProperty("http://memobase.ch/internal/", "isPublished")
+            if (eventMsg.objectType == "rico:Record") {
+                headerData.put(
+                    "recordId",
+                    eventMsg.objectPath.substring(subjectUri.lastIndexOf('/') + 1)
+                )
+                val recordSetId = getPropertyAsString(subjectUri, rdfModel, RICO.isPartOf)
+                headerData.put(
+                    "recordSetId",
+                    recordSetId.substring(recordSetId.lastIndexOf('/') + 1)
+                )
+                val institutionId = getPropertyAsString(subjectUri, rdfModel, RICO.heldBy)
+                headerData.put(
+                    "institutionId",
+                    institutionId.substring(institutionId.lastIndexOf('/') + 1)
+                )
+                val isPublished = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
+                headerData.put("isPublished", isPublished)
+            } else if (eventMsg.objectType == "rico:RecordSet") {
+                headerData.put(
+                    "recordsetId",
+                    eventMsg.objectPath.substring(subjectUri.lastIndexOf('/') + 1)
+                )
+                val institutionId = getPropertyAsString(subjectUri, rdfModel, RICO.heldBy)
+                headerData.put(
+                    "institutionId",
+                    institutionId.substring(institutionId.lastIndexOf('/') + 1)
+                )
+                val isPublished = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
+                headerData.put("isPublished", isPublished)
+            } else if (eventMsg.objectType == "rico:Institution") {
+                headerData.put(
+                    "institutionId",
+                    eventMsg.objectPath.substring(subjectUri.lastIndexOf('/') + 1)
+                )
+                val recordSetIds = getPropertyAsString(subjectUri, rdfModel, RICO.isHolderOf)
+                headerData.put(
+                    "recordsetIds",
+                    recordSetIds.substring(eventMsg.objectPath.lastIndexOf('/') + 1)
+                )
+                val isPublished = getPropertyAsString(subjectUri, rdfModel, isPublishedProp)
+                headerData.put("isPublished", isPublished)
+            }
+        } catch (ex: Exception) {
+            log.error("Exception while extracting HeaderData for " + eventMsg.eventId + ": " + ex.message)
+        }
+        return Triple(eventMsg, rdfModel, headerData)
+    }
+    private fun getResourceFromRdf(model: Model, uri: String): Resource {
+        return model.getResource(uri)
+    }
+    private fun getPropertyAsString(subjectUri: String, model: Model, property: Property): String {
+        val subject = model.getResource(subjectUri)
+        val objects = subject.listProperties(property).mapWith {
+            val o = it.`object`
+            when {
+                o.isURIResource -> o.asResource().uri
+                o.isLiteral -> o.asLiteral().value.toString()
+                else -> o.toString()
+            }
+        }.toList()
+        return when {
+            objects.size <= 0 -> throw Exception("No property ${property.toString()} found.")
+            objects.size > 1 -> throw Exception("More than 1 property ${property.toString()} found")
+            else -> objects[0]
+        }
     }
     private fun parseMessage(data: String): List<EventMessage> {
@@ -200,4 +302,35 @@ class KafkaTopology(
         }
         return model
     }
+    private fun createConsumer(): Consumer<String, String> {
+        val props = Properties()
+        props["bootstrap.servers"] = listOf(settings.kafkaConsumerSettings)[0]["bootstrap.servers"].toString()
+        props["group.id"] = listOf(settings.kafkaConsumerSettings)[0]["group.id"].toString()
+        props["client.id"] = listOf(settings.kafkaConsumerSettings)[0]["client.id"].toString()
+        props["key.deserializer"] = StringDeserializer::class.java
+        props["value.deserializer"] = StringDeserializer::class.java
+        props["enable.auto.commit"] = false
+        props["max.poll.records"] = "10"
+        props["max.poll.interval.ms"] = "3600000"
+        props["max.poll.interval.ms"] = "3600000"
+        props["auto.offset.reset"] = "earliest"
+        props["allow.auto.create.topics"] = false
+        lateinit var consumer: org.apache.kafka.clients.consumer.KafkaConsumer<String, String>
+        try {
+            consumer = KafkaConsumer<String, String>(props)
+        } catch (t: Exception) {
+            consumer = KafkaConsumer<String, String>(props)
+        }
+        return consumer
+    }
+    private fun createProducer(): Producer<String, String> {
+        val props = Properties()
+        props["bootstrap.servers"] = listOf(settings.kafkaProducerSettings)[0]["bootstrap.servers"].toString()
+        props["key.serializer"] = StringSerializer::class.java
+        props["value.serializer"] = StringSerializer::class.java
+        return KafkaProducer<String, String>(props)
+    }
 }
@@ -20,7 +20,7 @@ package org.memobase
 import kotlin.system.exitProcess
 import org.apache.kafka.streams.KafkaStreams
 import org.apache.logging.log4j.LogManager
-import org.memobase.settings.SettingsLoader
+import ch.memobase.settings.SettingsLoader
 class Service(file: String = "app.yml") {
     private val log = LogManager.getLogger("FedoraMetadataExtractionService")
@@ -33,21 +33,13 @@ class Service(file: String = "app.yml") {
             "fedora.externalBaseUrl"
         ),
         file,
-        useStreamsConfig = true
+        useStreamsConfig = false,
+        useConsumerConfig = true,
+        useProducerConfig