RecordProcessor.scala 3.4 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
/*
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
2
 * Media Converter
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 * Extracts media files from Fedora repository
 * Copyright (C) 2020  Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

20
package ch.memobase
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
21
22


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
23
24
import java.io.ByteArrayOutputStream

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
25
import ch.memobase.models._
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.util.{Failure, Success, Try}

trait ProcessOutcome

case class ProcessSuccess(id: String, fileType: MediaFileType, action: String) extends ProcessOutcome

case class ProcessFailure(id: String, fileType: MediaFileType, action: String, ex: Throwable) extends ProcessOutcome

case class ProcessIgnore(id: String, reason: String, warn: Boolean = false) extends ProcessOutcome


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
39
class RecordProcessor(fileHandler: DisseminationCopyHandler, fedoraClientWrapper: FedoraClientWrapper, externalBaseUrl: String) {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
40
41
42
43
44
45
46

  private def errorHandler(fun: Try[_], objId: String, fileType: MediaFileType, action: String): ProcessOutcome = fun match {
    case Success(_) => ProcessSuccess(objId, fileType, action)
    case Failure(ex) => ProcessFailure(objId, fileType, action, ex)
  }

  def process(record: ConsumerRecord[String, String]): ProcessOutcome = {
47
    (for {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
48
      kafkaMsg <- BinaryResourceMetadata.build(record.value(), externalBaseUrl)
49
50
      data <- fedoraClientWrapper.fetchBinaryResource(kafkaMsg.filePath)
    } yield createProcessResult(kafkaMsg.id, kafkaMsg.eventType, kafkaMsg.fileType, data))
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
51
      .recover {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
52
53
        case e: NoLocalBinary => ProcessIgnore(record.key(), e.getMessage)
        case e: NoDigitalObject => ProcessIgnore(record.key(), e.getMessage)
54
        case e: UnmanageableMediaFileType => ProcessIgnore(record.key(), e.getMessage)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
55
56
57
58
59
60
        case e: Exception => ProcessFailure(record.key(), UnknownFileType, "", e)
      }
  }.get

  private def createProcessResult(id: String, fedoraEvent: Event, mediaFileType: MediaFileType, data: ByteArrayOutputStream) =
    (id, fedoraEvent, mediaFileType, data) match {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
61
      case (id, Create | Update, af: AudioFileType, data) =>
62
        errorHandler(fileHandler.createAudioCopy(data, id, af), id, af, "Create/Update")
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
63
64
65
      case (id, Delete, af: AudioFileType, _) =>
        errorHandler(fileHandler.deleteAudioCopy(id), id, af, "Delete")
      case (id, Create | Update, vf: VideoFileType, data) =>
66
        errorHandler(fileHandler.createVideoCopy(data, id, vf), id, vf, "Create/Update")
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
67
      case (id, Delete, vf: VideoFileType, _) =>
68
        errorHandler(fileHandler.deleteVideoCopy(id, vf), id, vf, "Delete")
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
69
      case (id, Create | Update, i: ImageFileType, data) =>
70
        errorHandler(fileHandler.createImageCopy(data, id, i), id, i, "Create/Update")
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
71
      case (id, Delete, i: ImageFileType, _) =>
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
72
        errorHandler(fileHandler.deleteImageCopy(id), id, i, "Delete")
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
73
74
      case (id, event, ft, _) =>
        ProcessFailure(id, ft, event.toString, new Exception)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
75
76
    }
}