Commit d1851b88 authored by Jonas Waeber

Expanded the RDF handler to return the URI as well.

parent 7809a718
Pipeline #11651 failed with stages in 1 minute and 37 seconds
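As a rough usage sketch of the expanded API: getRecord() and getInstantiations() now return the resource URI alongside the Jena Model, so a caller can log or address each resource directly. The helper below is hypothetical and only illustrates the new return shapes shown in the diff; it is not part of this commit.

import org.apache.jena.rdf.model.Model
import java.io.StringWriter

// Hypothetical caller illustrating the Pair<String, Model> return values.
fun logExtractedResources(handler: RdfHandler) {
    // The record is now a Pair of its URI and the extracted statements.
    val (recordUri, recordModel) = handler.getRecord()
    val recordOutput = StringWriter()
    recordModel.write(recordOutput)
    println("record $recordUri serialised to ${recordOutput.buffer.length} characters")

    // Each instantiation carries its own URI next to its Model.
    handler.getInstantiations().forEach { (uri, model) ->
        println("instantiation $uri with ${model.size()} statements")
    }
}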
@@ -17,41 +17,58 @@
*/
package org.memobase
// import java.io.File
// import java.io.InputStream
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import java.net.URI
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient
import org.memobase.sftp.SftpClient
import java.io.Closeable
import java.io.File
import java.io.StringWriter
class Ingester(private val producer: Producer, private val sftpClient: SftpClient, private val fedoraClient: FedoraClient) {
class Ingester(private val producer: Producer, private val sftpClient: SftpClient, private val fedoraClient: FedoraClient): Closeable {
private val log = LogManager.getLogger("IngestService")
private val log = LogManager.getLogger("FedoraIngester")
fun processRecords(recordsToIngest: ConsumerRecords<String, String>): MutableList<Report> {
val reports = mutableListOf<Report>()
fun processRecords(recordsToIngest: ConsumerRecords<String, String>) {
for (record in recordsToIngest) {
val ingestReport = processRecord(record)
producer.sendReport(ingestReport)
reports.add(ingestReport)
}
return reports
}
@Throws(FcrepoOperationFailedException::class)
private fun ingest(id: String, content: String) {
log.info("Ingest record $id.")
log.info("Content: $content")
// fedoraClient.createOrUpdateRdfResource()
// if (rdf.hasFileReference()) {
// fedoraClient.createOrUpdateBinaryResource()
// }
throw FcrepoOperationFailedException(
URI("http://mb-fed1.memobase.unibas.ch:8080/fcrepo/rest"), 501, "Not Implemented."
)
val rdfHandler = RdfHandler(content)
val recordOutput = StringWriter()
val recordPair = rdfHandler.getRecord()
recordPair.second.write(recordOutput)
log.info("Ingested record ${recordPair.first}.")
rdfHandler.getInstantiations().forEach { instantiationPair ->
val instantiationOutput = StringWriter()
instantiationPair.second.write(instantiationOutput)
// TODO
log.info("Ingested instantiation ${instantiationPair.first}.")
}
rdfHandler.getSftpLocators().forEach {
it.second.let { path ->
if (path != null) {
sftpClient.open(File(path)).use { stream ->
val bytes = stream.readAllBytes()
// TODO
log.info("Ingested binary file from $path to uri ${it.first}.")
}
}
}
}
}
private fun processRecord(record: ConsumerRecord<String, String>): Report {
@@ -71,11 +88,10 @@ class Ingester(private val producer: Producer, private val sftpClient: SftpClien
}
}
// private fun parseRdf(rdf: String) {
// "<https://www.ica.org/standards/RiC/ontology#identifier>"
// }
// private fun openSftpFile(file: File): InputStream {
// sftpClient.open(file)
// }
override fun close() {
sftpClient.close()
producer.close()
// TODO: the fedora client should probably be closeable as well?
// fedora.close()
}
}
@@ -15,8 +15,9 @@ class RdfHandler(data: String) {
model.read(ByteArrayInputStream(data.toByteArray()), "", "NTRIPLES")
}
fun getRecord(): Model {
fun getRecord(): Pair<String, Model> {
val resultModel = ModelFactory.createDefaultModel()
var uri = ""
model.listSubjectsWithProperty(RDF.type, RICO.Record).forEach { resource ->
resource.listProperties().forEach { statement ->
if (statement.`object`.isAnon) {
@@ -33,11 +34,12 @@ class RdfHandler(data: String) {
// Adding record statements
resultModel.add(statement)
}
uri = resource.uri
}
return resultModel
return Pair(uri, resultModel)
}
fun getInstantiations(): List<Model> {
fun getInstantiations(): List<Pair<String, Model>> {
return model.listSubjectsWithProperty(RDF.type, RICO.Instantiation).mapWith { resource ->
val model = ModelFactory.createDefaultModel()
resource.listProperties().forEach { statement ->
@@ -48,7 +50,7 @@ class RdfHandler(data: String) {
}
model.add(statement)
}
model
Pair(resource.uri, model)
}.toList()
}
......
@@ -37,40 +37,41 @@ class Service(fileName: String = "app.yml") {
fun createFedoraClient(appSettings: Properties): FedoraClient {
return FedoraClientImpl.builder()
// FIXME: update to new version supporting method properties()
//.properties(appSettings, FEDORA_PROPERTIES_PREFIX)
.build()
// FIXME: update to new version supporting method properties()
//.properties(appSettings, FEDORA_PROPERTIES_PREFIX)
.build()
}
}
val settings = SettingsLoader(
listOf(
"$FEDORA_PROPERTIES_PREFIX.internalBaseUrl",
"$FEDORA_PROPERTIES_PREFIX.externalBaseUrl",
"$FEDORA_PROPERTIES_PREFIX.username",
"$FEDORA_PROPERTIES_PREFIX.password"
),
fileName,
useProducerConfig = true,
useConsumerConfig = true,
readSftpSettings = true
listOf(
"$FEDORA_PROPERTIES_PREFIX.internalBaseUrl",
"$FEDORA_PROPERTIES_PREFIX.externalBaseUrl",
"$FEDORA_PROPERTIES_PREFIX.username",
"$FEDORA_PROPERTIES_PREFIX.password"
),
fileName,
useProducerConfig = true,
useConsumerConfig = true,
readSftpSettings = true
)
private val log: Logger = LogManager.getLogger("FedoraIngestService")
private val consumer = Consumer(settings.kafkaConsumerSettings, settings.inputTopic)
private val ingester = Ingester(
Producer(settings.kafkaProducerSettings, settings.outputTopic),
SftpClient(settings.sftpSettings),
createFedoraClient(settings.appSettings)
)
private val producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
private val sftp = SftpClient(settings.sftpSettings)
private val fedora = createFedoraClient(settings.appSettings)
private val ingester = Ingester(producer, sftp, fedora)
fun run() {
consumer.use { consumer ->
log.info("Connected to Kafka.")
while (true) {
val recordsToIngest = consumer.fetchRecords()
ingester.processRecords(recordsToIngest)
ingester.use {
log.info("Connected to Kafka.")
while (true) {
val recordsToIngest = consumer.fetchRecords()
it.processRecords(recordsToIngest)
}
}
}
}
......
@@ -42,32 +42,43 @@ class TestRdfHandler {
fun `test get record`() {
val rdfHandler = RdfHandler(readFile("input.nt"))
val model = rdfHandler.getRecord()
val pair = rdfHandler.getRecord()
val out = StringWriter()
RDFDataMgr.write(out, model, Lang.NTRIPLES)
RDFDataMgr.write(out, pair.second, Lang.NTRIPLES)
val sortedOut = sort(out.toString().split("\n")).trim()
assertThat(sortedOut)
.isEqualTo(sort(readFile("recordOutput.nt").split("\n")).trim())
assertThat(pair.first)
.isEqualTo("https://memobase.ch/record/AFZ-IB_Becker_Audiovisuals_63")
}
@Test
fun `test get instantiations`() {
val rdfHandler = RdfHandler(readFile("input.nt"))
val models = rdfHandler.getInstantiations()
val list = rdfHandler.getInstantiations()
models.forEachIndexed { index, model ->
list.forEachIndexed { index, pair ->
val out = StringWriter()
RDFDataMgr.write(out, model, Lang.NTRIPLES)
RDFDataMgr.write(out, pair.second, Lang.NTRIPLES)
val sortedOut = sort(out.toString().split("\n")).trim()
assertThat(sortedOut)
.isEqualTo(sort(readFile("instantiationOutput$index.nt").split("\n")).trim())
assertThat(pair.first)
.isEqualTo(uris[index])
}
}
private val uris = listOf(
"https://memobase.ch/instantiation/digital/AFZ-IB_Becker_Audiovisuals_63-1",
"https://memobase.ch/instantiation/physical/AFZ-IB_Becker_Audiovisuals_63-0"
)
@Test
fun `test get sftp locators`() {
val rdfHandler = RdfHandler(readFile("inputSftp.nt"))
......