Commit 1e8b6552 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

Merge branch 'MEMO-774-adapt-to-new-import-workflow' into 'master'

MEMO-774: adapt to new import workflow

See merge request !9
parents b58ad288 28f867b8
Pipeline #15543 passed with stages
in 9 minutes and 42 seconds
......@@ -45,7 +45,7 @@ dependencies {
implementation 'org.apache.jena:apache-jena:3.14.0', excludeSlf4J
implementation 'org.memobase:memobase-service-utilities:1.10.0', excludeSlf4J
implementation 'org.memobase:memobase-service-utilities:0.16.0', excludeSlf4J
implementation 'org.memobase:fedora-client:0.6.2', excludeSlf4J
// KOTLIN IMPORTS
......
apiVersion: v1
kind: ConfigMap
metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-app-config"
name: "{{ .Values.deploymentName }}-config"
namespace: memobase
data:
APP_DIRECTORY: "{{ .Values.appDirectory }}"
CLIENT_ID: "{{ .Values.processId }}-{{ .Values.jobName }}"
TOPIC_IN: "{{ .Values.processId }}-{{ .Values.lastJobName }}"
TOPIC_OUT: "{{.Values.processId }}-{{ .Values.jobName }}"
TOPIC_PROCESS: "{{ .Values.processId }}-reporting"
APPLICATION_ID: {{ .Values.applicationId }}
TOPIC_IN: {{ .Values.inputTopicName }}
TOPIC_OUT: {{ .Values.outputTopicName }}
TOPIC_PROCESS: {{ .Values.reportingTopicName }}
apiVersion: batch/v1
kind: Job
apiVersion: apps/v1
kind: Deployment
metadata:
name: "{{ .Values.processId }}-{{ .Values.jobName }}"
name: {{ .Values.deploymentName }}
namespace: memobase
labels:
institutionId: "{{ .Values.institutionId }}"
recordSetId: "{{ .Values.recordSetId }}"
jobType: "import-job"
jobType: "import-process-deployment"
spec:
selector:
matchLabels:
app: {{ .Values.deploymentName }}
replicas: 1
template:
metadata:
labels:
app: {{ .Values.deploymentName }}
tier: import-process
spec:
restartPolicy: Always
containers:
- name: "{{ .Values.processId }}-{{ .Values.jobName }}"
- name: "{{ .Values.deploymentName }}-container"
image: "{{ .Values.registry }}/{{ .Values.image }}:{{ .Values.tag }}"
imagePullPolicy: Always
resources:
requests:
cpu: "0.5"
memory: "128Mi"
limits:
cpu: "1"
memory: "1Gi"
envFrom:
- secretRef:
name: "{{ .Values.sftpConfigs }}"
......@@ -21,6 +36,4 @@ spec:
- configMapRef:
name: "{{ .Values.kafkaConfigs }}"
- configMapRef:
name: "{{ .Values.processId }}-{{ .Values.jobName }}-app-config"
restartPolicy: Never
backoffLimit: 0
\ No newline at end of file
name: "{{ .Values.deploymentName }}-config"
......@@ -3,9 +3,10 @@ registry: "cr.gitlab.switch.ch"
image: "memoriav/memobase-2020/services/import-process/fedora-ingest-service"
tag: "latest"
jobName: fedora-ingest-service
lastJobName: normalization-service
processId: p0001
deploymentName: fedora-ingest-service
## TODO: This needs to be solved differently. This way it is not possible to deploy a replica-set.
## somehow the id needs to be dependent on the pod name?
applicationId: fedora-ingest-service-app
institutionId: placeholder
recordSetId: placeholder
......@@ -17,4 +18,7 @@ fedoraConfigs: fedora-admin-credentials
## Needs to be set to the directory on the sftp server.
## this is a relative path built like this:
## "./{RECORD_SET_ID}"
appDirectory: placeholderValue
\ No newline at end of file
appDirectory: placeholderValue
inputTopicName: import-process-normalization
reportingTopicName: import-process-reporting
......@@ -30,6 +30,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.logging.log4j.LogManager
......@@ -84,7 +86,7 @@ class ServiceTest {
)
consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
reportConsumer = KafkaConsumer(consumerProps)
reportConsumer.subscribe(listOf("test-ingest-out-reporting"))
reportConsumer.subscribe(listOf("test-ingest-reporting"))
}
// TODO: Test works with local docker
......@@ -98,8 +100,10 @@ class ServiceTest {
inputProducer.send(
ProducerRecord(
"test-ingest-in",
null,
"https://memobase.ch/record/$id",
inputFile.readText()
inputFile.readText(),
params.headers
)
)
}
......@@ -125,6 +129,12 @@ class ServiceTest {
private fun kafkaTests() = Stream.of(
TestParams(
1,
listOf(
RecordHeader("recordSetId", "AFZ-IB_Becker_Audiovisuals".toByteArray()),
RecordHeader("sessionId", "long-session-id-1234567890".toByteArray()),
RecordHeader("shortSessionId", "1234567890".toByteArray()),
RecordHeader("institutionId", "AFZ".toByteArray())
),
listOf(
"AFZ-IB_Becker_Audiovisuals_63.nt"
),
......
......@@ -18,8 +18,11 @@
package org.memobase
import org.apache.kafka.common.header.internals.RecordHeader
data class TestParams(
val count: Int,
val headers: List<RecordHeader>,
val inputFileNames: List<String>,
val expectedIngestReports: List<Report>
)
......@@ -21,13 +21,16 @@ import java.io.Closeable
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Headers
class Producer(props: Properties, topic: String) : Closeable {
class Producer(
props: Properties,
private val reportingTopic: String
) : Closeable {
private val instance = KafkaProducer<String, String>(props)
private val reportingTopic = "$topic-reporting"
fun sendReport(report: Report) {
instance.send(ProducerRecord(reportingTopic, report.id, report.toJson()))
fun sendReport(headers: Headers, report: Report) {
instance.send(ProducerRecord(reportingTopic, null, report.id, report.toJson(), headers))
}
override fun close() {
......
......@@ -64,7 +64,7 @@ class Service(fileName: String = "app.yml") : Closeable {
consumerSettings.setProperty("max.poll.records", CONSUMER_MAX_POLL_RECORDS)
consumerSettings.setProperty("max.poll.interval.ms", CONSUMER_MAX_INTERVAL_MS)
consumer = Consumer(consumerSettings, settings.inputTopic)
producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
producer = Producer(settings.kafkaProducerSettings, settings.processReportTopic)
log.info("Connected to Kafka.")
}
......@@ -77,7 +77,7 @@ class Service(fileName: String = "app.yml") : Closeable {
fun processRecords() {
for (record in consumer.fetchRecords()) {
val ingestReport = processRecord(record)
producer.sendReport(ingestReport)
producer.sendReport(record.headers(), ingestReport)
}
}
......
......@@ -21,4 +21,4 @@ kafka:
topic:
in: ${TOPIC_IN:?env}
out: ${TOPIC_OUT:?env}
process: ${TOPIC_PROCESS:?env}
\ No newline at end of file
process: ${TOPIC_REPORTING:?env}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment