Commit 9148b6ad authored by Thomas Bernhart's avatar Thomas Bernhart
Browse files

MEMO-650: Create placeholders for all non-binary resources

As we get one Kafka message per record to ingest, we can create
the needed placeholders explicitly during ingest.
parent 7c61d02e
......@@ -21,6 +21,7 @@ import java.io.Closeable
import java.io.File
import java.io.StringWriter
import java.net.URI
import org.apache.jena.rdf.model.Model
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.logging.log4j.LogManager
......@@ -32,7 +33,8 @@ import org.memobase.sftp.SftpClient
class Ingester(
private val producer: Producer,
private val sftpClient: SftpClient,
private val fedoraClient: FedoraClient
private val fedoraClient: FedoraClient,
private val externalBaseUrl: String
) : Closeable {
private val log = LogManager.getLogger("FedoraIngester")
......@@ -48,23 +50,58 @@ class Ingester(
private fun ingest(id: String, content: String) {
log.info("Ingest record $id.")
log.info("Content: $content")
val rdfHandler = RdfHandler(content)
val rdfHandler = RdfHandler(content, externalBaseUrl)
val recordOutput = StringWriter()
val recordPair = rdfHandler.getRecord()
recordPair.second.write(recordOutput, "NTRIPLES")
// FIXME: correct for ingest:
// 1 binaries
// 2 instantiations
// 3 records
val data = recordOutput.toString()
log.info("Ingesting record ${recordPair.first}.")
// create placeholders referenced resources:
val nonBinaryResources = rdfHandler.getReferencedNonBinaryResources()
nonBinaryResources.forEach { resource ->
log.info("Creating placeholder for resource $resource.")
fedoraClient.createPlaceholder(URI(resource))
log.info("Created placeholder for resource $resource.")
}
// ingest record, instantiations and binaries:
fedoraClient.createOrUpdateRdfResource(URI(recordPair.first), data, RdfContentTypes.NTRIPLES)
ingestInstantiations(rdfHandler.getInstantiations())
ingestBinaries(rdfHandler.getSftpLocators(), rdfHandler)
log.info("Ingested record ${recordPair.first}.")
}
// FIXME: extract into separate method
rdfHandler.getInstantiations().forEach { instantiationPair ->
private fun processRecord(record: ConsumerRecord<String, String>): Report {
if (record.value().contains("ERROR")) {
return Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
return try {
ingest(record.key(), record.value())
Report(
id = record.key(),
status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key())
)
} catch (e: FcrepoOperationFailedException) {
log.error("Ingestion of record ${record.key()} failed: ${e.localizedMessage}.")
Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
}
private fun ingestInstantiations(instantiations: List<Pair<String, Model>>) {
instantiations.forEach { instantiationPair ->
val instantiationOutput = StringWriter()
instantiationPair.second.write(instantiationOutput, "NTRIPLES")
try {
......@@ -77,9 +114,10 @@ class Ingester(
throw ex
}
}
}
// FIXME: extract into separate method
rdfHandler.getSftpLocators().forEach {
private fun ingestBinaries(sftpLocators: List<Pair<String, String?>>, rdfHandler: RdfHandler) {
sftpLocators.forEach {
val digitalInstantiationUrl = it.first
it.second.let { path ->
if (path != null) {
......@@ -101,36 +139,8 @@ class Ingester(
}
}
private fun processRecord(record: ConsumerRecord<String, String>): Report {
if (record.value().contains("ERROR")) {
return Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
return try {
ingest(record.key(), record.value())
Report(
id = record.key(),
status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key())
)
} catch (e: FcrepoOperationFailedException) {
log.error("Ingestion of record ${record.key()} failed: ${e.localizedMessage}.")
Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
}
override fun close() {
sftpClient.close()
producer.close()
// TODO: the fedora client should probably be closeable as well?
// fedora.close()
}
}
......@@ -11,9 +11,10 @@ import org.memobase.rdf.EBUCORE
import org.memobase.rdf.RDF
import org.memobase.rdf.RICO
class RdfHandler(data: String) {
class RdfHandler(data: String, externalBaseUrl: String) {
private val log = LogManager.getLogger("RdfHandler")
private val model = ModelFactory.createDefaultModel()
private val externalBaseUrl = externalBaseUrl
init {
model.read(ByteArrayInputStream(data.toByteArray()), "", "NTRIPLES")
......@@ -43,6 +44,20 @@ class RdfHandler(data: String) {
return Pair(uri, resultModel)
}
private fun isInternalNonBinaryResource(uri: String): Boolean {
return uri.startsWith(externalBaseUrl) && !uri.endsWith("/binary")
}
fun getReferencedNonBinaryResources(): List<String> {
return model.listStatements().filterKeep { statement ->
statement.getObject().isURIResource
}.mapWith { statement ->
statement.getObject().asResource().uri
}.filterKeep { uri ->
isInternalNonBinaryResource(uri)
}.toList().distinct()
}
fun getInstantiations(): List<Pair<String, Model>> {
return model.listSubjectsWithProperty(RDF.type, RICO.Instantiation).mapWith { resource ->
val model = ModelFactory.createDefaultModel()
......
......@@ -58,7 +58,8 @@ class Service(fileName: String = "app.yml") {
private val producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
private val sftp = SftpClient(settings.sftpSettings)
private val fedora = createFedoraClient(settings.appSettings)
val ingester = Ingester(producer, sftp, fedora)
val ingester = Ingester(producer, sftp, fedora,
settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl"))
fun run() {
consumer.use { consumer ->
......
......@@ -31,7 +31,7 @@ class TestRdfHandler {
@Test
fun `test get record`() {
val rdfHandler = RdfHandler(readFile("input.nt"))
val rdfHandler = RdfHandler(readFile("input.nt"), "https://memobase.ch/")
val pair = rdfHandler.getRecord()
......@@ -47,9 +47,29 @@ class TestRdfHandler {
.isEqualTo("https://memobase.ch/record/AFZ-IB_Becker_Audiovisuals_63")
}
@Test
fun `test get referenced non binary resources`() {
val rdfHandler = RdfHandler(readFile("input.nt"), "https://memobase.ch/")
val resources = rdfHandler.getReferencedNonBinaryResources()
resources.forEachIndexed { index, uri ->
assertThat(uri)
.isEqualTo(nonBinaryResourcesResult[index])
}
}
private val nonBinaryResourcesResult: List<String> = listOf(
"https://memobase.ch/instantiation/physical/AFZ-IB_Becker_Audiovisuals_63-0",
"https://memobase.ch/institution/AFZ",
"https://memobase.ch/recordSet/BECKER",
"https://memobase.ch/institution/Memoriav",
"https://memobase.ch/instantiation/digital/AFZ-IB_Becker_Audiovisuals_63-1",
"https://memobase.ch/record/AFZ-IB_Becker_Audiovisuals_63"
)
@Test
fun `test get instantiations`() {
val rdfHandler = RdfHandler(readFile("input.nt"))
val rdfHandler = RdfHandler(readFile("input.nt"), "https://memobase.ch/")
val list = rdfHandler.getInstantiations()
list.forEachIndexed { index, pair ->
......@@ -72,7 +92,7 @@ class TestRdfHandler {
@Test
fun `test get sftp locators`() {
val rdfHandler = RdfHandler(readFile("inputSftp.nt"))
val rdfHandler = RdfHandler(readFile("inputSftp.nt"), "https://memobase.ch/")
val pairs = rdfHandler.getSftpLocators()
pairs.forEachIndexed { index, pair ->
......@@ -88,7 +108,7 @@ class TestRdfHandler {
@Test
fun `test replace sftp locators`() {
val rdfHandler = RdfHandler(readFile("inputSftp.nt"))
val rdfHandler = RdfHandler(readFile("inputSftp.nt"), "https://memobase.ch/")
val list = rdfHandler.getInstantiations()
list.forEachIndexed { index, pair ->
......@@ -106,7 +126,7 @@ class TestRdfHandler {
@Test
fun `test mime type extraction for binaries`() {
val rdfHandler = RdfHandler(readFile("input.nt"))
val rdfHandler = RdfHandler(readFile("input.nt"), "https://memobase.ch/")
assertThat(rdfHandler.getMimeType().first().second).isEqualTo("video/mpeg")
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment