Commit 92f41e83 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

first working version (hopefully)

parent 3540beca
...@@ -42,12 +42,12 @@ dependencies { ...@@ -42,12 +42,12 @@ dependencies {
implementation "org.apache.kafka:kafka-clients:${kafkaV}" implementation "org.apache.kafka:kafka-clients:${kafkaV}"
implementation 'org.fcrepo.client:fcrepo-java-client:0.4.0' //implementation 'org.fcrepo.client:fcrepo-java-client:0.4.0'
implementation 'org.apache.jena:apache-jena:3.14.0' implementation 'org.apache.jena:apache-jena:3.14.0'
implementation 'org.memobase:memobase-service-utilities:1.5.0' implementation 'org.memobase:memobase-service-utilities:1.5.0'
implementation 'org.memobase:fedora-client:0.3.0' implementation 'org.memobase:fedora-client:0.4.0'
// KOTLIN IMPORTS // KOTLIN IMPORTS
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8' implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
......
...@@ -22,12 +22,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecords ...@@ -22,12 +22,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient import org.memobase.fedora.FedoraClient
import org.memobase.fedora.RdfContentTypes
import org.memobase.sftp.SftpClient import org.memobase.sftp.SftpClient
import java.io.Closeable import java.io.Closeable
import java.io.File import java.io.File
import java.io.StringWriter import java.io.StringWriter
import java.net.URI
class Ingester(private val producer: Producer, private val sftpClient: SftpClient, private val fedoraClient: FedoraClient): Closeable { class Ingester(private val producer: Producer, private val sftpClient: SftpClient, private val fedoraClient: FedoraClient) : Closeable {
private val log = LogManager.getLogger("FedoraIngester") private val log = LogManager.getLogger("FedoraIngester")
...@@ -47,24 +49,47 @@ class Ingester(private val producer: Producer, private val sftpClient: SftpClien ...@@ -47,24 +49,47 @@ class Ingester(private val producer: Producer, private val sftpClient: SftpClien
val recordOutput = StringWriter() val recordOutput = StringWriter()
val recordPair = rdfHandler.getRecord() val recordPair = rdfHandler.getRecord()
recordPair.second.write(recordOutput) recordPair.second.write(recordOutput)
try {
log.info("Ingested record ${recordPair.first}.") fedoraClient.createOrUpdateRdfResource(URI(recordPair.first), recordOutput.toString(), RdfContentTypes.NTRIPLES)
log.info("Ingested record ${recordPair.first}.")
producer.sendReport(
Report(recordPair.first, ReportStatus.success, "Ingested record in fedora."))
} catch (ex: FcrepoOperationFailedException) {
log.error("Ingestion of record ${recordPair.first} failed: ${ex.localizedMessage}.")
producer.sendReport(
Report(recordPair.first, ReportStatus.failure, ex.localizedMessage))
}
rdfHandler.getInstantiations().forEach { instantiationPair -> rdfHandler.getInstantiations().forEach { instantiationPair ->
val instantiationOutput = StringWriter() val instantiationOutput = StringWriter()
instantiationPair.second.write(instantiationOutput) instantiationPair.second.write(instantiationOutput)
// TODO try {
log.info("Ingested instantiation ${instantiationPair.first}.") fedoraClient.createOrUpdateRdfResource(URI(instantiationPair.first), instantiationOutput.toString(), RdfContentTypes.NTRIPLES)
log.info("Ingested instantiation ${instantiationPair.first}.")
producer.sendReport(
Report(instantiationPair.first, ReportStatus.success, "Ingested an instantiation in fedora."))
} catch (ex: FcrepoOperationFailedException) {
log.error("Ingestion of instantiation ${instantiationPair.first} failed: ${ex.localizedMessage}.")
producer.sendReport(
Report(instantiationPair.first, ReportStatus.failure, ex.localizedMessage))
}
} }
rdfHandler.getSftpLocators().forEach { rdfHandler.getSftpLocators().forEach {
it.second.let { path -> it.second.let { path ->
if (path != null) { if (path != null) {
sftpClient.open(File(path)).use { stream -> sftpClient.open(File(path)).use { stream ->
val bytes = stream.readBytes() val binaryUri = "${it.first}/binary"
// TODO try {
fedoraClient.createOrUpdateBinaryResource(URI(binaryUri), stream, "image/jpeg")
log.info("Ingested binary file from $path to uri ${it.first}.") log.info("Ingested binary $binaryUri.")
producer.sendReport(
Report(binaryUri, ReportStatus.success, "Ingested a binary in fedora."))
} catch (ex: FcrepoOperationFailedException) {
log.error("Ingestion of binary $binaryUri failed: ${ex.localizedMessage}.")
producer.sendReport(
Report(binaryUri, ReportStatus.failure, ex.localizedMessage))
}
} }
} }
} }
...@@ -75,15 +100,15 @@ class Ingester(private val producer: Producer, private val sftpClient: SftpClien ...@@ -75,15 +100,15 @@ class Ingester(private val producer: Producer, private val sftpClient: SftpClien
return try { return try {
ingest(record.key(), record.value()) ingest(record.key(), record.value())
Report( Report(
id = record.key(), id = record.key(),
status = ReportStatus.success, status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key()) message = ReportMessages.ingestedRecord(record.key())
) )
} catch (e: FcrepoOperationFailedException) { } catch (e: FcrepoOperationFailedException) {
Report( Report(
id = record.key(), id = record.key(),
status = ReportStatus.failure, status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key()) message = ReportMessages.ingestFailed(record.key())
) )
} }
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment