Commit 1af5897f authored by Jonas Waeber's avatar Jonas Waeber
Browse files

implement basic report processing

parent c1cf59be
......@@ -3,10 +3,11 @@ plugins {
id 'distribution'
id 'org.jetbrains.kotlin.jvm' version '1.3.71'
id 'com.gitlab.morality.grit' version '2.0.2'
id 'org.jlleitschuh.gradle.ktlint' version '9.2.1'
}
group 'ch.memobase'
mainClassName = 'ch.memobase.App'
group 'org.memobase'
mainClassName = 'org.memobase.App'
jar {
manifest {
......@@ -30,29 +31,26 @@ ext {
}
dependencies {
// https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client
compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.6.1'
implementation 'org.memobase:memobase-service-utilities:0.17.0'
// Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
implementation "org.apache.logging.log4j:log4j-core:${log4jV}"
implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jV}"
implementation 'org.memobase:memobase-service-utilities:1.4.0'
// Kafka Imports
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
//testCompile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: kafkaV
//implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
implementation "org.apache.kafka:kafka-streams:${kafkaV}"
// JSON Parser
implementation 'com.beust:klaxon:5.2'
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
implementation "org.jetbrains.kotlin:kotlin-script-runtime:1.3.71"
implementation "org.jetbrains.kotlin:kotlin-reflect:1.3.71"
// JUNIT
testCompile("org.junit.jupiter:junit-jupiter:5.4.2")
testImplementation 'org.assertj:assertj-core:3.15.0'
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
testCompile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: kafkaV
}
compileKotlin {
......
apiVersion: v2
name: reports-processor
description: Processeces the reports output from the import process.
type: application
version: 0.0.0
appVersion: 0.0.0
maintainers:
- name: Jonas Waeber
email: jonas.waeber@unibas.ch
kafkaConfigs: prod-kafka-bootstrap-servers
deploymentName: ip-reports-processor
applicationId: ip-reports-processor
inputTopicName: import-process-reporting
outputTopicName: import-process-processed-reporting
reportingTopicName: none
\ No newline at end of file
apiVersion: v1
kind: ConfigMap
metadata:
name: "{{ .Values.deploymentName }}-config"
namespace: memobase
data:
APPLICATION_ID: {{ .Values.applicationId }}
TOPIC_IN: {{ .Values.inputTopicName }}
TOPIC_OUT: {{ .Values.outputTopicName }}
TOPIC_PROCESS: {{ .Values.reportingTopicName }}
\ No newline at end of file
apiVersion: apps/v1
kind: Deployment
metadata:
name: "{{ .Values.deploymentName }}-deployment"
namespace: memobase
labels:
jobType: "import-process-deployment"
spec:
selector:
matchLabels:
app: {{ .Values.deploymentName }}
replicas: 3
template:
metadata:
labels:
app: {{ .Values.deploymentName }}
tier: import-process
spec:
restartPolicy: Always
containers:
- name: "{{ .Values.deploymentName }}-container"
image: "{{ .Values.registry }}/{{ .Values.image }}:{{ .Values.tag }}"
imagePullPolicy: Always
envFrom:
- configMapRef:
name: "{{ .Values.kafkaConfigs }}"
- configMapRef:
name: "{{ .Values.deploymentName }}-config"
\ No newline at end of file
registry: "cr.gitlab.switch.ch"
image: "memoriav/memobase-2020/services/import-process/reports-processor"
tag: "latest"
kafkaConfigs: prod-kafka-bootstrap-servers
deploymentName: ip-reports-processor
applicationId: ip-reports-processor
inputTopicName: import-process-reporting
outputTopicName: import-process-processed-reporting
reportingTopicName: none
\ No newline at end of file
......@@ -16,12 +16,12 @@
*/
package ch.memobase
import kotlin.system.exitProcess
import org.apache.logging.log4j.LogManager
import kotlin.system.exitProcess
class App {
companion object {
private val log = LogManager.getLogger("Consumer")
private val log = LogManager.getLogger("ReportProcessorApp")
@JvmStatic fun main(args: Array<String>) {
try {
Service().run()
......
/*
* Copyright (C) 2019 Project Swissbib <http://swissbib.org>
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
......@@ -14,22 +14,11 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package ch.memobase
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.*
class Consumer(topic: String, properties: Properties) {
private val instance = KafkaConsumer<String, String>(properties)
init {
instance.subscribe(listOf(topic))
}
import settings.HeaderMetadata
fun consume(): ConsumerRecords<String, String> {
return instance.poll(Duration.ofSeconds(10L))
}
}
\ No newline at end of file
class IndexReport(
val report: Report,
val metadata: HeaderMetadata
)
\ No newline at end of file
/*
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package ch.memobase
import com.beust.klaxon.Klaxon
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.memobase.settings.SettingsLoader
import settings.HeaderExtractionTransformSupplier
import java.io.StringReader
class KafkaTopology(
private val settings: SettingsLoader
) {
private val klaxon = Klaxon()
fun prepare(): StreamsBuilder {
val builder = StreamsBuilder()
builder.stream<String, String>(settings.inputTopic)
.transformValues(HeaderExtractionTransformSupplier<String>())
.mapValues { value -> klaxon.parse<Report>(StringReader(value.first))?.let { IndexReport(it, value.second) } }
.map { key, value -> KeyValue("$key-${value?.report?.step}", klaxon.toJsonString(value)) }
.to(settings.outputTopic)
return builder
}
}
\ No newline at end of file
/*
* mapper-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package ch.memobase
import com.beust.klaxon.Klaxon
import java.time.LocalDateTime
data class Report(
val id: String,
val status: String,
val message: String,
val step: String,
val timestamp: String
) {
fun toJson(): String {
return Klaxon().toJsonString(this)
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as Report
if (id != other.id) return false
if (status != other.status) return false
if (message != other.message) return false
if (step != other.step) return false
return true
}
override fun hashCode(): Int {
var result = id.hashCode()
result = 31 * result + status.hashCode()
result = 31 * result + message.hashCode()
result = 31 * result + step.hashCode()
return result
}
}
......@@ -14,31 +14,29 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package ch.memobase
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
class Service(file: String = "app.yml") {
private val log = LogManager.getLogger("ElasticBulkActionService")
private val log = LogManager.getLogger("ReportProcessorService")
private val settings = SettingsLoader(
listOf(),
file,
useConsumerConfig = true
useStreamsConfig = true
)
private val consumer = Consumer(settings.inputTopic, settings.kafkaConsumerSettings)
fun run() {
while (true) {
val records = consumer.consume()
for (record in records) {
log.error(record.key(), record.value())
val stream = KafkaStreams(KafkaTopology(settings).prepare().build(), settings.kafkaStreamsSettings)
stream.use {
it.start()
while (stream.state().isRunning) {
log.info("Service is running.")
Thread.sleep(10_000L)
}
throw Exception("Stream stopped running!")
}
}
}
\ No newline at end of file
kafka:
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
application.id: ${APPLICATION_ID:?system}
topic:
in: ${TOPIC_IN:?system}
out: ${TOPIC_OUT:?system}
process: ${TOPIC_PROCESS:?system}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ baseline-preprocessor
~ Copyright (C) 2019 Project Swissbib <http://swissbib.org>
~
~ This program is free software: you can redistribute it and/or modify
~ it under the terms of the GNU Affero General Public License as published by
~ the Free Software Foundation, either version 3 of the License, or
~ (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU Affero General Public License for more details.
~
~ You should have received a copy of the GNU Affero General Public License
~ along with this program. If not, see <https://www.gnu.org/licenses/>.
-->
<Configuration status="info" name="skeleton-app" packages="">
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="[%-5level] [%c{1}] %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
import ch.memobase.Service
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestConsumer {
@Test
fun `test consume reporting topic`() {
Service().run()
}
}
\ No newline at end of file
app:
kafka:
consumer:
bootstrap.servers: mb-ka1.memobase.unibas.ch:9092,mb-ka2.memobase.unibas.ch:9092,mb-ka3.memobase.unibas.ch:9092
client.id: test-consumer
group.id: test-consumer
auto.offset.reset: "earliest"
topic:
in: import-process-reporting
process: output-topic
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment