Unverified Commit 765ec7aa authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

first commit

parents
Pipeline #13056 passed with stages
in 7 minutes and 59 seconds
.bloop/
.idea/
.metals/
target/
# GitLab CI pipeline definition: declares three stages and pulls the job
# definitions from shared templates in another project.
stages:
- test
- build
- publish
variables:
# Set to an empty string; presumably to disable TLS for the Docker-in-Docker
# service used by the included docker-image template — TODO confirm.
DOCKER_TLS_CERTDIR: ""
# Job definitions are not declared here; they come from the ci-templates project.
include:
- project: 'memoriav/memobase-2020/utilities/ci-templates'
file: 'sbt-build/sbt-build.yml'
- project: 'memoriav/memobase-2020/utilities/ci-templates'
file: 'docker-image/docker-image.yml'
# Runtime image for the assembled fat jar (built by sbt-assembly as app.jar).
FROM openjdk:8-jre-slim-buster
# COPY instead of ADD: both sources are plain local files, so ADD's extra
# behaviour (remote URLs, automatic archive extraction) is neither needed nor wanted.
COPY target/scala-2.12/app.jar /app/app.jar
COPY src/main/resources/app.yml /configs/app.yml
# Exec form: java runs as PID 1 and receives SIGTERM directly, so the JVM
# shutdown hook that closes the Kafka Streams instance gets a chance to run.
# With the shell form the signal is delivered to /bin/sh instead.
CMD ["java", "-jar", "/app/app.jar"]
This diff is collapsed.
# Import Process Administrator
Manages import process Kafka services
import Dependencies._
// Build-wide settings
ThisBuild / scalaVersion := "2.12.11"
ThisBuild / organization := "ch.memobase"
ThisBuild / organizationName := "Memoriav"

// A git tag is used as the version number only if it starts with digits
// followed by a dot; any other tag yields no version.
ThisBuild / git.gitTagToVersionNumber := { tag: String =>
  Some(tag).filter(_.matches("[0-9]+\\..*"))
}

lazy val root = project
  .in(file("."))
  .enablePlugins(GitVersioning)
  .settings(
    name := "import-process-administrator",
    scalacOptions += "-Ypartial-unification",
    resolvers += "Memobase Utils" at "https://dl.bintray.com/memoriav/memobase",
    libraryDependencies ++= Seq(
      kafkaStreams,
      log4jApi,
      log4jCore,
      log4jSlf4j,
      log4jScala,
      memobaseServiceUtils,
      uPickle,
      kafkaStreamsTestUtils % Test,
      scalaTest % Test,
      scalatic % Test
    ),
    // Fat-jar packaging: fixed jar name, explicit entry point, tests skipped
    // while assembling.
    assembly / assemblyJarName := "app.jar",
    assembly / mainClass := Some("ch.memobase.App"),
    assembly / test := {},
    assembly / assemblyMergeStrategy := {
      // Keep the first logging configuration found on the classpath
      case "log4j.properties" | "log4j2.xml" => MergeStrategy.first
      case "module-info.class" => MergeStrategy.discard
      // Everything else falls back to the previously configured strategy
      case other =>
        val fallback = (assembly / assemblyMergeStrategy).value
        fallback(other)
    }
  )
# Deployment of the import-process-administrator Kafka Streams service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: import-process-administrator
  namespace: memobase
  labels:
    app: import-process-administrator-app
spec:
  selector:
    matchLabels:
      app: import-process-administrator-app
  replicas: 1
  template:
    metadata:
      labels:
        app: import-process-administrator-app
        tier: web
    spec:
      containers:
        - name: import-process-administrator-container
          image: cr.gitlab.switch.ch/memoriav/memobase-2020/services/import-process/import-process-administrator:latest
          imagePullPolicy: Always
          # Variable names must match the ${...} placeholders in app.yml.
          env:
            - name: JOB_ID
              value: import-process-administrator
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: mb-ka1.memobase.unibas.ch:9092,mb-ka2.memobase.unibas.ch:9092,mb-ka3.memobase.unibas.ch:9092
            # app.yml reads application.id from APPLICATION_ID (was CLIENT_ID)
            - name: APPLICATION_ID
              value: import-process-administrator
            # The application compiles this value with java.util.regex.Pattern,
            # so it has to be a regular expression, not a glob: "-*" would
            # only match a run of hyphens, ".*" matches any topic suffix.
            - name: REPORTING_TOPIC_IN
              value: "import-process-reporting-.*"
            # app.yml reads countInputTopic from COUNT_INPUT_TOPIC (was COUNT_TOPIC_IN)
            - name: COUNT_INPUT_TOPIC
              value: import-dataset-sizes
            - name: TOPIC_OUT
              value: import-process-admin
            - name: TOPIC_PROCESS
              value: none
      restartPolicy: Always
      # NOTE(review): this volume is declared but not mounted by any container.
      volumes:
        - name: config
          configMap:
            name: import-process-administrator-config
/*
* import-process-administrator
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
import sbt._
/**
  * Library coordinates referenced by the build definition.
  */
object Dependencies {
  // Versions shared by more than one artifact
  lazy val kafkaV: String = "2.3.1"
  lazy val log4jV: String = "2.11.2"
  lazy val scalatestV: String = "3.1.2"

  // Kafka Streams
  lazy val kafkaStreams = "org.apache.kafka" %% "kafka-streams-scala" % kafkaV
  lazy val kafkaStreamsTestUtils = "org.apache.kafka" % "kafka-streams-test-utils" % kafkaV

  // Logging
  lazy val log4jApi = "org.apache.logging.log4j" % "log4j-api" % log4jV
  lazy val log4jCore = "org.apache.logging.log4j" % "log4j-core" % log4jV
  lazy val log4jSlf4j = "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4jV
  lazy val log4jScala = "org.apache.logging.log4j" %% "log4j-api-scala" % "11.0"

  // Project utilities and JSON handling
  lazy val memobaseServiceUtils = "org.memobase" % "memobase-service-utilities" % "1.4.1"
  lazy val uPickle = "com.lihaoyi" %% "upickle" % "0.9.5"

  // Testing
  lazy val scalaTest = "org.scalatest" %% "scalatest" % scalatestV
  // The identifier is spelled "scalatic" (artifact: scalactic); it is kept
  // as is because the build definition refers to it by this name.
  lazy val scalatic = "org.scalactic" %% "scalactic" % scalatestV
}
#
# import-process-administrator
# Copyright (C) 2020 Memoriav
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# suppress inspection "UnusedProperty"
sbt.version=1.3.10
// DO NOT EDIT! This file is auto-generated.
// This file enables sbt-bloop to create bloop config files.
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.4.3")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")
# Application settings file; SettingsFromFile loads it under the name "app.yml".
# Every value is a ${NAME:?system} placeholder. Presumably the settings loader
# resolves NAME from the process environment and treats it as mandatory —
# TODO confirm against the SettingsLoader implementation.
app:
# Read by the application through the app-settings key "countInputTopic".
countInputTopic: ${COUNT_INPUT_TOPIC:?system}
kafka:
streams:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
application.id: ${APPLICATION_ID:?system}
topic:
# The application compiles the input-topic value as a java.util.regex.Pattern.
in: ${REPORTING_TOPIC_IN:?system}
out: ${TOPIC_OUT:?system}
process: ${TOPIC_PROCESS:?system}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ import-process-administrator
~ Copyright (C) 2020 Memoriav
~
~ This program is free software: you can redistribute it and/or modify
~ it under the terms of the GNU Affero General Public License as
~ published by the Free Software Foundation, either version 3 of the
~ License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU Affero General Public License for more details.
~
~ You should have received a copy of the GNU Affero General Public License
~ along with this program. If not, see <https://www.gnu.org/licenses/>.
~
-->
<!-- Log4j 2 configuration: a single console appender attached to the root logger at level info. -->
<Configuration status="info" name="import-process-administrator" packages="">
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<!-- Line layout: [level] [timestamp] [abbreviated logger name:line number] message -->
<PatternLayout pattern="[%-5p] [%d{yyyy-MM-dd HH:mm:ss}] [%c{1.}:%L] %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
</Configuration>
/*
* import-process-administrator
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase
import java.time.Duration
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.scala.Logging
import scala.util.{Failure, Success, Try}
/**
  * Entry point: builds the topology, starts the Kafka Streams instance and
  * makes sure it is closed when the JVM shuts down.
  */
object App extends scala.App with Logging {
  val topology = new KafkaTopology
  val streams = new KafkaStreams(topology.build(), SettingsFromFile.getKafkaStreamsSettings)

  // Registered before start() so the hook is already in place should the JVM
  // be asked to terminate while the streams instance is starting up.
  sys.ShutdownHookThread {
    streams.close(Duration.ofSeconds(10))
  }

  logger.trace("Starting stream processing")
  Try(
    streams.start()
  ) match {
    // start() only launches the processing threads and returns; processing
    // continues in the background, so nothing has "finished" at this point.
    case Success(_) => logger.info("Stream processing started")
    case Failure(f) =>
      // Pass the throwable along so the stack trace ends up in the log
      logger.error(s"Aborting due to errors: ${f.getMessage}", f)
      // Signal the failed start to the process supervisor
      sys.exit(1)
  }
}
/*
* import-process-administrator
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase
import java.util.regex.Pattern
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.{StreamsBuilder, _}
import org.apache.logging.log4j.scala.Logging
import scala.util.{Success, Try}
/**
  * Defines the stream topology which counts the reports written per
  * reporting topic and emits a [[ServiceTermination]] message as soon as the
  * count reaches the data set size registered under the same key.
  */
class KafkaTopology extends Logging {

  import Serdes._

  /**
    * Assembles the topology.
    *
    * Three sub-flows are wired together through two intermediary topics:
    *  - data set sizes are parsed to `Long` and written to an intermediary
    *    topic which backs a global table;
    *  - every report from a topic matching the configured pattern is re-keyed
    *    by [[ReportTransformer]] and written to a second intermediary topic;
    *  - the re-keyed reports are counted per key, joined with the global
    *    table, and a termination message is sent to the output topic whenever
    *    the count is greater than or equal to the registered size.
    *
    * @return the built topology
    */
  def build(): Topology = {
    val intermediaryDataSetCountTopic = "import-process-filtered-dataset-count"
    val intermediaryReportingTopic = "import-process-ordered-service-reports"

    val builder = new StreamsBuilder
    // The configured input topic is interpreted as a regular expression
    val topicPattern = Pattern.compile(SettingsFromFile.getReportingInputTopic)

    val reportSource = builder.stream[String, String](topicPattern)
    val orderedReportSource = builder.stream[String, Int](intermediaryReportingTopic)
    val dataSetSizeRegistry = builder.stream[String, String](SettingsFromFile.getCountInputTopic)

    // Keep only those size declarations whose value parses as a Long
    dataSetSizeRegistry.flatMap((k, v) => Try(v.toLong) match {
      case Success(l) => List((k, l))
      case _ =>
        logger.warn("Ignoring message since count is no valid value")
        List()
    })
      .to(intermediaryDataSetCountTopic)

    val dataSetSizes = builder
      .globalTable(intermediaryDataSetCountTopic,
        Materialized.as[String, Long, ByteArrayKeyValueStore]("reporting-counts-store")
          .withKeySerde(Serdes.String)
          .withValueSerde(Serdes.Long)
      )

    reportSource
      .transform(() => new ReportTransformer)
      .to(intermediaryReportingTopic)

    orderedReportSource
      .groupByKey
      .count
      .toStream
      .join(dataSetSizes)((k, _) => k, (vl, vr) => vl >= vr)
      .filter((_, v) => v)
      // flatMap instead of map: a key which cannot be split into process id
      // and job id is dropped with a warning instead of raising an
      // ArrayIndexOutOfBoundsException inside the stream thread.
      .flatMap((k, _) => terminationFor(k))
      .to(SettingsFromFile.getKafkaOutputTopic)

    builder.build()
  }

  /**
    * Creates the termination message for a key of the form
    * `<processId>-<jobId>`. As before, only the first two segments of the
    * hyphen-separated key are used.
    *
    * @param key key of the counted report stream
    * @return a single key/message pair, or an empty list if the key has
    *         fewer than two segments
    */
  private def terminationFor(key: String): List[(String, String)] =
    key.split("-") match {
      case Array(processId, jobId, _*) =>
        List((key, ServiceTermination(processId, jobId).toString))
      case _ =>
        logger.warn(s"Ignoring key '$key' since it does not have the form <processId>-<jobId>")
        List.empty
    }
}
/*
* import-process-administrator
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase
import upickle.default.{Reader, macroRW}
/**
  * Terminates a Kafka service when sent downstream
  *
  * @param processId Id of import process
  * @param jobName   Name of job inside process
  */
case class ServiceTermination(processId: String, jobName: String) {

  /**
    * Renders the message as a compact JSON object with the fields `action`,
    * `processId` and `job-name`. Field values are escaped, so the result stays
    * valid JSON even if a value contains quotes, backslashes or control
    * characters.
    */
  override def toString: String =
    s"""{"action":"termination","processId":${jsonString(processId)},"job-name":${jsonString(jobName)}}"""

  /** Wraps `raw` in double quotes and escapes the characters JSON does not allow verbatim. */
  private def jsonString(raw: String): String = {
    val sb = new StringBuilder(raw.length + 2)
    sb.append('"')
    raw.foreach {
      case '"'  => sb.append('\\').append('"')
      case '\\' => sb.append('\\').append('\\')
      case '\n' => sb.append('\\').append('n')
      case '\r' => sb.append('\\').append('r')
      case '\t' => sb.append('\\').append('t')
      // Remaining control characters have to be written as four-digit hex escapes
      case c if c < ' ' => sb.append('\\').append("u%04x".format(c.toInt))
      case c => sb.append(c)
    }
    sb.append('"')
    sb.toString
  }
}
// FIXME: Can eventually be removed
/**
  * Report on the processing of a single record.
  *
  * @param id      Record id
  * @param status  Processing result; either SUCCESS or FAILURE
  * @param message Message in case of error
  */
case class ServiceReport(id: String, status: String, message: String)

object ServiceReport {
  implicit val reader: Reader[ServiceReport] = macroRW

  /**
    * Parses a report from its JSON representation.
    *
    * @param msg JSON document with the fields of [[ServiceReport]]
    * @return the parsed report
    */
  def apply(msg: String): ServiceReport = upickle.default.read[ServiceReport](msg)
}
/**
  * Declares the size of a data set to be expected by a job in an import process
  *
  * @param processId   Id of import process
  * @param jobName     Name of job inside process
  * @param dataSetSize Size of data set
  */
case class DataSetSize(processId: String, jobName: String, dataSetSize: Long)

object DataSetSize {
  implicit val reader: Reader[DataSetSize] = macroRW

  /**
    * Parses a size declaration from its JSON representation.
    *
    * @param msg JSON document with the fields of [[DataSetSize]]
    * @return the parsed declaration
    */
  def apply(msg: String): DataSetSize = upickle.default.read[DataSetSize](msg)
}
\ No newline at end of file
/*
* import-process-administrator
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
/**
  * Replaces key and value of every record: the new key is the name of the
  * topic the record was read from, with the reporting prefix removed; the new
  * value is the constant 1. The incoming key and value are not inspected.
  */
class ReportTransformer extends Transformer[String, String, KeyValue[String, Int]] {

  private val reportingTopicPrefix = "import-process-reporting-"

  // Assigned in init(); needed to look up the topic of the current record
  private var processorContext: ProcessorContext = _

  override def init(context: ProcessorContext): Unit = {
    processorContext = context
  }

  override def transform(key: String, value: String): KeyValue[String, Int] = {
    val strippedTopic = processorContext.topic().replace(reportingTopicPrefix, "")
    new KeyValue[String, Int](strippedTopic, 1)
  }

  // Nothing to release
  override def close(): Unit = ()
}
/*
* import-process-administrator
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase
import java.util.Properties
import org.memobase.settings.SettingsLoader
import scala.collection.JavaConverters._
/**
  * Read access to the configuration values used by the application.
  */
trait Settings {

  /** Identifier of this service. */
  def getId: String

  /** Properties handed to the Kafka Streams instance. */
  def getKafkaStreamsSettings: Properties

  /** Input topic carrying the service reports. */
  def getReportingInputTopic: String

  /** Input topic carrying the data set counts. */
  def getCountInputTopic: String

  /** Topic the application writes its output to. */
  def getKafkaOutputTopic: String

  /** Topic used for process reports. */
  def getKafkaReportTopic: String
}
/**
  * Acts as a single point of fetching for the SettingsLoader
  */
object SettingsFromFile extends Settings {

  // Name of the application-specific setting holding the count input topic
  private val CountInputTopicKey = "countInputTopic"

  // NOTE(review): the meaning of the four positional flags is defined by the
  // SettingsLoader constructor, which is not visible here — confirm against
  // the memobase-service-utilities version in use before changing them.
  private val loader = new SettingsLoader(
    List(CountInputTopicKey).asJava,
    "app.yml",
    false,
    true,
    false,
    false
  )

  override def getId: String = "import-process-administrator"

  override def getKafkaStreamsSettings: Properties = loader.getKafkaStreamsSettings

  override def getReportingInputTopic: String = loader.getInputTopic

  override def getCountInputTopic: String = loader.getAppSettings.getProperty(CountInputTopicKey)

  override def getKafkaOutputTopic: String = loader.getOutputTopic

  override def getKafkaReportTopic: String = loader.getProcessReportTopic
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment