/* * Media Converter * Extracts media files from Fedora repository * Copyright (C) 2020 Memoriav * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package ch.memobase import ch.memobase.models._ import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.logging.log4j.scala.Logging import org.memobase.settings.SettingsLoader import java.time.Duration import scala.collection.JavaConverters._ object App extends scala.App with Logging with RecordUtils { val settings = new SettingsLoader(List( "audioSnippetDuration", "internalBaseUrl", "externalBaseUrl", "fedoraUser", "fedoraPassword", "mediaFolderRootPath" ).asJava, "app.yml", true, false, true, false) val consumer = new KafkaConsumer[String, String](settings.getKafkaConsumerSettings) val fileHandler = new DisseminationCopyHandler( settings.getAppSettings.getProperty("audioSnippetDuration").toInt) val fCWrapper = FedoraClientWrapper( settings.getAppSettings.getProperty("internalBaseUrl"), settings.getAppSettings.getProperty("externalBaseUrl"), settings.getAppSettings.getProperty("fedoraUser"), settings.getAppSettings.getProperty("fedoraPassword") ) val recordProcessor = new RecordProcessor(fileHandler, fCWrapper, settings.getAppSettings) val reporter = Reporter(settings.getKafkaProducerSettings, settings.getProcessReportTopic) val consumerPollTimeoutMs = 100 try { logger.debug(s"Subscribing to topic ${settings.getInputTopic}") consumer.subscribe(List(settings.getInputTopic).asJava) while (true) { val records = consumer.poll(Duration.ofMillis(consumerPollTimeoutMs)).asScala records.foreach { record => { val headers = record.headers() val recordSetHeader = headers.lastHeader("recordSetId") val institutionHeader = headers.lastHeader("institutionId") val reportingObject = recordProcessor.process(record).foldLeft(ReportingObject(record.key()))((reportingObject, outcome) => outcome match { case ProcessSuccess(id, resource, msg) => logger.info(s"$id: $msg") ReportingObject.addResourceOutcome(reportingObject, (resource, msg), ProcessingSuccess) case ProcessIgnore(id, resource, msg) => logger.info(s"$id: $msg") ReportingObject.addResourceOutcome(reportingObject, (resource, msg), ProcessingIgnore) case ProcessWarning(id, resource, msg) => logger.warn(s"$id: $msg") ReportingObject.addResourceOutcome(reportingObject, (resource, msg), ProcessingWarning) case ProcessFatal(id, resource, msg, ex) => logger.error(s"$id: $msg: ${ex.getMessage}") logger.debug(ex.getStackTrace.mkString("\n")) ReportingObject.addResourceOutcome(reportingObject, (resource, s"$msg: ${ex.getMessage}"), ProcessingFatal) }) reporter.send(reportingObject, recordSetHeader, institutionHeader) } } } } catch { case e: Exception => logger.error(e) sys.exit(1) } finally { logger.info("Shutting down application") consumer.close() reporter.close() } }