/*
 * Media Converter
 * Extracts media files from Fedora repository
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see https://www.gnu.org/licenses/.
 */

package ch.memobase

import java.io.ByteArrayOutputStream

import ch.memobase.models._
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.util.{Failure, Success, Try}

/** Result of handling one record in `RecordProcessor.process`. */
trait ProcessOutcome

/** The requested `action` on the copy of object `id` completed without error. */
case class ProcessSuccess(id: String, fileType: MediaFileType, action: String) extends ProcessOutcome

/** The requested `action` on the copy of object `id` failed with `ex`. */
case class ProcessFailure(id: String, fileType: MediaFileType, action: String, ex: Throwable) extends ProcessOutcome

/**
 * The record was deliberately not processed; `reason` says why.
 *
 * NOTE(review): `warn` is never set in this file; presumably it tells the caller to log the
 * ignore at warning level -- confirm against the call site.
 */
case class ProcessIgnore(id: String, reason: String, warn: Boolean = false) extends ProcessOutcome

/**
 * Translates a Kafka record into a create/update/delete call on the dissemination copy of the
 * binary the record refers to.
 *
 * @param fileHandler         performs the actual copy creation and deletion
 * @param fedoraClientWrapper fetches the binary resource named in the record
 * @param externalBaseUrl     passed through to `BinaryResourceMetadata.build`
 */
class RecordProcessor(fileHandler: DisseminationCopyHandler,
                      fedoraClientWrapper: FedoraClientWrapper,
                      externalBaseUrl: String) {

  /** Maps the `Try` returned by a file-handler call onto a success or failure outcome. */
  private def errorHandler(result: Try[_], objId: String, fileType: MediaFileType, action: String): ProcessOutcome =
    result match {
      case Success(_)  => ProcessSuccess(objId, fileType, action)
      case Failure(ex) => ProcessFailure(objId, fileType, action, ex)
    }

  /**
   * Processes a single record: parses its value, fetches the referenced binary and applies the
   * event to the matching dissemination copy.
   *
   * @param record Kafka record whose value is handed to `BinaryResourceMetadata.build`; its key
   *               identifies the record in outcomes produced before the object id is known
   * @return the outcome of the file operation, a [[ProcessIgnore]] for `NoLocalBinary` and
   *         `NoDigitalObject`, or a [[ProcessFailure]] for any other exception
   */
  def process(record: ConsumerRecord[String, String]): ProcessOutcome = {
    val outcome: Try[ProcessOutcome] = for {
      kafkaMsg <- BinaryResourceMetadata.build(record.value(), externalBaseUrl)
      fileWithMetadata <- fedoraClientWrapper.fetchBinaryResource(kafkaMsg.filePath)
    } yield createProcessResult(kafkaMsg.id, kafkaMsg.eventType, fileWithMetadata.fileType, fileWithMetadata.data)

    // Total match instead of `recover { ... }.get`: every branch is spelled out.
    outcome match {
      case Success(result)             => result
      case Failure(e: NoLocalBinary)   => ProcessIgnore(record.key(), e.getMessage)
      case Failure(e: NoDigitalObject) => ProcessIgnore(record.key(), e.getMessage)
      case Failure(e: Exception)       => ProcessFailure(record.key(), UnknownFileType, "", e)
      // Non-Exception throwables were never turned into outcomes; keep propagating them.
      case Failure(other)              => throw other
    }
  }

  /**
   * Dispatches on event and file type to the matching file-handler operation.
   *
   * Any combination without a handler yields a [[ProcessFailure]] whose exception message names
   * the offending event and file type.
   */
  private def createProcessResult(id: String,
                                  fedoraEvent: Event,
                                  mediaFileType: MediaFileType,
                                  data: ByteArrayOutputStream): ProcessOutcome =
    (fedoraEvent, mediaFileType) match {
      case (Create | Update, af: AudioFileType) =>
        errorHandler(fileHandler.createAudioCopy(data, id, af), id, af, "Create/Update")
      case (Delete, af: AudioFileType) =>
        errorHandler(fileHandler.deleteAudioCopy(id), id, af, "Delete")
      case (Create | Update, vf: VideoFileType) =>
        errorHandler(fileHandler.createVideoCopy(data, id, vf), id, vf, "Create/Update")
      case (Delete, vf: VideoFileType) =>
        errorHandler(fileHandler.deleteVideoCopy(id, vf), id, vf, "Delete")
      case (Create | Update, i: ImageFileType) =>
        errorHandler(fileHandler.createImageCopy(data, id, i), id, i, "Create/Update")
      case (Delete, i: ImageFileType) =>
        errorHandler(fileHandler.deleteImageCopy(id), id, i, "Delete")
      case (event, ft) =>
        // Give the failure a message; a bare `new Exception` left callers with a null getMessage.
        ProcessFailure(
          id,
          ft,
          event.toString,
          new Exception(s"No handler for event '$event' on file type '$ft' (object '$id')")
        )
    }
}