Commit 1e70cc5c authored by Jonas Waeber
Browse files

Merge commit

parents 3135a5cf bc87c1be
Pipeline #16550 passed with stages
in 6 minutes and 19 seconds
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
*/ */
package org.memobase package org.memobase
import com.beust.klaxon.Klaxon
import java.io.File import java.io.File
import java.io.FileInputStream import java.io.FileInputStream
import java.time.Duration import java.time.Duration
...@@ -35,6 +36,7 @@ import org.apache.kafka.common.serialization.StringDeserializer ...@@ -35,6 +36,7 @@ import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.serialization.StringSerializer
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
...@@ -122,7 +124,9 @@ class ServiceTest { ...@@ -122,7 +124,9 @@ class ServiceTest {
assertThat(totalConsumerRecords) assertThat(totalConsumerRecords)
.size().isEqualTo(expectedRecordCount) .size().isEqualTo(expectedRecordCount)
assertThat(totalConsumerRecords[0].value()).isEqualTo(params.expectedIngestReports[0].toJson()) val receivedReport = Klaxon().parse<Report>(totalConsumerRecords[0].value())
assertNotNull(receivedReport)
assertThat(receivedReport).isEqualTo(params.expectedIngestReports[0])
} }
private fun kafkaTests() = Stream.of( private fun kafkaTests() = Stream.of(
......
...@@ -90,10 +90,8 @@ class Ingester( ...@@ -90,10 +90,8 @@ class Ingester(
it.second.let { path -> it.second.let { path ->
if (path != null) { if (path != null) {
sftpClient.open(File(path)).use { stream -> sftpClient.open(File(path)).use { stream ->
val binaryUri = "${it.first}/${Service.BINARY_FILE_URI_PATH}" val binaryUri = "$digitalInstantiationUrl/${Service.BINARY_FILE_URI_PATH}"
val mimeType = rdfHandler.getMimeType().first { mT -> val mimeType = rdfHandler.getMimeType(digitalInstantiationUrl)
mT.first == digitalInstantiationUrl
}.second
log.info("Ingesting binary $binaryUri with mime type $mimeType.") log.info("Ingesting binary $binaryUri with mime type $mimeType.")
transaction.createOrUpdateBinaryResource(URI(binaryUri), stream, mimeType) transaction.createOrUpdateBinaryResource(URI(binaryUri), stream, mimeType)
} }
......
...@@ -9,6 +9,7 @@ import org.apache.jena.rdf.model.impl.StatementImpl ...@@ -9,6 +9,7 @@ import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.jena.riot.Lang import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr import org.apache.jena.riot.RDFDataMgr
import org.apache.log4j.LogManager import org.apache.log4j.LogManager
import org.memobase.exceptions.MissingMimeTypeException
import org.memobase.rdf.EBUCORE import org.memobase.rdf.EBUCORE
import org.memobase.rdf.RDF import org.memobase.rdf.RDF
import org.memobase.rdf.RICO import org.memobase.rdf.RICO
...@@ -109,11 +110,14 @@ class RdfHandler(data: String, private val externalBaseUrl: String) { ...@@ -109,11 +110,14 @@ class RdfHandler(data: String, private val externalBaseUrl: String) {
}.toList() }.toList()
} }
/**
 * Looks up the MIME type attached to the resource identified by [uri].
 *
 * Reads every `EBUCORE.hasMimeType` statement of the resource from the model and
 * expects exactly one of them.
 *
 * @param uri URI of the resource whose MIME type is requested.
 * @return the literal value of the single `hasMimeType` statement.
 * @throws MissingMimeTypeException if the resource has no `hasMimeType` statement,
 *   or has more than one (the message states which case occurred).
 */
fun getMimeType(uri: String): String {
    val mimeTypes = model.getResource(uri)
        .listProperties(EBUCORE.hasMimeType)
        .mapWith { it.`object`.asLiteral().string }
        .toList()
    return when (mimeTypes.size) {
        1 -> mimeTypes[0]
        0 -> throw MissingMimeTypeException("No MimeType found for resource $uri")
        // Several values are ambiguous rather than missing; say so instead of
        // reporting "No MimeType found", which would send the reader looking for absent data.
        else -> throw MissingMimeTypeException(
            "Expected exactly one MimeType for resource $uri but found ${mimeTypes.size}: $mimeTypes"
        )
    }
}
} }
...@@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord ...@@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger import org.apache.logging.log4j.Logger
import org.fcrepo.client.FcrepoOperationFailedException import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.exceptions.MissingMimeTypeException
import org.memobase.exceptions.SftpClientException
import org.memobase.fedora.FedoraClient import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraClientImpl import org.memobase.fedora.FedoraClientImpl
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
...@@ -132,12 +134,33 @@ class Service(fileName: String = "app.yml") : Closeable { ...@@ -132,12 +134,33 @@ class Service(fileName: String = "app.yml") : Closeable {
status = ReportStatus.success, status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key()) message = ReportMessages.ingestedRecord(record.key())
) )
} catch (ex: FcrepoOperationFailedException) {
log.error("${ex.javaClass.canonicalName}:: ${ex.localizedMessage}", ex)
Report(
id = record.key(),
status = ReportStatus.failure,
message = "Fedora Exception: ${ex.localizedMessage}"
)
} catch (ex: MissingMimeTypeException) {
log.error("${ex.javaClass.canonicalName}:: ${ex.localizedMessage}", ex)
Report(
id = record.key(),
status = ReportStatus.failure,
message = "Missing MimeType Exception: ${ex.localizedMessage}"
)
} catch (ex: SftpClientException) {
log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
Report(
id = record.key(),
status = ReportStatus.failure,
message = "SFTP Exception: ${ex.localizedMessage}"
)
} catch (ex: Exception) { } catch (ex: Exception) {
log.error("Ingestion of record ${record.key()} failed: ${ex.localizedMessage}", ex) log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
Report( Report(
id = record.key(), id = record.key(),
status = ReportStatus.failure, status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key()) message = "Unknown Exception: ${ex.localizedMessage}"
) )
} }
} }
......
package org.memobase.exceptions
/**
 * Signals that no usable MIME type could be resolved for a resource.
 *
 * @param message human-readable description of the failure, forwarded to [Exception].
 */
class MissingMimeTypeException(
    message: String
) : Exception(message)
...@@ -127,7 +127,8 @@ class TestRdfHandler { ...@@ -127,7 +127,8 @@ class TestRdfHandler {
@Test @Test
fun `test mime type extraction for binaries`() { fun `test mime type extraction for binaries`() {
val rdfHandler = RdfHandler(readFile("input.nt"), "https://memobase.ch/") val rdfHandler = RdfHandler(readFile("input.nt"), "https://memobase.ch/")
assertThat(rdfHandler.getMimeType().first().second).isEqualTo("video/mpeg") val mimeType = rdfHandler.getMimeType("https://memobase.ch/instantiation/digital/AFZ-IB_Becker_Audiovisuals_63-1")
assertThat(mimeType).isEqualTo("video/mpeg")
} }
private val uris2 = listOf( private val uris2 = listOf(
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment