RecordProcessor.scala 4.81 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
/*
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
2
 * Media Converter
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 * Extracts media files from Fedora repository
 * Copyright (C) 2020  Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

20
package ch.memobase
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
21
22


23
import java.io.ByteArrayOutputStream
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
24

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
25
import ch.memobase.models._
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
26
27
import org.apache.kafka.clients.consumer.ConsumerRecord

28
import scala.util.{Failure, Success}
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
29
30
31

trait ProcessOutcome

32
case class ProcessSuccess(id: String, msg: String) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
33

34
case class ProcessFailure(id: String, msg: String, ex: Throwable) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
35

36
case class ProcessWarn(id: String, msg: String) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
37
38


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
39
class RecordProcessor(fileHandler: DisseminationCopyHandler, fedoraClientWrapper: FedoraClientWrapper, externalBaseUrl: String) {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
40

41
  def process(record: ConsumerRecord[String, String]): List[ProcessOutcome] = {
42
43
44
45
46
47
48
49
50
    BinaryResourceMetadata.build(record.value(), externalBaseUrl) flatMap {
      case Success(binaryResource) =>
        handleBinaryResource(binaryResource, record.key())
      case Failure(ex) => List(ex match {
        case e: NoLocalBinary => ProcessWarn(record.key(), e.getMessage)
        case e: NoDigitalObject => ProcessWarn(record.key(), e.getMessage)
        case e: UnmanageableMediaFileType => ProcessWarn(record.key(), e.getMessage)
        case e: Exception => ProcessFailure(record.key(), e.getMessage, e)
      })
51
52
    }
  }
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
53

54
55
56
57
58
59
60
61
62
63
64
  private def handleBinaryResource(binaryResource: BinaryResourceMetadata, recordKey: String): List[ProcessOutcome] = {
    fedoraClientWrapper.fetchBinaryResource(binaryResource.filePath) match {
      case Success(_) if binaryResource.eventType == Delete =>
        deleteResource(binaryResource.id, binaryResource.mimeType, binaryResource.instantiationType)
      case Success(data) =>
        createResource(
          binaryResource.id,
          binaryResource.mimeType,
          binaryResource.instantiationType,
          data)
      case Failure(ex) => List(ProcessFailure(recordKey, "Failed to retrieve binary from Fedora", ex))
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
65
    }
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
  }

  private def deleteResource(id: String,
                             mimeType: MimeType,
                             instantiationType: Instantiation): List[ProcessOutcome] = {
    (mimeType match {
      case _: AudioFile =>
        (fileHandler.deleteAudioCopy(id), id) ::
        List((fileHandler.deleteImageCopy(id + "-intro"), id))
      case mT: VideoFile =>
        List((fileHandler.deleteVideoCopy(id, mT), id))
      case _: ImageFile if instantiationType == DigitalObject =>
        List((fileHandler.deleteImageCopy(id), id))
      case _: ImageFile if instantiationType == Thumbnail =>
        val posterId = id + "-poster"
        List((fileHandler.deleteImageCopy(posterId), posterId))
    }) flatMap(res => List(res._1 match {
      case Success(true) => ProcessSuccess(res._2, s"Deletion of object $id with type $mimeType successful")
      case Success(false) => ProcessSuccess(res._2, s"No deletion of object $id with type $mimeType because object does not exist")
      case Failure(ex) => ProcessFailure(res._2, "Deletion of object $id with type $mimeType failed", ex)
    }))
  }

  private def createResource(id: String,
                             mimeType: MimeType,
                             instantiationType: Instantiation,
                             data: ByteArrayOutputStream): List[ProcessOutcome] = {
    (mimeType match {
      case mT: AudioFile =>
        (fileHandler.createAudioCopy(data, id, mT), id) ::
        List((fileHandler.createAudioCopy(data, id + "-intro", mT), id))
      case mT: VideoFile =>
        List((fileHandler.createVideoCopy(data, id, mT), id))
      case mT: ImageFile if instantiationType == DigitalObject =>
        List((fileHandler.createImageCopy(data, id, mT), id))
      case mT: ImageFile if instantiationType == Thumbnail =>
        val posterId = id + "-poster"
        List((fileHandler.createImageCopy(data, posterId, mT), posterId))

    }) flatMap(res => List(res._1 match {
      case Success(true) => ProcessSuccess(res._2, s"Updating of object $id with type $mimeType successful")
      case Success(false) => ProcessSuccess(res._2, s"Creation of object $id with type $mimeType successful")
      case Failure(ex) => ProcessFailure(res._2, "Creation of object $id with type $mimeType failed", ex)
    }))

  }
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
112
}