RecordProcessor.scala 5.57 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
/*
 * Media Converter
 * Extracts media files from Fedora repository
 * Copyright (C) 2020  Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

20
package ch.memobase
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
21
22


23
import java.io.ByteArrayOutputStream
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
24
import java.util.Properties
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
25

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
26
import ch.memobase.models._
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
27
28
import org.apache.kafka.clients.consumer.ConsumerRecord

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
29
import scala.util.{Failure, Success, Try}
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
30

31
/**
  * Result of processing one record (or one file derived from it).
  *
  * The hierarchy is sealed, so matches over a `ProcessOutcome` can be checked
  * for exhaustiveness by the compiler.
  */
sealed trait ProcessOutcome

/**
  * The requested file operation completed.
  *
  * @param id  identifier the outcome refers to
  * @param msg human-readable description of what was done
  */
final case class ProcessSuccess(id: String, msg: String) extends ProcessOutcome

/**
  * The record could not be processed because of an error.
  *
  * @param id  identifier the outcome refers to
  * @param msg human-readable description of the failure
  * @param ex  the underlying cause
  */
final case class ProcessFatal(id: String, msg: String, ex: Throwable) extends ProcessOutcome

/**
  * The record was not processed, and this is worth reporting
  * (used in this file for media file types that cannot be handled).
  *
  * @param id  identifier the outcome refers to
  * @param msg human-readable explanation
  */
final case class ProcessWarning(id: String, msg: String) extends ProcessOutcome

/**
  * The record was deliberately skipped
  * (used in this file when there is no local binary or no digital object).
  *
  * @param id  identifier the outcome refers to
  * @param msg human-readable explanation
  */
final case class ProcessIgnore(id: String, msg: String) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
40
41


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
42
43
44
45
46
/**
  * Processes consumed records by creating, updating or deleting the copies of
  * media files that belong to the binary resources described in them.
  *
  * @param fileHandler         performs the actual create/delete operations on the copies
  * @param fedoraClientWrapper used to fetch the binary of a resource from Fedora
  * @param appSettings         application settings; the `externalBaseUrl` property is read
  */
class RecordProcessor(fileHandler: DisseminationCopyHandler,
                      fedoraClientWrapper: FedoraClientWrapper,
                      appSettings: Properties) {

  import FileUtils._

  /**
    * Processes a single record.
    *
    * The record value is handed to `BinaryResourceMetadata.build`; every
    * successfully built resource is processed further, every failure is
    * translated into an outcome carrying the record key.
    *
    * @param record the consumed record
    * @return the outcomes of all operations triggered by the record
    */
  def process(record: ConsumerRecord[String, String]): List[ProcessOutcome] = {
    val recordKey = record.key()
    BinaryResourceMetadata.build(record.value(), appSettings.getProperty("externalBaseUrl")) flatMap {
      case Success(binaryResource) =>
        handleBinaryResource(binaryResource, recordKey)
      case Failure(ex) => List(ex match {
        case e: NoLocalBinary => ProcessIgnore(recordKey, e.getMessage)
        case e: NoDigitalObject => ProcessIgnore(recordKey, e.getMessage)
        case e: UnmanageableMediaFileType => ProcessWarning(recordKey, e.getMessage)
        // Catch-all on Throwable: a Failure may wrap something that is not an
        // Exception, which must not escape as a MatchError.
        case e => ProcessFatal(recordKey, e.getMessage, e)
      })
    }
  }

  /**
    * Deletes or (re)creates the copies belonging to a binary resource.
    *
    * The event type is checked before anything is fetched: a deletion does not
    * need the binary, and must not fail just because the binary can no longer
    * be retrieved from Fedora.
    *
    * @param binaryResource metadata of the resource to handle
    * @param recordKey      key of the originating record, used when the fetch fails
    */
  private def handleBinaryResource(binaryResource: BinaryResourceMetadata, recordKey: String): List[ProcessOutcome] =
    if (binaryResource.eventType == Delete) {
      deleteResource(binaryResource.id, binaryResource.mimeType, binaryResource.instantiationType)
    } else {
      fedoraClientWrapper.fetchBinaryResource(binaryResource.filePath) match {
        case Success(data) =>
          createResource(
            binaryResource.id,
            binaryResource.mimeType,
            binaryResource.instantiationType,
            data)
        case Failure(ex) =>
          List(ProcessFatal(recordKey, s"Failed to retrieve binary from Fedora on ${binaryResource.filePath}", ex))
      }
    }

  /**
    * Translates the result of a create operation into an outcome.
    * `Success(true)` is reported as an update, `Success(false)` as a creation.
    */
  private def createOutcome(res: Try[Boolean], id: String, destFile: String): List[ProcessOutcome] = List(res match {
    case Success(true) => ProcessSuccess(id, s"Updating of file $destFile successful")
    case Success(false) => ProcessSuccess(id, s"Creation of file $destFile successful")
    case Failure(ex) => ProcessFatal(id, s"Creation of file $destFile failed", ex)
  })

  /**
    * Translates the result of a delete operation into an outcome.
    * `Success(false)` is reported as "nothing to delete", which still counts as success.
    */
  private def deleteOutcome(res: Try[Boolean], id: String, destFile: String): List[ProcessOutcome] = List(res match {
    case Success(true) => ProcessSuccess(id, s"Deletion of file $destFile successful")
    case Success(false) => ProcessSuccess(id, s"No deletion of file $destFile because object does not exist")
    case Failure(ex) => ProcessFatal(id, s"Deletion of file $destFile failed", ex)
  })

  /**
    * Builds the outcome for a MIME type / instantiation type combination that
    * none of the handlers covers, instead of letting a `MatchError` escape.
    */
  private def unsupportedOutcome(action: String,
                                 id: String,
                                 mimeType: MimeType,
                                 instantiationType: Instantiation): List[ProcessOutcome] = {
    val msg = s"$action for $id not possible: unsupported combination of " +
      s"MIME type $mimeType and instantiation type $instantiationType"
    List(ProcessFatal(id, msg, new IllegalArgumentException(msg)))
  }

  /**
    * Deletes the copies that belong to a resource.
    *
    * Audio resources have two copies (file and snippet), so they yield two outcomes.
    *
    * @param id                identifier of the resource
    * @param mimeType          decides which kind of copy is deleted
    * @param instantiationType distinguishes image objects from thumbnails
    */
  private def deleteResource(id: String,
                             mimeType: MimeType,
                             instantiationType: Instantiation): List[ProcessOutcome] = mimeType match {
    case _: AudioFile =>
      List(audioFilePath(id), audioSnippetPath(id))
        .flatMap(destFile => deleteOutcome(fileHandler.deleteAudioCopy(destFile), id, destFile))
    case mT: VideoFile =>
      val destFile = videoFilePath(id, mT)
      deleteOutcome(fileHandler.deleteVideoCopy(destFile), id, destFile)
    case _: ImageFile if instantiationType == DigitalObject =>
      val destFile = imageFilePath(id)
      deleteOutcome(fileHandler.deleteImageCopy(destFile), id, destFile)
    case _: ImageFile if instantiationType == Thumbnail =>
      val destFile = videoPosterPath(id)
      deleteOutcome(fileHandler.deleteImageCopy(destFile), id, destFile)
    case _ =>
      unsupportedOutcome("Deletion", id, mimeType, instantiationType)
  }

  /**
    * Creates (or updates) the copies that belong to a resource.
    *
    * Audio resources get two copies (file and snippet), so they yield two outcomes.
    *
    * @param id                identifier of the resource
    * @param mimeType          decides which kind of copy is created
    * @param instantiationType distinguishes image objects from thumbnails
    * @param data              the binary as fetched from Fedora
    */
  private def createResource(id: String,
                             mimeType: MimeType,
                             instantiationType: Instantiation,
                             data: ByteArrayOutputStream): List[ProcessOutcome] = mimeType match {
    case mT: AudioFile =>
      // NOTE(review): the boolean is presumably the "is snippet" flag of
      // createAudioCopy -- confirm against DisseminationCopyHandler.
      List((audioFilePath(id), false), (audioSnippetPath(id), true))
        .flatMap { case (destFile, flag) =>
          createOutcome(fileHandler.createAudioCopy(data, destFile, mT, flag), id, destFile)
        }
    case mT: VideoFile =>
      val destFile = videoFilePath(id, mT)
      createOutcome(fileHandler.createVideoCopy(data, destFile, mT), id, destFile)
    case mT: ImageFile if instantiationType == DigitalObject =>
      val destFile = imageFilePath(id)
      createOutcome(fileHandler.createImageCopy(data, destFile, mT), id, destFile)
    case mT: ImageFile if instantiationType == Thumbnail =>
      val destFile = videoPosterPath(id)
      createOutcome(fileHandler.createImageCopy(data, destFile, mT), id, destFile)
    case _ =>
      unsupportedOutcome("Creation", id, mimeType, instantiationType)
  }
}