App.scala 3.76 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
/*
 * Media Converter
 * Extracts media files from Fedora repository
 * Copyright (C) 2020  Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

20
package ch.memobase
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
21

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
22
import ch.memobase.models._
23
24
25
26
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.logging.log4j.scala.Logging
import org.memobase.settings.SettingsLoader

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
27
import java.time.Duration
28
29
30
import scala.collection.JavaConverters._


31
/**
 * Application entry point.
 *
 * Loads the settings from `app.yml`, subscribes a Kafka consumer to the
 * configured input topic and polls it in an endless loop. Every polled record
 * is handed to the `RecordProcessor`; the outcomes it returns are logged and
 * folded into a single `ReportingObject`, which is then sent through the
 * `Reporter` together with the record's `recordSetId` and `institutionId`
 * headers.
 *
 * The loop only ends when an exception is thrown. In that case the consumer
 * and the reporter are closed and the JVM exits with status 1.
 */
object App extends scala.App with Logging with RecordUtils {
  // Names of the application-specific settings handed to the settings loader.
  // NOTE(review): the meaning of the four boolean flags is defined by
  // SettingsLoader and is not visible from this file.
  val settings = new SettingsLoader(List(
    "audioSnippetDuration",
    "internalBaseUrl",
    "externalBaseUrl",
    "fedoraUser",
    "fedoraPassword",
    "mediaFolderRootPath"
  ).asJava,
    "app.yml",
    true,
    false,
    true,
    false)
  val consumer = new KafkaConsumer[String, String](settings.getKafkaConsumerSettings)
  val fileHandler = new DisseminationCopyHandler(
    settings.getAppSettings.getProperty("audioSnippetDuration").toInt)
  val fCWrapper = FedoraClientWrapper(
    settings.getAppSettings.getProperty("internalBaseUrl"),
    settings.getAppSettings.getProperty("externalBaseUrl"),
    settings.getAppSettings.getProperty("fedoraUser"),
    settings.getAppSettings.getProperty("fedoraPassword")
  )
  val recordProcessor = new RecordProcessor(fileHandler, fCWrapper, settings.getAppSettings)
  val reporter = Reporter(settings.getKafkaProducerSettings, settings.getProcessReportTopic)
  // Upper bound, in milliseconds, for a single `consumer.poll` call.
  val consumerPollTimeoutMs = 100

  // `sys.exit` never returns, so calling it from inside the `catch` clause
  // would keep the `finally` clause from ever running and leave the consumer
  // and the reporter unclosed. The exit status is therefore computed first and
  // the JVM is terminated only after the cleanup has been performed.
  private val exitCode: Int =
    try {
      logger.debug(s"Subscribing to topic ${settings.getInputTopic}")
      consumer.subscribe(List(settings.getInputTopic).asJava)
      while (true) {
        val records = consumer.poll(Duration.ofMillis(consumerPollTimeoutMs)).asScala
        records.foreach { record =>
          val headers = record.headers()
          // NOTE(review): `lastHeader` yields null when the header is absent;
          // whether `reporter.send` tolerates that is not visible here.
          val recordSetHeader = headers.lastHeader("recordSetId")
          val institutionHeader = headers.lastHeader("institutionId")
          // Log each outcome and accumulate all of them into one report.
          val reportingObject = recordProcessor.process(record)
            .foldLeft(ReportingObject(record.key())) { (report, outcome) =>
              outcome match {
                case ProcessSuccess(id, resource, msg) =>
                  logger.info(s"$id: $msg")
                  ReportingObject.addResourceOutcome(report, (resource, msg), ProcessingSuccess)
                case ProcessIgnore(id, resource, msg) =>
                  logger.info(s"$id: $msg")
                  ReportingObject.addResourceOutcome(report, (resource, msg), ProcessingIgnore)
                case ProcessWarning(id, resource, msg) =>
                  logger.warn(s"$id: $msg")
                  ReportingObject.addResourceOutcome(report, (resource, msg), ProcessingWarning)
                case ProcessFatal(id, resource, msg, ex) =>
                  logger.error(s"$id: $msg: ${ex.getMessage}")
                  logger.debug(ex.getStackTrace.mkString("\n"))
                  ReportingObject.addResourceOutcome(report, (resource, s"$msg: ${ex.getMessage}"), ProcessingFatal)
              }
            }
          reporter.send(reportingObject, recordSetHeader, institutionHeader)
        }
      }
      // Only here to give the `try` expression a value; the loop above never
      // completes normally.
      0
    } catch {
      case e: Exception =>
        logger.error(e)
        1
    } finally {
      logger.info("Shutting down application")
      // Close the reporter even if closing the consumer fails.
      try consumer.close()
      finally reporter.close()
    }

  if (exitCode != 0) sys.exit(exitCode)
}
97