Commit bc87c1be authored by Thomas Bernhart
Browse files

Merge branch 'MEMO-669-improve-mimetype-handling' into 'master'

Memo 669: improve mimetype handling

See merge request !11
parents 10f40f28 08569f9e
Pipeline #16477 passed with stages
in 5 minutes and 42 seconds
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
*/ */
package org.memobase package org.memobase
import com.beust.klaxon.Klaxon
import java.io.File import java.io.File
import java.io.FileInputStream import java.io.FileInputStream
import java.time.Duration import java.time.Duration
...@@ -35,6 +36,7 @@ import org.apache.kafka.common.serialization.StringDeserializer ...@@ -35,6 +36,7 @@ import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.serialization.StringSerializer
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
...@@ -122,7 +124,9 @@ class ServiceTest { ...@@ -122,7 +124,9 @@ class ServiceTest {
assertThat(totalConsumerRecords) assertThat(totalConsumerRecords)
.size().isEqualTo(expectedRecordCount) .size().isEqualTo(expectedRecordCount)
assertThat(totalConsumerRecords[0].value()).isEqualTo(params.expectedIngestReports[0].toJson()) val receivedReport = Klaxon().parse<Report>(totalConsumerRecords[0].value())
assertNotNull(receivedReport)
assertThat(receivedReport).isEqualTo(params.expectedIngestReports[0])
} }
private fun kafkaTests() = Stream.of( private fun kafkaTests() = Stream.of(
......
...@@ -90,10 +90,8 @@ class Ingester( ...@@ -90,10 +90,8 @@ class Ingester(
it.second.let { path -> it.second.let { path ->
if (path != null) { if (path != null) {
sftpClient.open(File(path)).use { stream -> sftpClient.open(File(path)).use { stream ->
val binaryUri = "${it.first}/${Service.BINARY_FILE_URI_PATH}" val binaryUri = "$digitalInstantiationUrl/${Service.BINARY_FILE_URI_PATH}"
val mimeType = rdfHandler.getMimeType().first { mT -> val mimeType = rdfHandler.getMimeType(digitalInstantiationUrl)
mT.first == digitalInstantiationUrl
}.second
log.info("Ingesting binary $binaryUri with mime type $mimeType.") log.info("Ingesting binary $binaryUri with mime type $mimeType.")
transaction.createOrUpdateBinaryResource(URI(binaryUri), stream, mimeType) transaction.createOrUpdateBinaryResource(URI(binaryUri), stream, mimeType)
} }
......
...@@ -9,6 +9,7 @@ import org.apache.jena.rdf.model.impl.StatementImpl ...@@ -9,6 +9,7 @@ import org.apache.jena.rdf.model.impl.StatementImpl
import org.apache.jena.riot.Lang import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr import org.apache.jena.riot.RDFDataMgr
import org.apache.log4j.LogManager import org.apache.log4j.LogManager
import org.memobase.exceptions.MissingMimeTypeException
import org.memobase.rdf.EBUCORE import org.memobase.rdf.EBUCORE
import org.memobase.rdf.RDF import org.memobase.rdf.RDF
import org.memobase.rdf.RICO import org.memobase.rdf.RICO
...@@ -109,11 +110,14 @@ class RdfHandler(data: String, private val externalBaseUrl: String) { ...@@ -109,11 +110,14 @@ class RdfHandler(data: String, private val externalBaseUrl: String) {
}.toList() }.toList()
} }
fun getMimeType(): List<Pair<String, String?>> { fun getMimeType(uri: String): String {
return model.listSubjectsWithProperty(RICO.type, Service.DIGITAL_OBJECT_TYPE).filterKeep { val mimeTypes = model.getResource(uri).listProperties(EBUCORE.hasMimeType).mapWith {
it.hasProperty(EBUCORE.hasMimeType) it.`object`.asLiteral().string
}.mapWith {
Pair(it.uri, it.getProperty(EBUCORE.hasMimeType).`object`.asLiteral().string)
}.toList() }.toList()
if (mimeTypes.size == 1) {
return mimeTypes[0]
} else {
throw MissingMimeTypeException("No MimeType found for resource $uri")
}
} }
} }
...@@ -24,6 +24,9 @@ import java.util.Properties ...@@ -24,6 +24,9 @@ import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger import org.apache.logging.log4j.Logger
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.exceptions.MissingMimeTypeException
import org.memobase.exceptions.SftpClientException
import org.memobase.fedora.FedoraClient import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraClientImpl import org.memobase.fedora.FedoraClientImpl
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
...@@ -80,32 +83,57 @@ class Service(fileName: String = "app.yml") : Closeable { ...@@ -80,32 +83,57 @@ class Service(fileName: String = "app.yml") : Closeable {
fun processRecords() { fun processRecords() {
for (record in consumer.fetchRecords()) { for (record in consumer.fetchRecords()) {
val ingestReport = processRecord(record) processRecord(record)
producer.sendReport(record.headers(), ingestReport)
} }
} }
private fun processRecord(record: ConsumerRecord<String, String>): Report { private fun processRecord(record: ConsumerRecord<String, String>) {
val ingester = Ingester( val ingester = Ingester(
settings.sftpSettings, settings.sftpSettings,
createFedoraClient(settings.appSettings), createFedoraClient(settings.appSettings),
settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl") settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl")
) )
return try { try {
ingester.ingest(record.key(), record.value()) ingester.ingest(record.key(), record.value())
Report( val report = Report(
id = record.key(), id = record.key(),
status = ReportStatus.success, status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key()) message = ReportMessages.ingestedRecord(record.key())
) )
} catch (ex: Exception) { producer.sendReport(record.headers(), report)
log.error("Ingestion of record ${record.key()} failed: ${ex.localizedMessage}", ex) } catch (ex: FcrepoOperationFailedException) {
Report( log.error("${ex.javaClass.canonicalName}:: ${ex.localizedMessage}", ex)
val report = Report(
id = record.key(), id = record.key(),
status = ReportStatus.failure, status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key()) message = "Fedora Exception: ${ex.localizedMessage}"
) )
producer.sendReport(record.headers(), report)
} catch (ex: MissingMimeTypeException) {
log.error("${ex.javaClass.canonicalName}:: ${ex.localizedMessage}", ex)
val report = Report(
id = record.key(),
status = ReportStatus.failure,
message = "Missing MimeType Exception: ${ex.localizedMessage}"
)
producer.sendReport(record.headers(), report)
} catch (ex: SftpClientException) {
log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
val report = Report(
id = record.key(),
status = ReportStatus.failure,
message = "SFTP Exception: ${ex.localizedMessage}"
)
producer.sendReport(record.headers(), report)
} catch (ex: Exception) {
log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
val report = Report(
id = record.key(),
status = ReportStatus.failure,
message = "Unknown Exception: ${ex.localizedMessage}"
)
producer.sendReport(record.headers(), report)
} }
} }
......
package org.memobase.exceptions
class MissingMimeTypeException(message: String) : Exception(message)
...@@ -127,7 +127,8 @@ class TestRdfHandler { ...@@ -127,7 +127,8 @@ class TestRdfHandler {
@Test @Test
fun `test mime type extraction for binaries`() { fun `test mime type extraction for binaries`() {
val rdfHandler = RdfHandler(readFile("input.nt"), "https://memobase.ch/") val rdfHandler = RdfHandler(readFile("input.nt"), "https://memobase.ch/")
assertThat(rdfHandler.getMimeType().first().second).isEqualTo("video/mpeg") val mimeType = rdfHandler.getMimeType("https://memobase.ch/instantiation/digital/AFZ-IB_Becker_Audiovisuals_63-1")
assertThat(mimeType).isEqualTo("video/mpeg")
} }
private val uris2 = listOf( private val uris2 = listOf(
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment