Commit 08569f9e authored by Thomas Bernhart's avatar Thomas Bernhart
Browse files

Improve reporting

parent f99da3db
Pipeline #16454 passed with stages
in 5 minutes and 44 seconds
...@@ -24,6 +24,9 @@ import java.util.Properties ...@@ -24,6 +24,9 @@ import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger import org.apache.logging.log4j.Logger
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.exceptions.MissingMimeTypeException
import org.memobase.exceptions.SftpClientException
import org.memobase.fedora.FedoraClient import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraClientImpl import org.memobase.fedora.FedoraClientImpl
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
...@@ -80,32 +83,57 @@ class Service(fileName: String = "app.yml") : Closeable { ...@@ -80,32 +83,57 @@ class Service(fileName: String = "app.yml") : Closeable {
fun processRecords() { fun processRecords() {
for (record in consumer.fetchRecords()) { for (record in consumer.fetchRecords()) {
val ingestReport = processRecord(record) processRecord(record)
producer.sendReport(record.headers(), ingestReport)
} }
} }
private fun processRecord(record: ConsumerRecord<String, String>): Report { private fun processRecord(record: ConsumerRecord<String, String>) {
val ingester = Ingester( val ingester = Ingester(
settings.sftpSettings, settings.sftpSettings,
createFedoraClient(settings.appSettings), createFedoraClient(settings.appSettings),
settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl") settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl")
) )
return try { try {
ingester.ingest(record.key(), record.value()) ingester.ingest(record.key(), record.value())
Report( val report = Report(
id = record.key(), id = record.key(),
status = ReportStatus.success, status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key()) message = ReportMessages.ingestedRecord(record.key())
) )
producer.sendReport(record.headers(), report)
} catch (ex: FcrepoOperationFailedException) {
log.error("${ex.javaClass.canonicalName}:: ${ex.localizedMessage}", ex)
val report = Report(
id = record.key(),
status = ReportStatus.failure,
message = "Fedora Exception: ${ex.localizedMessage}"
)
producer.sendReport(record.headers(), report)
} catch (ex: MissingMimeTypeException) {
log.error("${ex.javaClass.canonicalName}:: ${ex.localizedMessage}", ex)
val report = Report(
id = record.key(),
status = ReportStatus.failure,
message = "Missing MimeType Exception: ${ex.localizedMessage}"
)
producer.sendReport(record.headers(), report)
} catch (ex: SftpClientException) {
log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
val report = Report(
id = record.key(),
status = ReportStatus.failure,
message = "SFTP Exception: ${ex.localizedMessage}"
)
producer.sendReport(record.headers(), report)
} catch (ex: Exception) { } catch (ex: Exception) {
log.error("Ingestion of record ${record.key()} failed: ${ex.localizedMessage}", ex) log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
Report( val report = Report(
id = record.key(), id = record.key(),
status = ReportStatus.failure, status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key()) message = "Unknown Exception: ${ex.localizedMessage}"
) )
producer.sendReport(record.headers(), report)
} }
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment