Unverified Commit 48286f37 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

add reporting

parent bd209d8c
Pipeline #14570 passed with stages
in 13 minutes and 2 seconds
......@@ -12,6 +12,9 @@ kafka:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
client.id: ${CLIENT_ID:?system}
group.id: ${GROUP_ID:?system}
producer:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
client.id: ${CLIENT_ID:?system}
topic:
in: ${TOPIC_IN:?system}
process: ${TOPIC_PROCESS:?system}
......@@ -21,6 +21,7 @@ package ch.memobase
import java.time.Duration
import ch.memobase.models._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.logging.log4j.scala.Logging
import org.memobase.settings.SettingsLoader
......@@ -40,7 +41,7 @@ object App extends scala.App with Logging {
"fedoraPassword"
).asJava,
"app.yml",
false,
true,
false,
true,
false)
......@@ -57,6 +58,7 @@ object App extends scala.App with Logging {
settings.getAppSettings.getProperty("fedoraPassword")
)
val recordProcessor = new RecordProcessor(fileHandler, fCWrapper, settings.getAppSettings.getProperty("externalBaseUrl"))
val reporter = Reporter(settings.getKafkaProducerSettings, settings.getProcessReportTopic)
val consumerPollTimeoutMs = 100
try {
......@@ -69,20 +71,31 @@ object App extends scala.App with Logging {
processed <- recordProcessor.process(record)
} yield processed match {
case ProcessSuccess(id, fT, a) if a == "Delete" =>
logger.debug(s"Deleting of object $id with type $fT successful")
val msg = s"Deleting of object $id with type $fT successful"
logger.debug(msg)
reporter.send(Report(id, ProcessingIsSuccess, msg))
case ProcessSuccess(id, fT, _) =>
logger.debug(s"Copying of object $id with type $fT successful")
val msg = s"Copying of object $id with type $fT successful"
logger.debug(msg)
reporter.send(Report(id, ProcessingIsSuccess, msg))
case ProcessFailure(id, fT, a, ex) if a == "Delete" =>
logger.error(s"Deleting of object $id with type $fT failed: ${ex.getMessage}")
val msg = s"Deleting of object $id with type $fT failed: ${ex.getMessage}"
logger.error(msg)
logger.info(ex.getStackTrace.mkString("\n"))
reporter.send(Report(id, ProcessingIsFailure, msg))
case ProcessFailure(id, fT, _, ex) =>
logger.error(s"Copying of object $id with type $fT failed: ${ex.getMessage}")
val msg = s"Copying of object $id with type $fT failed: ${ex.getMessage}"
logger.error(msg)
logger.info(ex.getStackTrace.mkString("\n"))
reporter.send(Report(id, ProcessingIsFailure, msg))
case ProcessIgnore(id, reason, warn) if warn =>
logger.warn(s"Ignoring object $id: $reason")
val msg = s"Ignoring object $id: $reason"
logger.warn(msg)
reporter.send(Report(id, ProcessingIsFailure, msg))
case ProcessIgnore(id, reason, _) =>
logger.info(s"Ignoring object $id: $reason")
val msg = s"Ignoring object $id: $reason"
logger.info(msg)
reporter.send(Report(id, ProcessingIsSuccess, msg))
}
}
} catch {
......@@ -92,5 +105,6 @@ object App extends scala.App with Logging {
} finally {
logger.info("Shutting down application")
consumer.close()
reporter.close()
}
}
/*
* Media Converter
* Extracts media files from Fedora repository
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package ch.memobase
import java.util.Properties
import java.util.concurrent.{Future => JavaFuture}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
/**
* Wrapper class for sending reports via Kafka topic
*
* @param props Properties for wrapped KafkaProducer
* @param reportingTopic Name of reporting topic
*/
class Reporter(props: Properties, reportingTopic: String) {
private val producer = new KafkaProducer[String, String](props)
def send(report: models.Report): JavaFuture[RecordMetadata] = {
producer.send(new ProducerRecord(reportingTopic, report.id, report.toString))
}
def close(): Unit = producer.close()
}
object Reporter {
def apply(props: Properties, reportingTopic: String): Reporter = new Reporter(props, reportingTopic)
}
/*
* Media Converter
* Extracts media files from Fedora repository
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package ch.memobase.models
sealed trait ProcessingStatus
case object ProcessingIsSuccess extends ProcessingStatus
case object ProcessingIsFailure extends ProcessingStatus
case class Report(id: String, status: ProcessingStatus, message: String) {
override def toString: String =
s"""{"id": "$id", "status": "${
status match {
case ProcessingIsSuccess => "SUCCESS"
case ProcessingIsFailure => "FAILURE"
}
}", "message": "$message"}"""
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment