Unverified Commit d1727f75 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

reconnect if connection to media file distributor fails

parent 68aa9315
Pipeline #23535 passed with stages
in 8 minutes and 19 seconds
......@@ -54,6 +54,10 @@ spec:
value: "/data"
- name: DISTRIBUTOR_URL
value: "http://mb-wf2.memobase.unibas.ch:3000"
- name: CONNECTION_RETRIES_AFTER_MS
value: "10000"
- name: CONNECTION_MAX_RETRIES
value: "3"
restartPolicy: Always
volumes:
......
......@@ -3,6 +3,8 @@ app:
externalBaseUrl: ${EXTERNAL_BASE_URL:?system}
mediaFolderRootPath: ${MEDIA_FOLDER_ROOT_PATH:?system}
distributorUrl: ${DISTRIBUTOR_URL:?system}
connectionMaxRetries: ${CONNECTION_MAX_RETRIES:?system}
connectionRetryAfterMs: ${CONNECTION_RETRY_AFTER_MS:?system}
kafka:
consumer:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:?system}
......
......@@ -23,8 +23,8 @@ package ch.memobase
import ch.memobase.models._
import org.apache.kafka.clients.consumer.ConsumerRecord
import java.io.ByteArrayOutputStream
import java.net.URL
import java.io.{ByteArrayOutputStream, InputStream}
import java.net.{ConnectException, URL}
import java.util.Properties
import scala.util.{Failure, Success, Try}
......@@ -44,6 +44,8 @@ class RecordProcessor(fileHandler: DisseminationCopyHandler,
val rootPath: String = appSettings.getProperty("mediaFolderRootPath")
val distributorUrl: String = appSettings.getProperty("distributorUrl")
val maxRetries: Int = appSettings.getProperty("connectionMaxRetries").toInt
val retryAfter: Int = appSettings.getProperty("connectionRetryAfterMs").toInt
def process(record: ConsumerRecord[String, String]): List[ProcessOutcome] = {
BinaryResourceMetadata.build(record.value(), appSettings.getProperty("externalBaseUrl"), distributorUrl) flatMap {
......@@ -107,11 +109,19 @@ class RecordProcessor(fileHandler: DisseminationCopyHandler,
*/
private def fetchBinaryResource(url: String): Try[ByteArrayOutputStream] = Try {
val baos = new ByteArrayOutputStream()
val u = new URL(url)
val inputStream = u.openStream()
val inputStream = getStream(url)
Iterator.continually(inputStream.read)
.takeWhile(-1 !=)
.foreach(baos.write)
baos
}
private def getStream(url: String, retries: Int = 0): InputStream = Try {
val u = new URL(url)
u.openStream
} recover {
case _: ConnectException if retries <= maxRetries =>
Thread.sleep(retryAfter)
getStream(url, retries + 1)
} get
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment