Unverified Commit 89f7b9db authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

first commit

parents
.bloop/
.idea/
.metals/
target/
# GitLab CI pipeline: stage skeleton only — the actual job definitions are
# pulled in from the shared Memoriav CI templates (branch "modular").
stages:
  - test
  - build
  - publish
variables:
  # Disable TLS between the Docker client and the docker:dind service.
  DOCKER_TLS_CERTDIR: ""
include:
  # sbt jobs (compile/test/assembly).
  - project: 'memoriav/memobase-2020/utilities/ci-templates'
    ref: modular
    file: 'sbt/default.yml'
  # Docker image build & publish jobs.
  - project: 'memoriav/memobase-2020/utilities/ci-templates'
    ref: modular
    file: 'docker/default.yml'
# Minimal runtime image: JRE 8 only — the application ships as a pre-built
# fat jar (see `sbt assembly` / assemblyJarName in build.sbt).
FROM openjdk:8-jre-slim-buster
ADD target/scala-2.12/app.jar /app/app.jar
CMD java -jar /app/app.jar
This diff is collapsed.
# Import Process Delete
Creates deletion messages for Fedora Ingester based on timestamp, session id,
collection id, institution id, or record id.
## Usage
```
--from <yyyy-mm-dd>
--session-id
import Dependencies._
// Build-wide settings shared by all sub-projects.
ThisBuild / scalaVersion := "2.12.11"
ThisBuild / organization := "ch.memobase"
ThisBuild / organizationName := "Memoriav"
// sbt-git: use the current git tag verbatim as the version when it starts
// with a digit (e.g. "1.2.3"); for any other tag return None so sbt-git
// falls back to its default versioning scheme.
ThisBuild / git.gitTagToVersionNumber := { tag: String =>
  if (tag matches "[0-9]+\\..*") Some(tag)
  else None
}
// Root (and only) project of the build.
lazy val root = (project in file("."))
  .enablePlugins(GitVersioning)
  .settings(
    name := "Import Process Delete",
    // Consistency fix: use sbt's unified slash syntax throughout, matching
    // the `ThisBuild / ...` settings above. The old `key in scope` form is
    // deprecated; slash syntax is supported since sbt 1.1 (this build pins
    // sbt 1.3.10).
    assembly / assemblyJarName := "app.jar",
    // Skip tests when building the fat jar.
    assembly / test := {},
    assembly / assemblyMergeStrategy := {
      // Take the first logging config found on the classpath; drop Java 9+
      // module descriptors, which break merging of pre-modular jars.
      case "log4j.properties" => MergeStrategy.first
      case "log4j2.xml" => MergeStrategy.first
      case "module-info.class" => MergeStrategy.discard
      case x =>
        // Fall back to sbt-assembly's default strategy for everything else.
        val oldStrategy = (assembly / assemblyMergeStrategy).value
        oldStrategy(x)
    },
    assembly / mainClass := Some("ch.memobase.App"),
    libraryDependencies ++= Seq(
      kafkaClients,
      log4jApi,
      log4jCore,
      log4jSlf4j,
      log4jScala,
      scopt,
      upickle,
      scalaTest % Test,
      scalatic % Test)
  )
# Kubernetes Job that runs the import-process-delete application to completion.
apiVersion: batch/v1
kind: Job
metadata:
  name: import-process-delete
  namespace: memobase
  labels:
    app: import-process-delete-app
spec:
  # NOTE(review): an explicit spec.selector plus pod labels looks copied from a
  # Deployment manifest; batch/v1 Jobs normally let the controller generate the
  # selector — confirm this applies cleanly.
  selector:
    matchLabels:
      app: import-process-delete-app
  template:
    metadata:
      labels:
        app: import-process-delete-app
        tier: web
    spec:
      containers:
        # NOTE(review): container name still says "administrator" although this
        # project is import-process-delete — likely a copy-paste leftover.
        - name: import-process-administrator-container
          image: cr.gitlab.switch.ch/memoriav/memobase-2020/utilities/import-process-delete:latest
          imagePullPolicy: Always
          env:
            - name: JOB_ID
              value: import-process-delete
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: mb-ka1.memobase.unibas.ch:9092,mb-ka2.memobase.unibas.ch:9092,mb-ka3.memobase.unibas.ch:9092
            - name: CLIENT_ID
              value: import-process-delete
            # Topic the app consumes processing reports from.
            - name: TOPIC_IN
              value: import-process-reporting
            # Topic delete messages are written to.
            - name: TOPIC_OUT
              value: import-process-test
            # Consumer poll timeout in milliseconds.
            - name: POLL_TIMEOUT
              value: "60000"
      # Failed pods are not restarted in place; the Job retries at most once.
      restartPolicy: Never
  backoffLimit: 1
/*
* import-process-administrator
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
import sbt._
// Central declaration of all library dependencies referenced from build.sbt.
object Dependencies {
  // Version constants shared by related artefacts.
  lazy val kafkaV = "2.3.1"
  lazy val log4jV = "2.11.2"
  lazy val scalatestV = "3.1.2"
  // Kafka Java client (single % — not a Scala cross-built artefact).
  lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaV
  // NOTE(review): declared but not referenced in build.sbt's dependency list.
  lazy val kafkaStreamsTestUtils = "org.apache.kafka" % "kafka-streams-test-utils" % kafkaV
  lazy val log4jApi = "org.apache.logging.log4j" % "log4j-api" % log4jV
  lazy val log4jCore = "org.apache.logging.log4j" % "log4j-core" % log4jV
  // Scala API for Log4j — versioned independently of the core log4jV.
  lazy val log4jScala = "org.apache.logging.log4j" %% "log4j-api-scala" % "11.0"
  lazy val log4jSlf4j = "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4jV
  lazy val scalatic = "org.scalactic" %% "scalactic" % scalatestV
  lazy val scalaTest = "org.scalatest" %% "scalatest" % scalatestV
  lazy val scopt = "com.github.scopt" %% "scopt" % "4.0.0-RC2"
  lazy val upickle = "com.lihaoyi" %% "upickle" % "0.9.5"
}
#
# import-process-administrator
# Copyright (C) 2020 Memoriav
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# suppress inspection "UnusedProperty"
sbt.version=1.3.10
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ import-process-administrator
~ Copyright (C) 2020 Memoriav
~
~ This program is free software: you can redistribute it and/or modify
~ it under the terms of the GNU Affero General Public License as
~ published by the Free Software Foundation, either version 3 of the
~ License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU Affero General Public License for more details.
~
~ You should have received a copy of the GNU Affero General Public License
~ along with this program. If not, see <https://www.gnu.org/licenses/>.
~
-->
<Configuration status="info" name="import-process-administrator" packages="">
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="[%-5p] [%d{yyyy-MM-dd HH:mm:ss}] [%c{1.}:%L] %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
</Configuration>
/*
* import-process-administrator
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase
import ch.memobase.models.DeleteMessage
import org.apache.logging.log4j.scala.Logging
import scala.collection.immutable.HashSet
/**
 * Application entry point (runs via the mixed-in scala.App trait).
 *
 * Parses the command-line arguments into a session id plus a sequence of
 * record filters, then repeatedly polls the input topic until a poll comes
 * back empty; every record that passes all filters is turned into a
 * DeleteMessage and sent via the Kafka producer.
 */
object App
  extends KProducer
    with scala.App
    with ArgParser
    with AppSettings
    with KConsumer
    with MsgFilter
    with Logging {
  parse(args) match {
    case Some((sessionId, filters)) =>
      try {
        // Stream is lazy: each element triggers one (side-effecting) poll.
        Stream
          .continually(poll)
          // Stop consuming as soon as a poll returns no records.
          .takeWhile(records => records.nonEmpty)
          .flatMap(records =>
            records
              // A record must satisfy every active filter to be deleted.
              .filter(record => filters.forall(f => f(record)))
              // HashSet deduplicates delete messages within one poll batch.
              // NOTE(review): duplicates are NOT deduplicated across batches.
              .foldLeft(HashSet[DeleteMessage]())((agg, record) => agg + DeleteMessage(record, sessionId))
          )
          .foreach(id => sendDelete(id))
      } catch {
        case e: Exception =>
          logger.error(e)
          sys.exit(1)
      } finally {
        // Always release Kafka resources, on both success and error paths.
        logger.info("Shutting down application")
        closeConsumer()
        closeProducer()
      }
    case None =>
      logger.error("Invalid command-line arguments!")
      sys.exit(1)
  }
}
package ch.memobase
import java.util.Properties
import scala.util.Random
/**
 * Configuration mix-in: all settings are read once from environment
 * variables at construction time. Missing variables fail fast with
 * NoSuchElementException.
 */
trait AppSettings {

  /** Kafka client id: CLIENT_ID env var plus a 5-char random suffix. */
  val clientId = s"${sys.env("CLIENT_ID")}-${new Random().alphanumeric.take(5).mkString}"

  /** Topic the consumer subscribes to (TOPIC_IN). */
  val inputTopic: String = sys.env("TOPIC_IN")

  /** Consumer poll timeout in milliseconds (POLL_TIMEOUT). */
  val pollTimeout: Int = sys.env("POLL_TIMEOUT").toInt

  // Assembles a java.util.Properties object from key/value pairs.
  private def buildProps(entries: (String, String)*): Properties = {
    val props = new Properties()
    for ((key, value) <- entries) props.put(key, value)
    props
  }

  /** Producer config: string (de)serialisation, gzip-compressed batches. */
  val producerProps: Properties = buildProps(
    "bootstrap.servers" -> sys.env("KAFKA_BOOTSTRAP_SERVERS"),
    "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "batch.size" -> "131072",
    "compression.type" -> "gzip"
  )

  /** Consumer config: group id equals the (randomised) client id. */
  val consumerProps: Properties = buildProps(
    "bootstrap.servers" -> sys.env("KAFKA_BOOTSTRAP_SERVERS"),
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "fetch.min.bytes" -> "1048576",
    "group.id" -> clientId,
    "allow.auto.create.topics" -> "false",
    "auto.offset.reset" -> "earliest",
    "max.poll.records" -> "5000"
  )

  /** Topic delete messages are published to (TOPIC_OUT). */
  val reportingTopic: String = sys.env("TOPIC_OUT")
}
package ch.memobase
import java.util.Calendar
import ch.memobase.models._
import scopt.{OParser, OParserBuilder}
/**
 * Command-line parsing mix-in. Translates the parsed options into the
 * filter functions provided by [[MsgFilter]] (hence the self-type).
 */
trait ArgParser {
  self: MsgFilter =>

  private val builder: OParserBuilder[Args] = OParser.builder[Args]

  // All filter options are repeatable except the two timestamp bounds;
  // the session id argument is mandatory.
  private val parser: OParser[Unit, Args] = {
    import builder._
    OParser.sequence(
      programName("import-process-delete"),
      head("", "0.1.0"),
      opt[String]('c', "record-set-filter")
        .action((v, c) => c.copy(recordSetFilters = c.recordSetFilters :+ v))
        .valueName("<id>")
        .text("record set id filter")
        .optional(),
      opt[String]('i', "institution-filter")
        .action((v, c) => c.copy(institutionFilters = c.institutionFilters :+ v))
        .valueName("<id>")
        .text("institution id filter")
        .optional(),
      opt[String]('r', "record-filter")
        .action((v, c) => c.copy(recordFilters = c.recordFilters :+ v))
        .valueName("<id>")
        .text("record id filter")
        .optional(),
      opt[String]('s', "session-filter")
        .action((v, c) => c.copy(sessionFilters = c.sessionFilters :+ v))
        .valueName("<id>")
        .text("session id filter")
        .optional(),
      opt[Calendar]('a', "created-after")
        .action((v, c) => c.copy(createdAfterFilter = v))
        .valueName("<datetime>")
        .text("retains only records processed after timestamp")
        .maxOccurs(1)
        .optional(),
      opt[Calendar]('b', "created-before")
        .action((v, c) => c.copy(createdBeforeFilter = v))
        .valueName("<datetime>")
        .text("retains only records processed before timestamp")
        .maxOccurs(1)
        .optional(),
      opt[String]("<session-id>")
        .action((v, c) => c.copy(sessionId = v))
        .text("session id assigned to delete message")
        .required(),
      help("help").text("prints this text")
    )
  }

  /**
   * Parses the command-line arguments.
   *
   * @param args raw command-line arguments
   * @return the session id together with the filter functions built from
   *         the parsed options, or None if the arguments are invalid
   */
  def parse(args: Array[String]): Option[(String, Seq[FilterFun])] = {
    OParser.parse(parser, args, Args()) match {
      case Some(config) => Some(
        config.sessionId,
        Seq(createdAfterFilter(standardiseTimestamp(config.createdAfterFilter))) ++
          Seq(createdBeforeFilter(standardiseTimestamp(config.createdBeforeFilter))) ++
          config.institutionFilters.map(v => institutionIdFilter(v)) ++
          config.recordFilters.map(v => createRecordIdFilter(v)) ++
          // FIX: record-set ids were mapped to createRecordIdFilter (copy-paste
          // error), so --record-set-filter silently matched on record ids.
          // recordSetIdFilter is the correct — previously unused — filter.
          config.recordSetFilters.map(v => recordSetIdFilter(v)) ++
          config.sessionFilters.map(v => sessionIdFilter(v)))
      case None => None
    }
  }

  /** Renders a Calendar as "yyyy-MM-ddTHH:mm:ss.SSS". */
  private def standardiseTimestamp(calendar: Calendar): String =
    f"${calendar.get(Calendar.YEAR)}%04d-" +
      // FIX: Calendar.MONTH is zero-based (January == 0); add 1 to produce
      // the conventional 01-12 month number.
      f"${calendar.get(Calendar.MONTH) + 1}%02d-" +
      f"${calendar.get(Calendar.DAY_OF_MONTH)}%02dT" +
      f"${calendar.get(Calendar.HOUR_OF_DAY)}%02d:" +
      f"${calendar.get(Calendar.MINUTE)}%02d:" +
      f"${calendar.get(Calendar.SECOND)}%02d." +
      f"${calendar.get(Calendar.MILLISECOND)}%03d"
}
package ch.memobase
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.logging.log4j.scala.Logging
import scala.collection.JavaConverters._
// Kafka consumer mix-in: lazily subscribes to the input topic and exposes
// poll/close operations to the application.
trait KConsumer { self: Logging with AppSettings =>

  // Lazily initialised so construction/subscription happens on first poll.
  lazy private val consumer = {
    val consumer = new KafkaConsumer[String, String](consumerProps)
    logger.debug(s"Subscribing to topic $inputTopic")
    consumer.subscribe(List(inputTopic).asJava)
    consumer
  }

  /** Fetches the next batch of records, waiting at most pollTimeout ms. */
  def poll: Iterable[ConsumerRecord[String, String]] = {
    val records = consumer.poll(Duration.ofMillis(pollTimeout)).asScala
    logger.debug(s"${records.size} new records fetched from Kafka topic $inputTopic")
    records
  }

  /** Closes the underlying Kafka consumer. */
  def closeConsumer(): Unit = {
    logger.debug("Closing Kafka consumer instance")
    consumer.close()
  }
}
package ch.memobase
import ch.memobase.models.DeleteMessage
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.logging.log4j.scala.Logging
/**
 * Kafka producer mix-in responsible for publishing delete messages to the
 * configured output topic.
 */
abstract class KProducer {
  self: AppSettings with Logging =>

  private lazy val producer = new KafkaProducer[String, String](producerProps)

  /**
   * Publishes a delete message: a record on the output topic keyed by the
   * record id with a null value, carrying the record-set, institution and
   * session ids as headers.
   */
  def sendDelete(id: DeleteMessage): Unit = {
    logger.info(s"Sending delete command for $id")
    // FIX: the two-argument ProducerRecord constructor is (topic, value), so
    // the original published a null-valued, key-less record to a topic NAMED
    // after each record id. Publish instead to the configured output topic
    // (reportingTopic), keyed by the record id, with a null (tombstone-style)
    // value.
    val producerRecord = new ProducerRecord[String, String](reportingTopic, id.recordId, null)
    producerRecord.headers()
      .add("recordSetId", id.recordSetId.getBytes())
      .add("institutionId", id.institutionId.getBytes())
      .add("sessionId", id.sessionId.getBytes())
    producer.send(producerRecord)
  }

  /** Closes the underlying Kafka producer. */
  def closeProducer(): Unit = {
    logger.debug("Closing Kafka producer instance")
    producer.close()
  }
}
package ch.memobase
import java.text.SimpleDateFormat
import ch.memobase.models.Report
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.collection.JavaConverters._
/**
 * Filter functions applied to consumed report records. Header-based filters
 * match on Kafka record headers; the remaining filters inspect the JSON
 * report payload.
 */
trait MsgFilter {
  // FIX: the pattern used "YYYY" (week-based year), which diverges from the
  // calendar year around New Year; "yyyy" is the correct pattern letter.
  private val dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")

  /** Predicate over a consumed Kafka record. */
  type FilterFun = ConsumerRecord[String, String] => Boolean

  // Decodes all values of the named Kafka record header as strings.
  private def headerValues(rec: ConsumerRecord[String, String], key: String): Iterable[String] =
    rec.headers()
      .headers(key)
      .asScala
      .map(header => new String(header.value()))

  /** Keeps records carrying a "sessionId" header equal to the given id. */
  val sessionIdFilter: String => FilterFun =
    sessionId => rec =>
      headerValues(rec, "sessionId").exists(v => v == sessionId)

  /** Keeps records carrying a "recordSetId" header equal to the given id. */
  val recordSetIdFilter: String => FilterFun =
    recordSetId => rec =>
      headerValues(rec, "recordSetId").exists(v => v == recordSetId)

  /** Keeps records carrying an "institutionId" header equal to the given id. */
  val institutionIdFilter: String => FilterFun =
    institutionId => rec =>
      headerValues(rec, "institutionId").exists(v => v == institutionId)

  /** Keeps records whose report payload carries the given record id. */
  val createRecordIdFilter: String => FilterFun =
    recordId => rec =>
      Report(rec.value()).id == recordId

  /** Keeps records whose report timestamp is at or after the given bound. */
  val createdAfterFilter: String => FilterFun =
    timestamp => rec => {
      val recordTimestamp = Report(rec.value()).timestamp
      // FIX: comparison was inverted — it retained records created BEFORE the
      // bound although the option is documented as "created after".
      recordTimestamp == timestamp ||
        dateFormatter.parse(recordTimestamp).after(dateFormatter.parse(timestamp))
    }

  /** Keeps records whose report timestamp is at or before the given bound. */
  val createdBeforeFilter: String => FilterFun =
    timestamp => rec => {
      val recordTimestamp = Report(rec.value()).timestamp
      // FIX: inverted comparison, mirroring createdAfterFilter.
      recordTimestamp == timestamp ||
        dateFormatter.parse(recordTimestamp).before(dateFormatter.parse(timestamp))
    }
}
package ch.memobase.models
import java.util.Calendar
/**
 * Parsed command-line configuration. The timestamp defaults span the whole
 * possible range (epoch .. now), so an unset bound filters nothing out.
 */
case class Args(sessionId: String = "",
                recordFilters: Seq[String] = Seq.empty,
                recordSetFilters: Seq[String] = Seq.empty,
                institutionFilters: Seq[String] = Seq.empty,
                sessionFilters: Seq[String] = Seq.empty,
                createdAfterFilter: Calendar = {
                  // Default lower bound: the Unix epoch (no restriction).
                  val epoch = Calendar.getInstance()
                  epoch.setTimeInMillis(0L)
                  epoch
                },
                // Default upper bound: "now" (no restriction).
                createdBeforeFilter: Calendar = Calendar.getInstance())
package ch.memobase.models
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.collection.JavaConverters._
/** Value object describing a single record to be deleted downstream. */
case class DeleteMessage(recordId: String, recordSetId: String, institutionId: String, sessionId: String)

object DeleteMessage {
  /**
   * Builds a DeleteMessage from a consumed Kafka record: the record key
   * becomes the record id; record-set and institution ids are taken from
   * the first matching message header.
   *
   * NOTE(review): `.head` throws NoSuchElementException when a header is
   * missing — confirm all upstream messages carry both headers.
   */
  def apply(consumerRecord: ConsumerRecord[String, String], sessionId: String): DeleteMessage = {
    val recordSetId = new String(consumerRecord.headers().headers("recordSetId").asScala.head.value())
    val institutionId = new String(consumerRecord.headers().headers("institutionId").asScala.head.value())
    DeleteMessage(consumerRecord.key(), recordSetId, institutionId, sessionId)
  }
}
package ch.memobase.models
import upickle.default.{Reader, _}
/** Processing report as consumed (JSON) from the reporting topic. */
case class Report(id: String,
                  step: String,
                  timestamp: String,
                  status: String,
                  message: String)

object Report {
  // JSON reader derived from the case-class fields via upickle's macro.
  implicit val reader: Reader[Report] = macroR

  /** Parses a JSON string into a Report. */
  def apply(json: String): Report = read[Report](json)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment