Commit 7809a718 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Refactor producer, report and service

parent 203c9512
......@@ -11,7 +11,7 @@ test:
tags:
- mbr
script:
- gradle --no-daemon --no-scan --no-build-cache test --fail-fast --tests "org.memobase.ServiceTest"
- gradle --no-daemon --no-scan --no-build-cache test --fail-fast
.build-image:
stage: publish
......
......@@ -17,22 +17,17 @@
*/
package org.memobase
import com.beust.klaxon.Klaxon
import java.io.Closeable
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.io.Closeable
import java.util.Properties
class Producer(props: Properties, topic: String) : Closeable {
private val instance = KafkaProducer<String, String>(props)
private val reportingTopic = "$topic-reporting"
fun sendReport(report: Report) {
instance.send(ProducerRecord(reportingTopic, report.id, Klaxon().toJsonString(report)))
}
fun sendJobReport(report: Report, topic: String) {
instance.send(ProducerRecord(topic, report.id, Klaxon().toJsonString(report)))
instance.send(ProducerRecord(reportingTopic, report.id, report.toJson()))
}
override fun close() {
......
......@@ -18,8 +18,14 @@
package org.memobase
import com.beust.klaxon.Klaxon
data class Report(
val id: String,
val status: String,
val message: String
)
) {
fun toJson(): String {
return Klaxon().toJsonString(this)
}
}
......@@ -48,7 +48,8 @@ class Service(fileName: String = "app.yml") {
"$FEDORA_PROPERTIES_PREFIX.internalBaseUrl",
"$FEDORA_PROPERTIES_PREFIX.externalBaseUrl",
"$FEDORA_PROPERTIES_PREFIX.username",
"$FEDORA_PROPERTIES_PREFIX.password"),
"$FEDORA_PROPERTIES_PREFIX.password"
),
fileName,
useProducerConfig = true,
useConsumerConfig = true,
......@@ -58,40 +59,18 @@ class Service(fileName: String = "app.yml") {
private val log: Logger = LogManager.getLogger("FedoraIngestService")
private val consumer = Consumer(settings.kafkaConsumerSettings, settings.inputTopic)
private val producer = Producer(settings.kafkaProducerSettings, settings.outputTopic)
private val ingester = Ingester(
producer,
SftpClient(settings.sftpSettings),
createFedoraClient(settings.appSettings))
Producer(settings.kafkaProducerSettings, settings.outputTopic),
SftpClient(settings.sftpSettings),
createFedoraClient(settings.appSettings)
)
fun run() {
consumer.use { consumer ->
producer.use { producer ->
log.info("Connected to Kafka.")
while (true) {
val recordsToIngest = consumer.fetchRecords()
val reports = ingester.processRecords(recordsToIngest)
// FIXME: How does service know when a ingest is finished?
val failures = reports.count { report -> report.status == ReportStatus.failure }
if (failures > 0) {
log.warn("Ingest ended with $failures failures!")
producer.sendJobReport(
Report(JOB_REPORT_ID,
status = ReportStatus.failure,
message = ReportMessages.processFailure(failures, reports.size)),
settings.processReportTopic
)
} else {
log.info("Ingest was successful!")
producer.sendJobReport(
Report(JOB_REPORT_ID,
status = ReportStatus.success,
message = ReportMessages.processSuccess(reports.size)),
settings.processReportTopic
)
}
}
log.info("Connected to Kafka.")
while (true) {
val recordsToIngest = consumer.fetchRecords()
ingester.processRecords(recordsToIngest)
}
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment