RecordProcessor.scala 5.5 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
/*
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
2
 * Media Converter
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 * Extracts media files from Fedora repository
 * Copyright (C) 2020  Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

20
package ch.memobase
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
21
22


23
import java.io.ByteArrayOutputStream
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
24
import java.util.Properties
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
25

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
26
import ch.memobase.models._
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
27
28
import org.apache.kafka.clients.consumer.ConsumerRecord

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
29
import scala.util.{Failure, Success, Try}
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
30
31
32

trait ProcessOutcome

33
case class ProcessSuccess(id: String, msg: String) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
34

35
case class ProcessFailure(id: String, msg: String, ex: Throwable) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
36

37
case class ProcessWarn(id: String, msg: String) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
38
39


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
40
41
42
43
44
class RecordProcessor(fileHandler: DisseminationCopyHandler,
                      fedoraClientWrapper: FedoraClientWrapper,
                      appSettings: Properties) {

  import FileUtils._
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
45

46
  def process(record: ConsumerRecord[String, String]): List[ProcessOutcome] = {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
47
    BinaryResourceMetadata.build(record.value(), appSettings.getProperty("externalBaseUrl")) flatMap {
48
49
50
51
52
53
54
55
      case Success(binaryResource) =>
        handleBinaryResource(binaryResource, record.key())
      case Failure(ex) => List(ex match {
        case e: NoLocalBinary => ProcessWarn(record.key(), e.getMessage)
        case e: NoDigitalObject => ProcessWarn(record.key(), e.getMessage)
        case e: UnmanageableMediaFileType => ProcessWarn(record.key(), e.getMessage)
        case e: Exception => ProcessFailure(record.key(), e.getMessage, e)
      })
56
57
    }
  }
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
58

59
60
61
62
63
64
65
66
67
68
69
  private def handleBinaryResource(binaryResource: BinaryResourceMetadata, recordKey: String): List[ProcessOutcome] = {
    fedoraClientWrapper.fetchBinaryResource(binaryResource.filePath) match {
      case Success(_) if binaryResource.eventType == Delete =>
        deleteResource(binaryResource.id, binaryResource.mimeType, binaryResource.instantiationType)
      case Success(data) =>
        createResource(
          binaryResource.id,
          binaryResource.mimeType,
          binaryResource.instantiationType,
          data)
      case Failure(ex) => List(ProcessFailure(recordKey, "Failed to retrieve binary from Fedora", ex))
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
70
    }
71
72
  }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
73
74
75
76
77
78
79
80
81
82
83
84
85
  private def createOutcome(res: Try[Boolean], id: String, destFile: String): List[ProcessOutcome] = List(res match {
    case Success(true) => ProcessSuccess(id, s"Updating of file $destFile successful")
    case Success(false) => ProcessSuccess(id, s"Creation of file $destFile successful")
    case Failure(ex) => ProcessFailure(id, s"Creation of file $destFile failed", ex)
  })

  private def deleteOutcome(res: Try[Boolean], id: String, destFile: String): List[ProcessOutcome] = List(res match {
    case Success(true) => ProcessSuccess(id, s"Deletion of file $destFile successful")
    case Success(false) => ProcessSuccess(id, s"No deletion of file $destFile because object does not exist")
    case Failure(ex) => ProcessFailure(id, s"Deletion of file $destFile failed", ex)

  })

86
87
  private def deleteResource(id: String,
                             mimeType: MimeType,
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
                             instantiationType: Instantiation): List[ProcessOutcome] = mimeType match {
    case _: AudioFile =>
      List(createAudioFile(id), createAudioSnippetFile(id))
        .map(path => (fileHandler.deleteAudioCopy(path), path))
        .flatMap(x => deleteOutcome(x._1, id, x._2))
    case mT: VideoFile =>
      val destFile = createVideoFile(id, mT)
      val res = fileHandler.deleteVideoCopy(destFile)
      deleteOutcome(res, id, destFile)
    case _: ImageFile if instantiationType == DigitalObject =>
      val destFile = createImageFile(id)
      val res = fileHandler.deleteImageCopy(destFile)
      deleteOutcome(res, id, destFile)
    case _: ImageFile if instantiationType == Thumbnail =>
      val destFile = createVideoPosterFile(id)
      val res = fileHandler.deleteImageCopy(destFile)
      deleteOutcome(res, id, destFile)
105
106
107
108
109
  }

  private def createResource(id: String,
                             mimeType: MimeType,
                             instantiationType: Instantiation,
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
110
                             data: ByteArrayOutputStream): List[ProcessOutcome] = mimeType match {
111
      case mT: AudioFile =>
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
112
113
114
        List((createAudioFile(id), false), (createAudioSnippetFile(id), true))
          .map(path => (fileHandler.createAudioCopy(data, path._1, mT, path._2), path._1))
          .flatMap(x => createOutcome(x._1, id, x._2))
115
      case mT: VideoFile =>
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
116
117
118
        val destFile = createVideoFile(id, mT)
        val res = fileHandler.createVideoCopy(data, destFile, mT)
        createOutcome(res, id, destFile)
119
      case mT: ImageFile if instantiationType == DigitalObject =>
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
120
121
122
        val destFile = createImageFile(id)
        val res = fileHandler.createImageCopy(data, destFile, mT)
        createOutcome(res, id, destFile)
123
      case mT: ImageFile if instantiationType == Thumbnail =>
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
124
125
126
        val destFile = createVideoPosterFile(id)
        val res = fileHandler.createImageCopy(data, destFile, mT)
      createOutcome(res, id, destFile)
127
  }
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
128
}