use media distributor service as binary provider

parent 077963d8
Pipeline #20145 passed with stages
in 8 minutes and 31 seconds
......@@ -63,6 +63,8 @@ spec:
value: postprocessing-reporting
- name: MEDIA_FOLDER_ROOT_PATH
value: "/data"
- name: DISTRIBUTOR_URL
value: "mb-wf2:3000"
restartPolicy: Always
volumes:
......
......@@ -5,6 +5,7 @@ app:
fedoraUser: ${FEDORA_USER:?system}
fedoraPassword: ${FEDORA_PASSWORD:?system}
mediaFolderRootPath: ${MEDIA_FOLDER_ROOT_PATH:?system}
distributorUrl: ${DISTRIBUTOR_URL:?system}
kafka:
consumer:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
......@@ -26,7 +26,6 @@ import org.memobase.settings.SettingsLoader
import java.time.Duration
import scala.collection.JavaConverters._
import scala.util.Try
object App extends scala.App with Logging with RecordUtils {
......@@ -36,7 +35,8 @@ object App extends scala.App with Logging with RecordUtils {
"externalBaseUrl",
"fedoraUser",
"fedoraPassword",
"mediaFolderRootPath"
"mediaFolderRootPath",
"distributorUrl"
).asJava,
"app.yml",
true,
......
......@@ -19,10 +19,11 @@
package ch.memobase
import org.memobase.fedora.{BinaryResource, FedoraClient, FedoraClientImpl}
import org.memobase.fedora.{FedoraClient, FedoraClientImpl}
import java.io.ByteArrayOutputStream
import java.net.URI
import java.net.URL
import scala.io.BufferedSource
import scala.language.postfixOps
import scala.util.Try
......@@ -44,9 +45,10 @@ class FedoraClientWrapper(fc: FedoraClient) {
*/
def fetchBinaryResource(url: String): Try[ByteArrayOutputStream] = {
for {
u <- Try(new URI(url))
resource <- Try(fc.fetchBinaryResource(u))
outputStream <- copyDataAndCloseResource(resource)
u <- Try(new URL(url))
ip <- Try(io.Source.fromURL(u))
//resource <- Try(fc.fetchBinaryResource(u))
outputStream <- copyDataAndCloseResource(ip)
} yield outputStream
}
}
......@@ -72,10 +74,10 @@ object FedoraClientWrapper {
new FedoraClientWrapper(fc)
}
private def copyDataAndCloseResource(binaryResource: BinaryResource): Try[ByteArrayOutputStream] = {
private def copyDataAndCloseResource(binaryResource: BufferedSource): Try[ByteArrayOutputStream] = {
val outputStream: Try[ByteArrayOutputStream] = Try {
val oS = new ByteArrayOutputStream()
Iterator.continually(binaryResource.getData.read)
Iterator.continually(binaryResource.bufferedReader().read())
.takeWhile(-1 !=)
.foreach(oS.write)
oS
......
......@@ -43,9 +43,10 @@ class RecordProcessor(fileHandler: DisseminationCopyHandler,
appSettings: Properties) extends FileUtils {
val rootPath: String = appSettings.getProperty("mediaFolderRootPath")
val distributorUrl: String = appSettings.getProperty("distributorUrl")
def process(record: ConsumerRecord[String, String]): List[ProcessOutcome] = {
BinaryResourceMetadata.build(record.value(), appSettings.getProperty("externalBaseUrl")) flatMap {
BinaryResourceMetadata.build(record.value(), appSettings.getProperty("externalBaseUrl"), distributorUrl) flatMap {
case Success(binaryResource) =>
handleBinaryResource(binaryResource, record.key())
case Failure(ex) => List(ex match {
......
......@@ -49,15 +49,32 @@ object BinaryResourceMetadata extends RecordUtils {
*
* @param msg Pulled Kafka message
* @param externalBaseUrl Base URL of resource used outside of Fedora
* @param distributorHost Host and port of media distributor service
* @return
*/
def build(msg: String, externalBaseUrl: String): List[Try[BinaryResourceMetadata]] = {
def build(msg: String, externalBaseUrl: String, distributorHost: String): List[Try[BinaryResourceMetadata]] = {
val jsonldGraph = getJsonldGraph(msg)
extractBinaryResourceMetadata(jsonldGraph, externalBaseUrl)
extractBinaryResourceMetadata(jsonldGraph, externalBaseUrl, distributorHost)
}
private def buildDistributorUrl(longId: String, baseUrl: String, distributorHost: String, resourceType: MemobaseResource): String = {
val pattern = raw"""$baseUrl/digital/([^/]+).*""".r
pattern.findFirstMatchIn(longId) match {
case Some(m) =>
resourceType match {
case DigitalObject =>
s"${distributorHost.stripSuffix("/")}/media/${m.group(1)}"
case Thumbnail =>
s"${distributorHost.stripSuffix("/")}/thumbnail/${m.group(1)}"
case _ =>
""
}
case None => ""
}
}
//noinspection ScalaStyle
private def extractBinaryResourceMetadata(jsonldGraph: ArrayBuffer[Value], baseUrl: String): List[Try[BinaryResourceMetadata]] =
private def extractBinaryResourceMetadata(jsonldGraph: ArrayBuffer[Value], baseUrl: String, distributorHost: String): List[Try[BinaryResourceMetadata]] =
jsonldGraph.value
.withFilter {
v => isDigitalObject(v.obj) || isPreviewImage(v.obj)
......@@ -73,7 +90,7 @@ object BinaryResourceMetadata extends RecordUtils {
val instantiation = MemobaseResource(v("type").str)
BinaryResourceMetadata(
v("@id").str.substring(s"$baseUrl/digital/".length - 1),
v("locator").str,
buildDistributorUrl(v("@id").str, baseUrl, distributorHost, instantiation),
if (instantiation == Thumbnail) {
JpegFile
} else {
......
......@@ -39,13 +39,13 @@ class BinaryResourceMetadataTest extends AnyFunSuite {
}
test("the value of the id field of a KafkaMessage should match the id of the parsed object") {
val km = BinaryResourceMetadata.build(loadMessageWithBinaryResource("Create", "image/png", "https://memobase.ch/digital/BAZ-MEI_77466-1/binary"), externalBaseUrl)
val km = BinaryResourceMetadata.build(loadMessageWithBinaryResource("Create", "image/png", "https://memobase.ch/digital/BAZ-MEI_77466-1/binary"), externalBaseUrl, "mb-wf2")
assert(km.head.isSuccess)
}
test("a reference to a non-local binary should throw a NoLocalBinary exception") {
assertThrows[NoLocalBinary] {
BinaryResourceMetadata.build(loadMessageWithBinaryResource("Create", "image/jpeg", "https://example.com"), externalBaseUrl)
BinaryResourceMetadata.build(loadMessageWithBinaryResource("Create", "image/jpeg", "https://example.com"), externalBaseUrl, "mb-wf2")
.head
.get
}
......@@ -55,7 +55,7 @@ class BinaryResourceMetadataTest extends AnyFunSuite {
assertThrows[UnmanageableMediaFileType] {
BinaryResourceMetadata.build(loadMessageWithBinaryResource("Create",
"application/pdf",
"https://memobase.ch/digital/BAZ-MEI_77466-1/binary"), externalBaseUrl)
"https://memobase.ch/digital/BAZ-MEI_77466-1/binary"), externalBaseUrl, "mb-wf2")
.head
.get
}
......@@ -65,7 +65,7 @@ class BinaryResourceMetadataTest extends AnyFunSuite {
assertThrows[UnknownEventType] {
BinaryResourceMetadata.build(loadMessageWithBinaryResource("Upload",
"image/jpeg",
"https://memobase.ch/digital/BAZ-MEI_77466-1/binary"), externalBaseUrl)
"https://memobase.ch/digital/BAZ-MEI_77466-1/binary"), externalBaseUrl, "mb-wf2")
.head
.get
}
......
......@@ -19,14 +19,13 @@
package ch.memobase
import java.io.{ByteArrayOutputStream, File, FileInputStream}
import java.nio.file.{Files, Paths}
import ch.memobase.TestUtilities._
import ch.memobase.models.{JpegFile, MimeType, Mp3File, VideoMpeg4File}
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.{Assertion, BeforeAndAfter}
import java.io.{ByteArrayOutputStream, File, FileInputStream}
import java.nio.file.{Files, Paths}
import scala.util.Try
class DisseminationCopyHandlerTest extends AnyFunSuite with BeforeAndAfter {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment