Commit c073747c authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Implement termination & update client

parent 5d0b6d56
...@@ -46,8 +46,8 @@ dependencies { ...@@ -46,8 +46,8 @@ dependencies {
implementation 'org.apache.jena:apache-jena:3.14.0' implementation 'org.apache.jena:apache-jena:3.14.0'
implementation 'org.memobase:memobase-service-utilities:1.10.0' implementation 'org.memobase:memobase-service-utilities:1.12.1'
implementation 'org.memobase:fedora-client:0.4.1' implementation 'org.memobase:fedora-client:0.5.0'
// KOTLIN IMPORTS // KOTLIN IMPORTS
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8' implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
......
...@@ -27,7 +27,7 @@ class Consumer(props: Properties, topic: String) : Closeable { ...@@ -27,7 +27,7 @@ class Consumer(props: Properties, topic: String) : Closeable {
private val instance = KafkaConsumer<String, String>(props) private val instance = KafkaConsumer<String, String>(props)
init { init {
instance.subscribe(listOf(topic)) instance.subscribe(listOf(topic, "import-process-admin"))
} }
fun fetchRecords(): ConsumerRecords<String, String> { fun fetchRecords(): ConsumerRecords<String, String> {
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
*/ */
package org.memobase package org.memobase
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
...@@ -28,6 +30,7 @@ import java.io.Closeable ...@@ -28,6 +30,7 @@ import java.io.Closeable
import java.io.File import java.io.File
import java.io.StringWriter import java.io.StringWriter
import java.net.URI import java.net.URI
import kotlin.system.exitProcess
class Ingester( class Ingester(
private val producer: Producer, private val producer: Producer,
...@@ -36,11 +39,28 @@ class Ingester( ...@@ -36,11 +39,28 @@ class Ingester(
) : Closeable { ) : Closeable {
private val log = LogManager.getLogger("FedoraIngester") private val log = LogManager.getLogger("FedoraIngester")
private val klaxon = Klaxon()
fun processRecords(recordsToIngest: ConsumerRecords<String, String>) { fun processRecords(recordsToIngest: ConsumerRecords<String, String>) {
var terminateApplication = false
for (record in recordsToIngest) { for (record in recordsToIngest) {
val ingestReport = processRecord(record) try {
producer.sendReport(ingestReport) klaxon.parse<Message>(record.value()).let {
if (it != null) {
if (it.jobName == "fedora-ingest-service" && it.action == "termination") {
terminateApplication = true
}
}
}
} catch (ex: KlaxonException) {
val ingestReport = processRecord(record)
producer.sendReport(ingestReport)
}
}
if (terminateApplication) {
log.info("All records have been processed. Terminating service.")
exitProcess(0)
} }
} }
......
package org.memobase
data class Message(
val processId: String,
val jobName: String,
val action: String
)
...@@ -18,20 +18,7 @@ ...@@ -18,20 +18,7 @@
package org.memobase package org.memobase
object ReportStatus {
const val success = "SUCCESS"
const val failure = "FAILURE"
}
object ReportMessages { object ReportMessages {
fun processFailure(failures: Int, total: Int): String {
return "Failed to ingest $failures of $total resources."
}
fun processSuccess(total: Int): String {
return "Successfully ingested $total resources."
}
fun ingestedRecord(id: String): String { fun ingestedRecord(id: String): String {
return "Ingested resource $id." return "Ingested resource $id."
} }
......
/*
* fedora-ingest-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
object ReportStatus {
const val success = "SUCCESS"
const val failure = "FAILURE"
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment