RecordProcessor.scala 3.05 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
/*
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
2
 * Media Converter
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 * Extracts media files from Fedora repository
 * Copyright (C) 2020  Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

20
package ch.memobase
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
21
22


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
23
import ch.memobase.models._
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.util.{Failure, Success, Try}

/**
 * Common supertype of every result that [[RecordProcessor.process]] can return:
 * [[ProcessSuccess]], [[ProcessFailure]] and [[ProcessIgnore]].
 */
trait ProcessOutcome

/**
 * Outcome reported when the requested action on a media file finished without error.
 *
 * @param id       identifier of the processed object
 * @param fileType type of the media file the action was applied to
 * @param action   label of the performed action (e.g. "Create/Update" or "Delete")
 */
case class ProcessSuccess(
  id: String,
  fileType: MediaFileType,
  action: String
) extends ProcessOutcome

/**
 * Outcome reported when the requested action on a media file failed.
 *
 * @param id       identifier of the object whose processing failed
 * @param fileType type of the affected media file
 * @param action   label of the attempted action (e.g. "Create/Update" or "Delete")
 * @param ex       the error that caused the failure
 */
case class ProcessFailure(
  id: String,
  fileType: MediaFileType,
  action: String,
  ex: Throwable
) extends ProcessOutcome

/**
 * Outcome reported when a record is skipped instead of being processed.
 *
 * @param id     identifier of the skipped record
 * @param reason human-readable explanation of why the record was skipped
 * @param warn   defaults to `false`; presumably marks skips that should be surfaced
 *               as warnings -- TODO confirm against the code consuming this outcome
 */
case class ProcessIgnore(
  id: String,
  reason: String,
  warn: Boolean = false
) extends ProcessOutcome


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
37
/**
 * Turns incoming Kafka records into create/update/delete operations on the
 * dissemination copies of audio, video and image files.
 *
 * @param fileHandler         performs the actual creation and deletion of copies
 * @param fedoraClientWrapper fetches the binary resource referenced by a record
 * @param externalBaseUrl     passed on to `BinaryResourceMetadata.build` when parsing a record
 */
class RecordProcessor(fileHandler: DisseminationCopyHandler, fedoraClientWrapper: FedoraClientWrapper, externalBaseUrl: String) {

  /**
   * Converts the result of a file operation into a [[ProcessOutcome]].
   *
   * @param fun      result of the file operation; only success vs. failure is inspected
   * @param objId    identifier of the processed object
   * @param fileType type of the processed media file
   * @param action   label of the performed action
   * @return [[ProcessSuccess]] if `fun` succeeded, otherwise [[ProcessFailure]] carrying the error
   */
  private def errorHandler(fun: Try[_], objId: String, fileType: MediaFileType, action: String): ProcessOutcome = fun match {
    case Success(_) => ProcessSuccess(objId, fileType, action)
    case Failure(ex) => ProcessFailure(objId, fileType, action, ex)
  }

  /**
   * Processes a single Kafka record.
   *
   * The record value is parsed into resource metadata, the referenced binary is
   * fetched, and the matching create or delete operation of `fileHandler` is
   * chosen from the event type and the file type.
   *
   * @param record the Kafka record; its value is parsed, its key identifies the
   *               record when parsing or fetching fails
   * @return [[ProcessSuccess]] or [[ProcessFailure]] for an executed operation,
   *         [[ProcessIgnore]] if the resource has no binary, and [[ProcessFailure]]
   *         for any other error or an unsupported event/file type combination
   */
  def process(record: ConsumerRecord[String, String]): ProcessOutcome = {
    val outcome: Try[ProcessOutcome] = for {
      kafkaMsg <- BinaryResourceMetadata.build(record.value(), externalBaseUrl)
      fileWithMetadata <- fedoraClientWrapper.fetchBinaryResource(kafkaMsg.filePath)
    } yield (kafkaMsg.id, kafkaMsg.eventType, fileWithMetadata.fileType, fileWithMetadata.data) match {
      case (id, Create | Update, af: AudioFileType, data) =>
        errorHandler(fileHandler.createAudioCopy(data, id, af), id, af, "Create/Update")
      case (id, Delete, af: AudioFileType, _) =>
        errorHandler(fileHandler.deleteAudioCopy(id), id, af, "Delete")
      case (id, Create | Update, vf: VideoFileType, data) =>
        errorHandler(fileHandler.createVideoCopy(data, id, vf), id, vf, "Create/Update")
      case (id, Delete, vf: VideoFileType, _) =>
        errorHandler(fileHandler.deleteVideoCopy(id, vf), id, vf, "Delete")
      case (id, Create | Update, i: ImageFileType, data) =>
        errorHandler(fileHandler.createImageCopy(data, id, i), id, i, "Create/Update")
      case (id, Delete, i: ImageFileType, _) =>
        errorHandler(fileHandler.deleteImageCopy(id), id, i, "Delete")
      case (id, event, ft, _) =>
        // No handler exists for this combination; say so in the error instead of
        // reporting an exception without any message.
        ProcessFailure(id, ft, event.toString,
          new Exception(s"Unsupported combination of event type $event and file type $ft"))
    }

    // Resolve the Try with a total match so that every captured failure becomes an
    // outcome value; nothing is re-thrown from here.
    outcome match {
      case Success(result) => result
      case Failure(e: ResourceWithoutBinary) => ProcessIgnore(record.key(), e.getMessage)
      case Failure(e) => ProcessFailure(record.key(), UnknownFileType, "", e)
    }
  }
}