RecordProcessor.scala 4.31 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
/*
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
2
 * Media Converter
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 * Extracts media files from Fedora repository
 * Copyright (C) 2020  Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

20
package ch.memobase
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
21
22


23
import java.io.{ByteArrayOutputStream, IOException}
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
24

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
25
import ch.memobase.models._
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
26
27
28
29
30
31
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.util.{Failure, Success, Try}

trait ProcessOutcome

32
case class ProcessSuccess(id: String, fileType: MimeType, action: String) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
33

34
case class ProcessFailure(id: String, fileType: MimeType, action: String, ex: Throwable) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
35
36
37
38

case class ProcessIgnore(id: String, reason: String, warn: Boolean = false) extends ProcessOutcome


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
39
class RecordProcessor(fileHandler: DisseminationCopyHandler, fedoraClientWrapper: FedoraClientWrapper, externalBaseUrl: String) {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
40

41
  private def errorHandler(fun: Try[_], objId: String, fileType: MimeType, action: String): ProcessOutcome = fun match {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
42
43
44
45
    case Success(_) => ProcessSuccess(objId, fileType, action)
    case Failure(ex) => ProcessFailure(objId, fileType, action, ex)
  }

46
47
48
49
50
51
52
53
  def process(record: ConsumerRecord[String, String]): List[ProcessOutcome] = {
    BinaryResourceMetadata.build(record.value(), externalBaseUrl) map {
      case Success(rm) => fedoraClientWrapper.fetchBinaryResource(rm.filePath) match {
        case Success(data) =>
          createProcessResult(rm.id, rm.eventType, rm.mimeType, rm.instantiationType, data)
        case Failure(ex) => ProcessFailure(record.key(), rm.mimeType, "", ex)
      }
      case Failure(ex) => ex match {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
54
55
        case e: NoLocalBinary => ProcessIgnore(record.key(), e.getMessage)
        case e: NoDigitalObject => ProcessIgnore(record.key(), e.getMessage)
56
        case e: UnmanageableMediaFileType => ProcessIgnore(record.key(), e.getMessage)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
57
58
        case e: Exception => ProcessFailure(record.key(), UnknownFileType, "", e)
      }
59
60
    }
  }
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
61

62
63
64
65
66
67
68
69
  //noinspection ScalaStyle
  private def createProcessResult(id: String,
                                  fedoraEvent: Event,
                                  mimeType: MimeType,
                                  instantiationType: Instantiation,
                                  data: ByteArrayOutputStream) =
    (id, fedoraEvent, mimeType, instantiationType, data) match {
      case (id, Create | Update, af: AudioFile, _, data) =>
70
        errorHandler(fileHandler.createAudioCopy(data, id, af), id, af, "Create/Update")
71
      case (id, Delete, af: AudioFile, _, _) =>
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
72
        errorHandler(fileHandler.deleteAudioCopy(id), id, af, "Delete")
73
      case (id, Create | Update, vf: VideoFile, _, data) =>
74
        errorHandler(fileHandler.createVideoCopy(data, id, vf), id, vf, "Create/Update")
75
      case (id, Delete, vf: VideoFile, _, _) =>
76
        errorHandler(fileHandler.deleteVideoCopy(id, vf), id, vf, "Delete")
77
      case (id, Create | Update, i: ImageFile, DigitalObject, data) =>
78
        errorHandler(fileHandler.createImageCopy(data, id, i), id, i, "Create/Update")
79
80
81
      case (id, Create | Update, i: ImageFile, Thumbnail, data) =>
        errorHandler(fileHandler.createImageCopy(data, id + "-poster", i), id, i, "Create/Update thumbnail")
      case (id, Delete, i: ImageFile, DigitalObject, _) =>
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
82
        errorHandler(fileHandler.deleteImageCopy(id), id, i, "Delete")
83
84
85
86
87
88
89
90
91
      case (id, Delete, i: ImageFile, Thumbnail, _) =>
        fileHandler.deleteImageCopy(id + "-poster") match {
          case Success(_) => ProcessSuccess(id, i, "Delete thumbnail")
          case Failure(ex) => ex match {
            case _: IOException => ProcessIgnore(id, "Thumbnail does not exist")
            case e => ProcessFailure(id, i, "Delete thumbnail", e)
          }
        }
      case (id, event, ft, _, _) =>
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
92
        ProcessFailure(id, ft, event.toString, new Exception)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
93
94
    }
}