Commit b6de864d authored by Thomas Bernhart's avatar Thomas Bernhart
Browse files

Merge branch 'MEMO-733-improve-reporting' into 'master'

MEMO-733: improve reporting

See merge request !6
parents 8e247802 3d054e1b
Pipeline #15075 passed with stages
in 5 minutes
......@@ -106,25 +106,20 @@ class ServiceTest {
val expectedRecordCount = params.expectedIngestReports.size
val service = Service("test${params.count}.yml")
var inputMessages = service.consumer.fetchRecords()
while (inputMessages.isEmpty) {
inputMessages = service.consumer.fetchRecords()
}
service.ingester.processRecords(inputMessages)
val totalConsumerRecords = mutableListOf<ConsumerRecord<String, String>>()
var result = reportConsumer.poll(Duration.ofMillis(1000))
while (totalConsumerRecords.size != expectedRecordCount) {
service.processRecords()
var result = reportConsumer.poll(Duration.ofMillis(10))
if (result.count() > 0) {
totalConsumerRecords.addAll(result.asIterable())
}
log.error(result.count())
result = reportConsumer.poll(Duration.ofMillis(10))
}
assertThat(totalConsumerRecords)
.size().isEqualTo(expectedRecordCount)
assertThat(totalConsumerRecords[0].value()).isEqualTo(params.expectedIngestReports[0].toJson())
}
private fun kafkaTests() = Stream.of(
......@@ -135,9 +130,9 @@ class ServiceTest {
),
listOf(
Report(
id = "AFZ-IB_Becker_Audiovisuals_63",
id = "https://memobase.ch/record/AFZ-IB_Becker_Audiovisuals_63",
status = "SUCCESS",
message = "Ingested resource AFZ-IB_Becker_Audiovisuals_63."
message = "Ingested resource https://memobase.ch/record/AFZ-IB_Becker_Audiovisuals_63."
)
)
)
......
......@@ -28,8 +28,7 @@ class App {
val service = Service()
service.run()
} catch (ex: Exception) {
ex.printStackTrace()
log.error("Stopping application due to error: " + ex.message)
log.error("Stopping application due to error: ${ex.localizedMessage}", ex)
exitProcess(1)
}
}
......
......@@ -17,38 +17,28 @@
*/
package org.memobase
import java.io.Closeable
import java.io.File
import java.io.StringWriter
import java.net.URI
import org.apache.jena.rdf.model.Model
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraTransactionClient
import org.memobase.fedora.RdfContentTypes
import org.memobase.settings.SftpSettings
import org.memobase.sftp.SftpClient
class Ingester(
private val producer: Producer,
private val sftpClient: SftpClient,
private val sftpSettings: SftpSettings,
private val fedoraClient: FedoraClient,
private val externalBaseUrl: String
) : Closeable {
) {
private val log = LogManager.getLogger("FedoraIngester")
fun processRecords(recordsToIngest: ConsumerRecords<String, String>) {
for (record in recordsToIngest) {
val ingestReport = processRecord(record)
producer.sendReport(ingestReport)
}
}
@Throws(FcrepoOperationFailedException::class)
private fun ingest(id: String, content: String) {
fun ingest(id: String, content: String) {
log.info("Ingest record $id.")
log.info("Content: $content")
val rdfHandler = RdfHandler(content, externalBaseUrl)
......@@ -73,79 +63,44 @@ class Ingester(
// ingest record, instantiations and binaries:
transaction.createOrUpdateRdfResource(URI(recordPair.first), data, RdfContentTypes.NTRIPLES)
ingestInstantiations(rdfHandler.getInstantiations(), transaction)
ingestBinaries(rdfHandler.getSftpLocators(), rdfHandler, transaction)
val sftpLocators = rdfHandler.getSftpLocators()
if (sftpLocators.size > 0) {
ingestBinaries(sftpLocators, rdfHandler, transaction)
}
transaction.commit()
log.info("Ingested record ${recordPair.first}.")
}
}
private fun processRecord(record: ConsumerRecord<String, String>): Report {
if (record.value().contains("ERROR")) {
return Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
return try {
ingest(record.key(), record.value())
Report(
id = record.key(),
status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key())
)
} catch (e: FcrepoOperationFailedException) {
log.error("Ingestion of record ${record.key()} failed: ${e.localizedMessage}.")
Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
}
private fun ingestInstantiations(instantiations: List<Pair<String, Model>>, transaction: FedoraTransactionClient) {
instantiations.forEach { instantiationPair ->
val instantiationOutput = StringWriter()
instantiationPair.second.write(instantiationOutput, "NTRIPLES")
try {
val instantiationData = instantiationOutput.toString()
log.info("Ingesting instantiation ${instantiationPair.first}.")
transaction.createOrUpdateRdfResource(URI(instantiationPair.first), instantiationData, RdfContentTypes.NTRIPLES)
log.info("Ingested instantiation ${instantiationPair.first}.")
} catch (ex: FcrepoOperationFailedException) {
log.error("Ingestion of instantiation ${instantiationPair.first} failed: ${ex.localizedMessage}.")
throw ex
}
val instantiationData = instantiationOutput.toString()
log.info("Ingesting instantiation ${instantiationPair.first}.")
transaction.createOrUpdateRdfResource(URI(instantiationPair.first), instantiationData, RdfContentTypes.NTRIPLES)
log.info("Ingested instantiation ${instantiationPair.first}.")
}
}
private fun ingestBinaries(sftpLocators: List<Pair<String, String?>>, rdfHandler: RdfHandler, transaction: FedoraTransactionClient) {
sftpLocators.forEach {
val digitalInstantiationUrl = it.first
it.second.let { path ->
if (path != null) {
sftpClient.open(File(path)).use { stream ->
val binaryUri = "${it.first}/binary"
try {
SftpClient(sftpSettings).use { sftpClient ->
log.info("Connected to sFTP server.")
sftpLocators.forEach {
val digitalInstantiationUrl = it.first
it.second.let { path ->
if (path != null) {
sftpClient.open(File(path)).use { stream ->
val binaryUri = "${it.first}/binary"
val mimeType = rdfHandler.getMimeType().first { mT ->
mT.first == digitalInstantiationUrl
}.second
log.info("Ingesting binary $binaryUri with mime type $mimeType.")
transaction.createOrUpdateBinaryResource(URI(binaryUri), stream, mimeType)
} catch (ex: FcrepoOperationFailedException) {
log.error("Ingestion of binary $binaryUri failed: ${ex.localizedMessage}.")
throw ex
}
}
}
}
}
}
override fun close() {
sftpClient.close()
producer.close()
}
}
......@@ -18,16 +18,17 @@
package org.memobase
// import org.bouncycastle.cms.RecipientId.password
import java.io.Closeable
import java.lang.Exception
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraClientImpl
import org.memobase.settings.SettingsLoader
import org.memobase.sftp.SftpClient
class Service(fileName: String = "app.yml") {
class Service(fileName: String = "app.yml") : Closeable {
companion object {
const val FEDORA_PROPERTIES_PREFIX = "fedora"
......@@ -53,23 +54,62 @@ class Service(fileName: String = "app.yml") {
)
private val log: Logger = LogManager.getLogger("FedoraIngestService")
private var consumer: Consumer
private var producer: Producer
val consumer = Consumer(settings.kafkaConsumerSettings, settings.inputTopic)
private val producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
private val sftp = SftpClient(settings.sftpSettings)
private val fedora = createFedoraClient(settings.appSettings)
val ingester = Ingester(producer, sftp, fedora,
settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl"))
init {
consumer = Consumer(settings.kafkaConsumerSettings, settings.inputTopic)
producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
log.info("Connected to Kafka.")
}
fun run() {
consumer.use { consumer ->
ingester.use {
log.info("Connected to Kafka.")
while (true) {
val recordsToIngest = consumer.fetchRecords()
it.processRecords(recordsToIngest)
}
}
while (true) {
processRecords()
}
}
fun processRecords() {
for (record in consumer.fetchRecords()) {
val ingestReport = processRecord(record)
producer.sendReport(ingestReport)
}
}
private fun processRecord(record: ConsumerRecord<String, String>): Report {
if (record.value().contains("ERROR")) {
return Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
val ingester = Ingester(
settings.sftpSettings,
createFedoraClient(settings.appSettings),
settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl")
)
return try {
ingester.ingest(record.key(), record.value())
Report(
id = record.key(),
status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key())
)
} catch (ex: Exception) {
log.error("Ingestion of record ${record.key()} failed: ${ex.localizedMessage}", ex)
Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
}
override fun close() {
consumer.close()
producer.close()
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment