Commit 5ada2f49 authored by Thomas Bernhart's avatar Thomas Bernhart
Browse files

MEMO-733: Refactor Service and Ingester to improve reporting

parent 8e247802
......@@ -106,25 +106,20 @@ class ServiceTest {
val expectedRecordCount = params.expectedIngestReports.size
val service = Service("test${params.count}.yml")
var inputMessages = service.consumer.fetchRecords()
while (inputMessages.isEmpty) {
inputMessages = service.consumer.fetchRecords()
}
service.ingester.processRecords(inputMessages)
val totalConsumerRecords = mutableListOf<ConsumerRecord<String, String>>()
var result = reportConsumer.poll(Duration.ofMillis(1000))
while (totalConsumerRecords.size != expectedRecordCount) {
service.processRecords()
var result = reportConsumer.poll(Duration.ofMillis(10))
if (result.count() > 0) {
totalConsumerRecords.addAll(result.asIterable())
}
log.error(result.count())
result = reportConsumer.poll(Duration.ofMillis(10))
}
assertThat(totalConsumerRecords)
.size().isEqualTo(expectedRecordCount)
assertThat(totalConsumerRecords[0].value()).isEqualTo(params.expectedIngestReports[0].toJson())
}
private fun kafkaTests() = Stream.of(
......@@ -135,9 +130,9 @@ class ServiceTest {
),
listOf(
Report(
id = "AFZ-IB_Becker_Audiovisuals_63",
id = "https://memobase.ch/record/AFZ-IB_Becker_Audiovisuals_63",
status = "SUCCESS",
message = "Ingested resource AFZ-IB_Becker_Audiovisuals_63."
message = "Ingested resource https://memobase.ch/record/AFZ-IB_Becker_Audiovisuals_63."
)
)
)
......
......@@ -17,38 +17,28 @@
*/
package org.memobase
import java.io.Closeable
import java.io.File
import java.io.StringWriter
import java.net.URI
import org.apache.jena.rdf.model.Model
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraTransactionClient
import org.memobase.fedora.RdfContentTypes
import org.memobase.settings.SftpSettings
import org.memobase.sftp.SftpClient
class Ingester(
private val producer: Producer,
private val sftpClient: SftpClient,
private val sftpSettings: SftpSettings,
private val fedoraClient: FedoraClient,
private val externalBaseUrl: String
) : Closeable {
) {
private val log = LogManager.getLogger("FedoraIngester")
fun processRecords(recordsToIngest: ConsumerRecords<String, String>) {
for (record in recordsToIngest) {
val ingestReport = processRecord(record)
producer.sendReport(ingestReport)
}
}
@Throws(FcrepoOperationFailedException::class)
private fun ingest(id: String, content: String) {
fun ingest(id: String, content: String) {
log.info("Ingest record $id.")
log.info("Content: $content")
val rdfHandler = RdfHandler(content, externalBaseUrl)
......@@ -73,38 +63,15 @@ class Ingester(
// ingest record, instantiations and binaries:
transaction.createOrUpdateRdfResource(URI(recordPair.first), data, RdfContentTypes.NTRIPLES)
ingestInstantiations(rdfHandler.getInstantiations(), transaction)
ingestBinaries(rdfHandler.getSftpLocators(), rdfHandler, transaction)
val sftpLocators = rdfHandler.getSftpLocators()
if (sftpLocators.size > 0) {
ingestBinaries(sftpLocators, rdfHandler, transaction)
}
transaction.commit()
log.info("Ingested record ${recordPair.first}.")
}
}
private fun processRecord(record: ConsumerRecord<String, String>): Report {
if (record.value().contains("ERROR")) {
return Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
return try {
ingest(record.key(), record.value())
Report(
id = record.key(),
status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key())
)
} catch (e: FcrepoOperationFailedException) {
log.error("Ingestion of record ${record.key()} failed: ${e.localizedMessage}.")
Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
}
private fun ingestInstantiations(instantiations: List<Pair<String, Model>>, transaction: FedoraTransactionClient) {
instantiations.forEach { instantiationPair ->
val instantiationOutput = StringWriter()
......@@ -122,30 +89,28 @@ class Ingester(
}
private fun ingestBinaries(sftpLocators: List<Pair<String, String?>>, rdfHandler: RdfHandler, transaction: FedoraTransactionClient) {
sftpLocators.forEach {
val digitalInstantiationUrl = it.first
it.second.let { path ->
if (path != null) {
sftpClient.open(File(path)).use { stream ->
val binaryUri = "${it.first}/binary"
try {
val mimeType = rdfHandler.getMimeType().first { mT ->
mT.first == digitalInstantiationUrl
}.second
log.info("Ingesting binary $binaryUri with mime type $mimeType.")
transaction.createOrUpdateBinaryResource(URI(binaryUri), stream, mimeType)
} catch (ex: FcrepoOperationFailedException) {
log.error("Ingestion of binary $binaryUri failed: ${ex.localizedMessage}.")
throw ex
SftpClient(sftpSettings).use { sftpClient ->
log.info("Connected to sFTP server.")
sftpLocators.forEach {
val digitalInstantiationUrl = it.first
it.second.let { path ->
if (path != null) {
sftpClient.open(File(path)).use { stream ->
val binaryUri = "${it.first}/binary"
try {
val mimeType = rdfHandler.getMimeType().first { mT ->
mT.first == digitalInstantiationUrl
}.second
log.info("Ingesting binary $binaryUri with mime type $mimeType.")
transaction.createOrUpdateBinaryResource(URI(binaryUri), stream, mimeType)
} catch (ex: FcrepoOperationFailedException) {
log.error("Ingestion of binary $binaryUri failed: ${ex.localizedMessage}.")
throw ex
}
}
}
}
}
}
}
override fun close() {
sftpClient.close()
producer.close()
}
}
......@@ -18,16 +18,17 @@
package org.memobase
// import org.bouncycastle.cms.RecipientId.password
import java.io.Closeable
import java.lang.Exception
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraClientImpl
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
class Service(fileName: String = "app.yml") {
class Service(fileName: String = "app.yml") : Closeable {
companion object {
const val FEDORA_PROPERTIES_PREFIX = "fedora"
......@@ -53,23 +54,63 @@ class Service(fileName: String = "app.yml") {
)
private val log: Logger = LogManager.getLogger("FedoraIngestService")
private var consumer: Consumer
private var producer: Producer
val consumer = Consumer(settings.kafkaConsumerSettings, settings.inputTopic)
private val producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
private val sftp = SftpClient(settings.sftpSettings)
private val fedora = createFedoraClient(settings.appSettings)
val ingester = Ingester(producer, sftp, fedora,
settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl"))
init {
consumer = Consumer(settings.kafkaConsumerSettings, settings.inputTopic)
producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
log.info("Connected to Kafka.")
}
fun run() {
consumer.use { consumer ->
ingester.use {
log.info("Connected to Kafka.")
while (true) {
val recordsToIngest = consumer.fetchRecords()
it.processRecords(recordsToIngest)
}
}
while (true) {
processRecords()
}
}
fun processRecords() {
for (record in consumer.fetchRecords()) {
val ingestReport = processRecord(record)
producer.sendReport(ingestReport)
}
}
private fun processRecord(record: ConsumerRecord<String, String>): Report {
if (record.value().contains("ERROR")) {
return Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
val ingester = Ingester(
settings.sftpSettings,
createFedoraClient(settings.appSettings),
settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl")
)
return try {
ingester.ingest(record.key(), record.value())
Report(
id = record.key(),
status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key())
)
} catch (ex: Exception) {
log.error("Ingestion of record ${record.key()} failed: ${ex.localizedMessage}.")
log.error(ex.stackTrace)
Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
}
override fun close() {
consumer.close()
producer.close()
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment