Commit 58b12615 authored by Thomas Bernhart's avatar Thomas Bernhart
Browse files

WIP: Refactor Service and Ingester to simpify testing

parent 0053c18b
Pipeline #11555 failed with stages
in 1 minute and 8 seconds
......@@ -19,18 +19,30 @@ package org.memobase
// import java.io.File
// import java.io.InputStream
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import java.net.URI
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient
import org.memobase.sftp.SftpClient
class Ingester(private val sftpClient: SftpClient, private val fedoraClient: FedoraClient) {
class Ingester(private val producer: Producer, private val sftpClient: SftpClient, private val fedoraClient: FedoraClient) {
private val log = LogManager.getLogger("IngestService")
fun processRecords(recordsToIngest: ConsumerRecords<String, String>): MutableList<Report> {
val reports = mutableListOf<Report>()
for (record in recordsToIngest) {
val ingestReport = processRecord(record)
producer.sendReport(ingestReport)
reports.add(ingestReport)
}
return reports
}
@Throws(FcrepoOperationFailedException::class)
fun ingest(id: String, content: String) {
private fun ingest(id: String, content: String) {
log.info("Ingest record $id.")
log.info("Content: $content")
// fedoraClient.createOrUpdateRdfResource()
......@@ -42,6 +54,23 @@ class Ingester(private val sftpClient: SftpClient, private val fedoraClient: Fed
)
}
private fun processRecord(record: ConsumerRecord<String, String>): Report {
return try {
ingest(record.key(), record.value())
Report(
id = record.key(),
status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key())
)
} catch (e: FcrepoOperationFailedException) {
Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
}
// private fun parseRdf(rdf: String) {
// "<https://www.ica.org/standards/RiC/ontology#identifier>"
// }
......
......@@ -31,92 +31,68 @@ import org.memobase.sftp.SftpClient
class Service(fileName: String = "app.yml") {
private val log: Logger = LogManager.getLogger("FedoraIngestService")
companion object {
const val JOB_REPORT_ID = "fedora-ingest"
const val FEDORA_PROPERTIES_PREFIX = "fedora"
fun createFedoraClient(appSettings: Properties): FedoraClient {
return FedoraClientImpl.builder()
// FIXME: update to new version supporting method properties()
.properties(appSettings, FEDORA_PROPERTIES_PREFIX)
.build()
}
}
val settings =
SettingsLoader(
listOf(
"fedora.internalBaseUrl",
"fedora.externalBaseUrl",
"fedora.username",
"fedora.password"),
val settings = SettingsLoader(
listOf(
"$FEDORA_PROPERTIES_PREFIX.internalBaseUrl",
"$FEDORA_PROPERTIES_PREFIX.externalBaseUrl",
"$FEDORA_PROPERTIES_PREFIX.username",
"$FEDORA_PROPERTIES_PREFIX.password"),
fileName,
useProducerConfig = true,
useConsumerConfig = true,
readSftpSettings = true
)
fileName,
useProducerConfig = true,
useConsumerConfig = true,
readSftpSettings = true
)
private val log: Logger = LogManager.getLogger("FedoraIngestService")
private val consumer = Consumer(settings.kafkaConsumerSettings, settings.inputTopic)
private val producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
private val ingester = Ingester(
producer,
SftpClient(settings.sftpSettings),
createFedoraClient(settings.appSettings))
companion object {
const val JOB_REPORT_ID = "fedora-ingest"
fun createFedoraClient(appSettings: Properties): FedoraClient {
val settingsPrefix = "fedora."
return FedoraClientImpl.builder()
.credentials(appSettings.getProperty(settingsPrefix + "username"),
appSettings.getProperty(settingsPrefix + "password"))
.urls(appSettings.getProperty(settingsPrefix + "internalBaseUrl"),
appSettings.getProperty(settingsPrefix + "externalBaseUrl"))
.build()
}
}
fun run() {
consumer.use { consumer ->
producer.use { producer ->
log.info("Connected to Kafka.")
val reports = mutableListOf<Report>()
while (true) {
val recordsToIngest = consumer.fetchRecords()
for (record in recordsToIngest) {
val ingestReport = processRecord(record)
producer.sendReport(ingestReport)
reports.add(ingestReport)
}
log.info("Connected to Kafka.")
while (true) {
val recordsToIngest = consumer.fetchRecords()
val reports = ingester.processRecords(recordsToIngest)
val failures = reports.count { report -> report.status == ReportStatus.failure }
if (failures > 0) {
log.warn("Ingest ended with $failures failures!")
producer.sendJobReport(
Report(JOB_REPORT_ID,
status = ReportStatus.failure,
message = ReportMessages.processFailure(failures, reports.size)),
settings.processReportTopic
)
} else {
log.info("Ingest was successful!")
producer.sendJobReport(
Report(JOB_REPORT_ID,
status = ReportStatus.success,
message = ReportMessages.processSuccess(reports.size)),
settings.processReportTopic
)
}
// FIXME: How does service know when a ingest is finished?
val failures = reports.count { report -> report.status == ReportStatus.failure }
if (failures > 0) {
log.warn("Ingest ended with $failures failures!")
producer.sendJobReport(
Report(JOB_REPORT_ID,
status = ReportStatus.failure,
message = ReportMessages.processFailure(failures, reports.size)),
settings.processReportTopic
)
} else {
log.info("Ingest was successful!")
producer.sendJobReport(
Report(JOB_REPORT_ID,
status = ReportStatus.success,
message = ReportMessages.processSuccess(reports.size)),
settings.processReportTopic
)
}
}
}
}
}
private fun processRecord(record: ConsumerRecord<String, String>): Report {
return try {
ingester.ingest(record.key(), record.value())
Report(
id = record.key(),
status = ReportStatus.success,
message = ReportMessages.ingestedRecord(record.key())
)
} catch (e: FcrepoOperationFailedException) {
Report(
id = record.key(),
status = ReportStatus.failure,
message = ReportMessages.ingestFailed(record.key())
)
}
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment