RecordProcessor.scala 6.02 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
/*
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
2
 * Media Converter
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 * Extracts media files from Fedora repository
 * Copyright (C) 2020  Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

20
package ch.memobase
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
21
22


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
23
import ch.memobase.models._
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
24
25
import org.apache.kafka.clients.consumer.ConsumerRecord

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
26
27
import java.io.ByteArrayOutputStream
import java.util.Properties
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
28
import scala.util.{Failure, Success, Try}
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
29

30
sealed trait ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
31

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
32
case class ProcessSuccess(id: String, resource: MemobaseResource, msg: String) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
33

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
34
case class ProcessFatal(id: String, resource: MemobaseResource, msg: String, ex: Throwable) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
35

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
36
case class ProcessWarning(id: String, resource: MemobaseResource, msg: String) extends ProcessOutcome
37

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
38
case class ProcessIgnore(id: String, resource: MemobaseResource, msg: String) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
39
40


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
41
42
class RecordProcessor(fileHandler: DisseminationCopyHandler,
                      fedoraClientWrapper: FedoraClientWrapper,
43
                      appSettings: Properties) extends FileUtils {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
44

45
  val rootPath: String = appSettings.getProperty("mediaFolderRootPath")
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
46

47
  def process(record: ConsumerRecord[String, String]): List[ProcessOutcome] = {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
48
    BinaryResourceMetadata.build(record.value(), appSettings.getProperty("externalBaseUrl")) flatMap {
49
50
51
      case Success(binaryResource) =>
        handleBinaryResource(binaryResource, record.key())
      case Failure(ex) => List(ex match {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
52
53
54
55
        case e: NoLocalBinary => ProcessIgnore(record.key(), e.resource, e.getMessage)
        case e: NoDigitalObject => ProcessIgnore(record.key(), DigitalObject, e.getMessage)
        case e: UnmanageableMediaFileType => ProcessWarning(record.key(), e.resource, e.getMessage)
        case e: Exception => ProcessFatal(record.key(), Record, e.getMessage, e)
56
      })
57
58
    }
  }
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
59

60
61
62
  private def handleBinaryResource(binaryResource: BinaryResourceMetadata, recordKey: String): List[ProcessOutcome] = {
    fedoraClientWrapper.fetchBinaryResource(binaryResource.filePath) match {
      case Success(_) if binaryResource.eventType == Delete =>
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
63
        deleteResource(binaryResource.id, binaryResource.mimeType, binaryResource.resource)
64
65
66
67
      case Success(data) =>
        createResource(
          binaryResource.id,
          binaryResource.mimeType,
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
68
          binaryResource.resource,
69
          data)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
70
71
      case Failure(ex) => List(ProcessFatal(recordKey, binaryResource.resource,
        s"Failed to retrieve binary from Fedora on ${binaryResource.filePath}", ex))
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
72
    }
73
74
  }

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
75
76
77
78
  private def createOutcome(res: Try[Boolean], id: String, resource: MemobaseResource, destFile: String): List[ProcessOutcome] = List(res match {
    case Success(true) => ProcessSuccess(id, resource, s"Updating of file $destFile successful")
    case Success(false) => ProcessSuccess(id, resource, s"Creation of file $destFile successful")
    case Failure(ex) => ProcessFatal(id, resource, s"Creation of file $destFile failed", ex)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
79
80
  })

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
81
82
83
84
  private def deleteOutcome(res: Try[Boolean], id: String, resource: MemobaseResource, destFile: String): List[ProcessOutcome] = List(res match {
    case Success(true) => ProcessSuccess(id, resource, s"Deletion of file $destFile successful")
    case Success(false) => ProcessSuccess(id, resource, s"No deletion of file $destFile because object does not exist")
    case Failure(ex) => ProcessFatal(id, resource, s"Deletion of file $destFile failed", ex)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
85
86
87

  })

88
89
  private def deleteResource(id: String,
                             mimeType: MimeType,
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
90
                             resource: MemobaseResource): List[ProcessOutcome] = mimeType match {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
91
    case _: AudioFile =>
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
92
93
94
      List((audioFilePath(id), DigitalObject), (audioSnippetPath(id), AudioSnippet))
        .map(obj => (fileHandler.deleteAudioCopy(obj._1), obj._2, obj._1))
        .flatMap(x => deleteOutcome(x._1, id, x._2, x._3))
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
95
    case mT: VideoFile =>
96
      val destFile = videoFilePath(id, mT)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
97
      val res = fileHandler.deleteVideoCopy(destFile)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
98
99
      deleteOutcome(res, id, DigitalObject, destFile)
    case _: ImageFile if resource == DigitalObject =>
100
      val destFile = imageFilePath(id)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
101
      val res = fileHandler.deleteImageCopy(destFile)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
102
103
      deleteOutcome(res, id, DigitalObject, destFile)
    case _: ImageFile if resource == Thumbnail =>
104
      val destFile = videoPosterPath(id)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
105
      val res = fileHandler.deleteImageCopy(destFile)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
106
      deleteOutcome(res, id, Thumbnail, destFile)
107
108
109
110
  }

  private def createResource(id: String,
                             mimeType: MimeType,
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
111
                             resource: MemobaseResource,
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
112
                             data: ByteArrayOutputStream): List[ProcessOutcome] = mimeType match {
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
    case mT: AudioFile =>
      List((audioFilePath(id), false, DigitalObject), (audioSnippetPath(id), true, AudioSnippet))
        .map(path => (fileHandler.createAudioCopy(data, path._1, mT, path._2), path._1, path._3))
        .flatMap(x => createOutcome(x._1, id, x._3, x._2))
    case mT: VideoFile =>
      val destFile = videoFilePath(id, mT)
      val res = fileHandler.createVideoCopy(data, destFile, mT)
      createOutcome(res, id, DigitalObject, destFile)
    case mT: ImageFile if resource == DigitalObject =>
      val destFile = imageFilePath(id)
      val res = fileHandler.createImageCopy(data, destFile, mT)
      createOutcome(res, id, DigitalObject, destFile)
    case mT: ImageFile if resource == Thumbnail =>
      val destFile = videoPosterPath(id)
      val res = fileHandler.createImageCopy(data, destFile, mT)
      createOutcome(res, id, Thumbnail, destFile)
129
  }
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
130
}