RecordProcessor.scala 6.46 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
/*
 * Media Converter
 * Extracts media files from Fedora repository
 * Copyright (C) 2021  Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

20
package ch.memobase
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
21
22


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
23
import ch.memobase.models._
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
24
25
import org.apache.kafka.clients.consumer.ConsumerRecord

26
27
import java.io.{ByteArrayOutputStream, InputStream}
import java.net.{ConnectException, URL}
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
28
import java.util.Properties
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
29
import scala.util.{Failure, Success, Try}
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
30

31
/**
 * Result of processing one binary resource of a record, or of one failed attempt to extract
 * binary resource metadata from a record.
 *
 * Every variant carries the identifier the outcome refers to (a record key or a binary resource
 * id), the Memobase resource concerned and a human-readable message.
 */
sealed trait ProcessOutcome

/** A file was successfully created or updated; `msg` names the file and the action taken. */
final case class ProcessSuccess(id: String, resource: MemobaseResource, msg: String) extends ProcessOutcome

/** Processing failed; `ex` is the underlying cause and `msg` gives the context. */
final case class ProcessFatal(id: String, resource: MemobaseResource, msg: String, ex: Throwable) extends ProcessOutcome

/** Processing could not be carried out as intended, e.g. for an unmanageable media file type. */
final case class ProcessWarning(id: String, resource: MemobaseResource, msg: String) extends ProcessOutcome

/** The resource was deliberately skipped, e.g. because it has no local binary or digital object. */
final case class ProcessIgnore(id: String, resource: MemobaseResource, msg: String) extends ProcessOutcome
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
40
41


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
42
/**
 * Turns Kafka records describing Memobase binary resources into dissemination copies.
 *
 * For every binary resource found in a record, the binary is downloaded from the Media File
 * Distributor and handed to `fileHandler`, which writes audio, video or image copies and image
 * thumbnails. Each created file, and each failure, is reported as a [[ProcessOutcome]].
 *
 * @param fileHandler creates the actual copies and thumbnails
 * @param appSettings application settings; this class reads `mediaFolderRootPath`,
 *                    `thumbnailFolderPath`, `distributorUrl`, `externalBaseUrl`,
 *                    `connectionMaxRetries`, `connectionRetryAfterMs`, `thumbnailWidth`
 *                    and `thumbnailHeight`
 */
class RecordProcessor(fileHandler: DisseminationCopyHandler,
                      appSettings: Properties) extends FileUtils {

  // NOTE(review): rootPath and cachedImagePath are not used in this class; presumably they are
  // consumed by the FileUtils path helpers (audioFilePath, cachedImageFilePath, ...) — confirm.
  val rootPath: String = appSettings.getProperty("mediaFolderRootPath")
  val cachedImagePath: String = appSettings.getProperty("thumbnailFolderPath")
  val distributorUrl: String = appSettings.getProperty("distributorUrl")
  /** Maximum number of retries after a [[java.net.ConnectException]] while opening a binary. */
  val maxRetries: Int = appSettings.getProperty("connectionMaxRetries").toInt
  /** Pause in milliseconds before each such retry. */
  val retryAfter: Int = appSettings.getProperty("connectionRetryAfterMs").toInt

  /**
   * Processes a single Kafka record.
   *
   * The record value is parsed into binary resource metadata. Each resource is fetched and
   * converted into dissemination copies. Each metadata extraction failure is reported as an
   * outcome instead of being thrown:
   *  - a missing local binary or digital object is ignored;
   *  - an unmanageable media file type yields a warning;
   *  - anything else is fatal.
   *
   * @param record Kafka record; its key identifies the record, its value holds the metadata
   * @return the outcomes of all resources described by the record
   */
  def process(record: ConsumerRecord[String, String]): List[ProcessOutcome] = {
    val recordKey = record.key()
    BinaryResourceMetadata.build(record.value(), appSettings.getProperty("externalBaseUrl"), distributorUrl) flatMap {
      case Success(binaryResource) =>
        handleBinaryResource(binaryResource, recordKey)
      case Failure(ex) => List(ex match {
        case e: NoLocalBinary => ProcessIgnore(recordKey, e.resource, e.getMessage)
        case e: NoDigitalObject => ProcessIgnore(recordKey, DigitalObject, e.getMessage)
        case e: UnmanageableMediaFileType => ProcessWarning(recordKey, e.resource, e.getMessage)
        // Match every Throwable: a Failure can also carry a non-fatal Error (e.g. AssertionError),
        // which a `case e: Exception` would let escape as a MatchError.
        case e => ProcessFatal(recordKey, Record, e.getMessage, e)
      })
    }
  }

  /**
   * Downloads the binary described by `binaryResource` and creates its dissemination copies.
   *
   * @param binaryResource metadata of the binary (source URL, id, mime type, resource type)
   * @param recordKey      key of the originating record, used in the outcome if the download fails
   */
  private def handleBinaryResource(binaryResource: BinaryResourceMetadata, recordKey: String): List[ProcessOutcome] =
    fetchBinaryResource(binaryResource.filePath) match {
      case Success(data) =>
        createResource(binaryResource.id, binaryResource.mimeType, binaryResource.resource, data)
      case Failure(ex) =>
        List(ProcessFatal(recordKey, binaryResource.resource,
          s"Failed to retrieve binary from Media File Distributor with URL ${binaryResource.filePath}", ex))
    }

  /**
   * Converts the result of a file handler operation into a single outcome.
   *
   * `Success(true)` is reported as an update of `destFile`, `Success(false)` as its creation,
   * and a `Failure` as fatal.
   */
  private def createOutcome(res: Try[Boolean], id: String, resource: MemobaseResource, destFile: String): List[ProcessOutcome] = List(res match {
    case Success(true) => ProcessSuccess(id, resource, s"Updating of file $destFile successful")
    case Success(false) => ProcessSuccess(id, resource, s"Creation of file $destFile successful")
    case Failure(ex) => ProcessFatal(id, resource, s"Creation of file $destFile failed", ex)
  })

  /**
   * Writes the dissemination copies appropriate for `mimeType` and `resource`.
   *
   * Audio yields a full copy plus a snippet. Video yields one copy. Images yield a copy plus a
   * thumbnail, placed according to whether they are a digital object or a video thumbnail.
   * Any other combination is reported as a warning rather than failing the whole record.
   */
  private def createResource(id: String,
                             mimeType: MimeType,
                             resource: MemobaseResource,
                             data: ByteArrayOutputStream): List[ProcessOutcome] = mimeType match {
    case mT: AudioFile =>
      // The boolean is passed through to createAudioCopy; it is true only for the snippet target.
      List(
        (audioFilePath(id), false, DigitalObject),
        (audioSnippetPath(id), true, AudioSnippet)
      ) flatMap { case (destFile, isSnippet, audioResource) =>
        createOutcome(fileHandler.createAudioCopy(data, destFile, mT, isSnippet), id, audioResource, destFile)
      }
    case mT: VideoFile =>
      val destFile = videoFilePath(id, mT)
      createOutcome(fileHandler.createVideoCopy(data, destFile, mT), id, DigitalObject, destFile)
    case mT: ImageFile if resource == DigitalObject =>
      createImageAndThumbnail(id, data, imageFileRootPath(id, mT), mT, DigitalObject)
    case mT: ImageFile if resource == VideoThumbnail =>
      createImageAndThumbnail(id, data, videoPosterPath(id, mT), mT, VideoThumbnail)
    case other =>
      // Without this fallback, an unexpected combination (e.g. an image for another resource
      // type) would throw a MatchError out of `process` instead of being reported.
      List(ProcessWarning(id, resource, s"No dissemination copy defined for media type $other of resource $resource"))
  }

  /**
   * Writes an image copy to `destImageFile` and, if that succeeds, a thumbnail derived from it.
   *
   * @return the outcome of the image copy, followed by the thumbnail outcome when one was attempted
   */
  private def createImageAndThumbnail(id: String,
                                      data: ByteArrayOutputStream,
                                      destImageFile: String,
                                      mimeType: MimeType,
                                      memobaseResource: MemobaseResource): List[ProcessOutcome] = {
    val resMediaFile = fileHandler.createImageCopy(data, destImageFile, mimeType)
    val outcomeMediaFile = createOutcome(resMediaFile, id, memobaseResource, destImageFile)
    if (resMediaFile.isFailure) {
      // The thumbnail is rendered from destImageFile. After a failed copy that file is missing or
      // stale, so a thumbnail would either fail again or silently reflect an outdated image.
      outcomeMediaFile
    } else {
      val destPreviewFile = cachedImageFilePath(id, mimeType)
      val (width, height) = getThumbnailDimensions
      val resThumbnail = fileHandler.createImageThumbnail(destImageFile, destPreviewFile, width, height)
      outcomeMediaFile ++ createOutcome(resThumbnail, id, ImageThumbnail, destPreviewFile)
    }
  }

  /**
   * Reads the configured thumbnail width and height.
   *
   * A dimension that is missing, not an integer, or smaller than 1 is returned as `None`.
   */
  private def getThumbnailDimensions: (Option[Int], Option[Int]) = {
    (Try(appSettings.getProperty("thumbnailWidth").toInt).toOption.filter(_ >= 1),
      Try(appSettings.getProperty("thumbnailHeight").toInt).toOption.filter(_ >= 1))
  }

  /**
   * Downloads a binary resource from the remote API into memory.
   *
   * The stream is closed once reading ends, so the underlying connection is released. This also
   * holds when reading fails midway.
   *
   * @param url The URL to the resource
   * @return the complete response body, or a `Failure` if connecting or reading failed
   */
  private def fetchBinaryResource(url: String): Try[ByteArrayOutputStream] = Try {
    val inputStream = getStream(url)
    try {
      val baos = new ByteArrayOutputStream()
      val buffer = new Array[Byte](8192)
      Iterator.continually(inputStream.read(buffer))
        .takeWhile(_ != -1)
        .foreach(count => baos.write(buffer, 0, count))
      baos
    } finally {
      // A failure while closing must not discard a payload that was already read completely.
      try inputStream.close()
      catch {
        case _: java.io.IOException => ()
      }
    }
  }

  /**
   * Opens a stream to `url`, retrying when the connection is refused.
   *
   * After a [[java.net.ConnectException]], up to `maxRetries` retries are made, each after a
   * pause of `retryAfter` milliseconds. Any other error is rethrown, as is a connection error
   * that occurs after the last retry.
   *
   * @param url     the URL to open
   * @param retries number of retries already performed
   */
  @scala.annotation.tailrec
  private def getStream(url: String, retries: Int = 0): InputStream =
    Try(new URL(url).openStream()) match {
      case Success(stream) => stream
      case Failure(_: ConnectException) if retries < maxRetries =>
        Thread.sleep(retryAfter)
        getStream(url, retries + 1)
      case Failure(ex) => throw ex
    }
}