Commit 5432c599 authored by Thomas Bernhart's avatar Thomas Bernhart
Browse files

MEMO-636: Use transaction during ingest

parent a0c44f77
Pipeline #13804 passed with stages
in 5 minutes and 47 seconds
......@@ -46,7 +46,7 @@ dependencies {
implementation 'org.apache.jena:apache-jena:3.14.0', excludeSlf4J
implementation 'org.memobase:memobase-service-utilities:1.10.0', excludeSlf4J
implementation 'org.memobase:fedora-client:0.5.0', excludeSlf4J
implementation 'org.memobase:fedora-client:0.6.0', excludeSlf4J
// KOTLIN IMPORTS
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
......
......@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraTransactionClient
import org.memobase.fedora.RdfContentTypes
import org.memobase.sftp.SftpClient
......@@ -57,21 +58,25 @@ class Ingester(
recordPair.second.write(recordOutput, "NTRIPLES")
val data = recordOutput.toString()
log.info("Ingesting record ${recordPair.first}.")
// create placeholders referenced resources:
val nonBinaryResources = rdfHandler.getReferencedNonBinaryResources()
nonBinaryResources.forEach { resource ->
log.info("Creating placeholder for resource $resource.")
fedoraClient.createPlaceholder(URI(resource))
log.info("Created placeholder for resource $resource.")
}
fedoraClient.startTransaction().use { transaction ->
log.info("Ingesting record ${recordPair.first}.")
// create placeholders referenced resources:
val nonBinaryResources = rdfHandler.getReferencedNonBinaryResources()
nonBinaryResources.forEach { resource ->
log.info("Creating placeholder for resource $resource.")
transaction.createPlaceholder(URI(resource))
log.info("Created placeholder for resource $resource.")
}
// ingest record, instantiations and binaries:
fedoraClient.createOrUpdateRdfResource(URI(recordPair.first), data, RdfContentTypes.NTRIPLES)
ingestInstantiations(rdfHandler.getInstantiations())
ingestBinaries(rdfHandler.getSftpLocators(), rdfHandler)
log.info("Ingested record ${recordPair.first}.")
// ingest record, instantiations and binaries:
transaction.createOrUpdateRdfResource(URI(recordPair.first), data, RdfContentTypes.NTRIPLES)
ingestInstantiations(rdfHandler.getInstantiations(), transaction)
ingestBinaries(rdfHandler.getSftpLocators(), rdfHandler, transaction)
transaction.commit()
log.info("Ingested record ${recordPair.first}.")
}
}
private fun processRecord(record: ConsumerRecord<String, String>): Report {
......@@ -100,14 +105,14 @@ class Ingester(
}
}
private fun ingestInstantiations(instantiations: List<Pair<String, Model>>) {
private fun ingestInstantiations(instantiations: List<Pair<String, Model>>, transaction: FedoraTransactionClient) {
instantiations.forEach { instantiationPair ->
val instantiationOutput = StringWriter()
instantiationPair.second.write(instantiationOutput, "NTRIPLES")
try {
val instantiationData = instantiationOutput.toString()
log.info("Ingesting instantiation ${instantiationPair.first}.")
fedoraClient.createOrUpdateRdfResource(URI(instantiationPair.first), instantiationData, RdfContentTypes.NTRIPLES)
transaction.createOrUpdateRdfResource(URI(instantiationPair.first), instantiationData, RdfContentTypes.NTRIPLES)
log.info("Ingested instantiation ${instantiationPair.first}.")
} catch (ex: FcrepoOperationFailedException) {
log.error("Ingestion of instantiation ${instantiationPair.first} failed: ${ex.localizedMessage}.")
......@@ -116,7 +121,7 @@ class Ingester(
}
}
private fun ingestBinaries(sftpLocators: List<Pair<String, String?>>, rdfHandler: RdfHandler) {
private fun ingestBinaries(sftpLocators: List<Pair<String, String?>>, rdfHandler: RdfHandler, transaction: FedoraTransactionClient) {
sftpLocators.forEach {
val digitalInstantiationUrl = it.first
it.second.let { path ->
......@@ -128,7 +133,7 @@ class Ingester(
mT.first == digitalInstantiationUrl
}.second
log.info("Ingesting binary $binaryUri with mime type $mimeType.")
fedoraClient.createOrUpdateBinaryResource(URI(binaryUri), stream, mimeType)
transaction.createOrUpdateBinaryResource(URI(binaryUri), stream, mimeType)
} catch (ex: FcrepoOperationFailedException) {
log.error("Ingestion of binary $binaryUri failed: ${ex.localizedMessage}.")
throw ex
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment