Unverified Commit 0ead5b28 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

adapt file fetching mechanism to new workflow

parent d08d3d72
Pipeline #20180 passed with stages
in 7 minutes and 34 seconds
......@@ -37,20 +37,8 @@ spec:
value: media-converter
- name: AUDIO_SNIPPET_DURATION
value: "30"
- name: INTERNAL_BASE_URL
value: "http://mb-fed1.memobase.unibas.ch:8080/fcrepo/rest/"
- name: EXTERNAL_BASE_URL
value: "https://memobase.ch/"
- name: FEDORA_USER
valueFrom:
secretKeyRef:
name: fedora-admin-credentials
key: FEDORA_USER
- name: FEDORA_PASSWORD
valueFrom:
secretKeyRef:
name: fedora-admin-credentials
key: FEDORA_PASSWORD
- name: KAFKA_BOOTSTRAP_SERVERS
value: mb-ka1.memobase.unibas.ch:9092,mb-ka2.memobase.unibas.ch:9092,mb-ka3.memobase.unibas.ch:9092
- name: CLIENT_ID
......
app:
audioSnippetDuration: ${AUDIO_SNIPPET_DURATION:?system}
internalBaseUrl: ${INTERNAL_BASE_URL:?system}
externalBaseUrl: ${EXTERNAL_BASE_URL:?system}
fedoraUser: ${FEDORA_USER:?system}
fedoraPassword: ${FEDORA_PASSWORD:?system}
mediaFolderRootPath: ${MEDIA_FOLDER_ROOT_PATH:?system}
distributorUrl: ${DISTRIBUTOR_URL:?system}
kafka:
......
......@@ -31,10 +31,7 @@ import scala.collection.JavaConverters._
object App extends scala.App with Logging with RecordUtils {
val settings = new SettingsLoader(List(
"audioSnippetDuration",
"internalBaseUrl",
"externalBaseUrl",
"fedoraUser",
"fedoraPassword",
"mediaFolderRootPath",
"distributorUrl"
).asJava,
......@@ -46,13 +43,7 @@ object App extends scala.App with Logging with RecordUtils {
val consumer = new KafkaConsumer[String, String](settings.getKafkaConsumerSettings)
val fileHandler = new DisseminationCopyHandler(
settings.getAppSettings.getProperty("audioSnippetDuration").toInt)
val fCWrapper = FedoraClientWrapper(
settings.getAppSettings.getProperty("internalBaseUrl"),
settings.getAppSettings.getProperty("externalBaseUrl"),
settings.getAppSettings.getProperty("fedoraUser"),
settings.getAppSettings.getProperty("fedoraPassword")
)
val recordProcessor = new RecordProcessor(fileHandler, fCWrapper, settings.getAppSettings)
val recordProcessor = new RecordProcessor(fileHandler, settings.getAppSettings)
val reporter = Reporter(settings.getKafkaProducerSettings, settings.getProcessReportTopic)
val consumerPollTimeoutMs = 100
......
/*
* Media Converter
* Extracts media files from Fedora repository
* Copyright (C) 2021 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package ch.memobase
import org.memobase.fedora.{FedoraClient, FedoraClientImpl}
import java.io.ByteArrayOutputStream
import java.net.URL
import scala.io.BufferedSource
import scala.language.postfixOps
import scala.util.Try
/**
* Acts as a thin wrapper around a [[org.memobase.fedora.FedoraClient]] instance
*
* @param fc Wrapped [[org.memobase.fedora.FedoraClient]] instance
*/
class FedoraClientWrapper(fc: FedoraClient) {
import FedoraClientWrapper.copyDataAndCloseResource
/**
* Fetches a binary resource from Fedora and returns, if successful, a [[ByteArrayOutputStream]] instance
*
* @param url The URL to the resource
* @return
*/
def fetchBinaryResource(url: String): Try[ByteArrayOutputStream] = {
for {
u <- Try(new URL(url))
ip <- Try(io.Source.fromURL(u))
outputStream <- copyDataAndCloseResource(ip)
} yield outputStream
}
}
object FedoraClientWrapper {
/**
* Creates a [[FedoraClientWrapper]] instance
*
* @param interalBaseUrl Base URL used internally by Fedora
* @param externalBaseUrl Base URL used outside of Fedora
* @param fedoraUsername Username used to query Fedora
* @param fedoraPassword Password used to query Fedora
* @return
*/
def apply(interalBaseUrl: String,
externalBaseUrl: String,
fedoraUsername: String,
fedoraPassword: String): FedoraClientWrapper = {
val builder = FedoraClientImpl.builder()
builder.urls(interalBaseUrl, externalBaseUrl)
builder.credentials(fedoraUsername, fedoraPassword)
val fc = builder.build()
new FedoraClientWrapper(fc)
}
private def copyDataAndCloseResource(binaryResource: BufferedSource): Try[ByteArrayOutputStream] = {
val outputStream: Try[ByteArrayOutputStream] = Try {
val oS = new ByteArrayOutputStream()
Iterator.continually(binaryResource.bufferedReader().read())
.takeWhile(-1 !=)
.foreach(oS.write)
oS
}
binaryResource.close()
outputStream
}
}
......@@ -24,6 +24,7 @@ import ch.memobase.models._
import org.apache.kafka.clients.consumer.ConsumerRecord
import java.io.ByteArrayOutputStream
import java.net.URL
import java.util.Properties
import scala.util.{Failure, Success, Try}
......@@ -39,7 +40,6 @@ case class ProcessIgnore(id: String, resource: MemobaseResource, msg: String) ex
class RecordProcessor(fileHandler: DisseminationCopyHandler,
fedoraClientWrapper: FedoraClientWrapper,
appSettings: Properties) extends FileUtils {
val rootPath: String = appSettings.getProperty("mediaFolderRootPath")
......@@ -59,15 +59,15 @@ class RecordProcessor(fileHandler: DisseminationCopyHandler,
}
private def handleBinaryResource(binaryResource: BinaryResourceMetadata, recordKey: String): List[ProcessOutcome] = {
fedoraClientWrapper.fetchBinaryResource(binaryResource.filePath) match {
fetchBinaryResource(binaryResource.filePath) match {
case Success(_) if binaryResource.eventType == Delete =>
deleteResource(binaryResource.id, binaryResource.mimeType, binaryResource.resource)
case Success(data) =>
case Success(tempFilePath) =>
createResource(
binaryResource.id,
binaryResource.mimeType,
binaryResource.resource,
data)
tempFilePath)
case Failure(ex) => List(ProcessFatal(recordKey, binaryResource.resource,
s"Failed to retrieve binary from Media File Distributor with URL ${binaryResource.filePath}", ex))
}
......@@ -128,4 +128,21 @@ class RecordProcessor(fileHandler: DisseminationCopyHandler,
val res = fileHandler.createImageCopy(data, destFile, mT)
createOutcome(res, id, Thumbnail, destFile)
}
/**
* Fetches a binary resource from remote API and returns, if successful, a [[scala.io.BufferedSource]] instance
*
* @param url The URL to the resource
* @return
*/
private def fetchBinaryResource(url: String): Try[ByteArrayOutputStream] = Try {
val baos = new ByteArrayOutputStream()
val u = new URL(url)
val inputStream = u.openStream()
Iterator.continually(inputStream.read)
.takeWhile(-1 !=)
.foreach(baos.write)
baos
}
}
......@@ -19,14 +19,11 @@
package ch.memobase
import java.io.ByteArrayOutputStream
import java.util.Properties
import ch.memobase.models._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.scalamock.scalatest.MockFactory
import org.scalatest.funsuite.AnyFunSuite
import java.io.ByteArrayOutputStream
import scala.io.Source
class RecordProcessorTest extends AnyFunSuite with MockFactory {
......@@ -72,13 +69,12 @@ class RecordProcessorTest extends AnyFunSuite with MockFactory {
baos
}
private val fixture = new {
/* private val fixture = new {
val externalBaseUrl = "https://memobase.ch"
val resourceId = "BAZ-MEI_77466-1"
val pathToResource = s"$externalBaseUrl/digital/$resourceId/binary"
val mockDisseminationCopyHandler: DisseminationCopyHandler = mock[DisseminationCopyHandler]
val mockFedoraClientWrapper: FedoraClientWrapper = mock[FedoraClientWrapper]
}
//val pathToResource = s"$externalBaseUrl/digital/$resourceId/binary"
//val mockDisseminationCopyHandler: DisseminationCopyHandler = mock[DisseminationCopyHandler]
} */
/* test("an object of mimeType image/jpeg and eventType Create should trigger copyImage") {
val f = fixture
......@@ -104,16 +100,14 @@ class RecordProcessorTest extends AnyFunSuite with MockFactory {
rP.process(cR)
} */
test("a message which does not refer to a binary file should be ignored") {
val f = fixture
val mockDCH = f.mockDisseminationCopyHandler
val mockFCW = f.mockFedoraClientWrapper
val (incomingMessage, _) = createIncomingMessage(Delete, Mp3File, hasBinaryResource = false)
(mockFCW.fetchBinaryResource _).expects(*).never()
val props = new Properties()
props.setProperty("externalBaseUrl", f.externalBaseUrl)
val rP = new RecordProcessor(mockDCH, mockFCW, props)
val cR = new ConsumerRecord[String, String]("_void", 1, 0, "1", incomingMessage)
rP.process(cR)
}
/* test("a message which does not refer to a binary file should be ignored") {
val f = fixture
val mockDCH = f.mockDisseminationCopyHandler
val (incomingMessage, _) = createIncomingMessage(Delete, Mp3File, hasBinaryResource = false)
val props = new Properties()
props.setProperty("externalBaseUrl", f.externalBaseUrl)
val rP = new RecordProcessor(mockDCH, mockFCW, props)
val cR = new ConsumerRecord[String, String]("_void", 1, 0, "1", incomingMessage)
rP.process(cR)
}*/
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment