Unverified Commit f61b9c84 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

fix style issues


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent 9dd10dc6
Pipeline #13102 failed with stages
in 3 minutes and 9 seconds
......@@ -55,12 +55,13 @@ object App extends scala.App with Logging {
settings.getAppSettings.getProperty("fedoraPassword")
)
val recordProcessor = new RecordProcessor(fileHandler, fCWrapper, settings.getAppSettings.getProperty("externalBaseUrl"))
val consumerPollTimeoutMs = 100
try {
logger.debug(s"Subscribing to topic ${settings.getInputTopic}")
consumer.subscribe(List(settings.getInputTopic).asJava)
while (true) {
val records = consumer.poll(Duration.ofMillis(100)).asScala
val records = consumer.poll(Duration.ofMillis(consumerPollTimeoutMs)).asScala
for (record <- records) {
// TODO: Filter for objectTypes: Only records, not institutions, recordSets
recordProcessor.process(record) match {
......
......@@ -25,6 +25,7 @@ import java.net.URI
import ch.memobase.models._
import org.memobase.fedora.{BinaryResource, FedoraClient, FedoraClientImpl}
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
......@@ -82,9 +83,10 @@ object FedoraClientWrapper {
}
private def copyDataAndCloseResource(binaryResource: BinaryResource): Try[ByteArrayOutputStream] = {
val dataByteLength = 2097152
val outputStream: Try[ByteArrayOutputStream] = Try {
val oS = new ByteArrayOutputStream()
val data = new Array[Byte](2097152)
val data = new Array[Byte](dataByteLength)
Iterator.continually(binaryResource.getData.read(data))
.takeWhile(-1 !=)
.foreach(oS.write)
......
......@@ -20,6 +20,8 @@
package ch.memobase
import java.io.ByteArrayOutputStream
import ch.memobase.models._
import org.apache.kafka.clients.consumer.ConsumerRecord
......@@ -45,7 +47,15 @@ class RecordProcessor(fileHandler: DisseminationCopyHandler, fedoraClientWrapper
(for {
kafkaMsg <- BinaryResourceMetadata.build(record.value(), externalBaseUrl)
fileWithMetadata <- fedoraClientWrapper.fetchBinaryResource(kafkaMsg.filePath)
} yield (kafkaMsg.id, kafkaMsg.eventType, fileWithMetadata.fileType, fileWithMetadata.data) match {
} yield createProcessResult(kafkaMsg.id, kafkaMsg.eventType, fileWithMetadata.fileType, fileWithMetadata.data))
.recover {
case e: ResourceWithoutBinary => ProcessIgnore(record.key(), e.getMessage)
case e: Exception => ProcessFailure(record.key(), UnknownFileType, "", e)
}
}.get
private def createProcessResult(id: String, fedoraEvent: Event, mediaFileType: MediaFileType, data: ByteArrayOutputStream) =
(id, fedoraEvent, mediaFileType, data) match {
case (id, Create | Update, af: AudioFileType, data) =>
errorHandler(fileHandler.createAudioCopy(data, id, af), id, af, "Create/Update")
case (id, Delete, af: AudioFileType, _) =>
......@@ -60,9 +70,5 @@ class RecordProcessor(fileHandler: DisseminationCopyHandler, fedoraClientWrapper
errorHandler(fileHandler.deleteImageCopy(id), id, i, "Delete")
case (id, event, ft, _) =>
ProcessFailure(id, ft, event.toString, new Exception)
}).recover {
case e: ResourceWithoutBinary => ProcessIgnore(record.key(), e.getMessage)
case e: Exception => ProcessFailure(record.key(), UnknownFileType, "", e)
}
}.get
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment