Commit 3135a5cf authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Implement simple ingester for institutions & record sets

parent 10f40f28
......@@ -4,6 +4,7 @@ clientId: fedora-drupal-sync-ingest-app
kafkaConfigs: prod-kafka-bootstrap-servers
sftpConfigs: internal-sftp-config
fedoraConfigs: fedora-admin-credentials
isSimple: true
inputTopicName: drupal-sync-output
outputTopicName: void_topic
......
......@@ -4,6 +4,7 @@ clientId: fedora-ingest-service-app
kafkaConfigs: prod-kafka-bootstrap-servers
sftpConfigs: internal-sftp-config
fedoraConfigs: fedora-admin-credentials
isSimple: false
inputTopicName: import-process-ingest
outputTopicName: void_topic
......
......@@ -5,6 +5,7 @@ metadata:
namespace: memobase
data:
CLIENT_ID: {{ .Values.clientId }}
IS_SIMPLE: {{ .Values.isSimple }}
TOPIC_IN: {{ .Values.inputTopicName }}
TOPIC_OUT: {{ .Values.outputTopicName }}
TOPIC_REPORTING: {{ .Values.reportingTopicName }}
......@@ -9,6 +9,7 @@ clientId: fedora-ingest-service-app
kafkaConfigs: prod-kafka-bootstrap-servers
sftpConfigs: internal-sftp-config
fedoraConfigs: fedora-admin-credentials
isSimple: true
inputTopicName: import-process-normalization
outputTopicName: void_topic
......
......@@ -26,7 +26,9 @@ class App {
@JvmStatic fun main(args: Array<String>) {
try {
val service = Service()
service.run()
service.use {
it.run()
}
} catch (ex: Exception) {
log.error("Stopping application due to error: ${ex.localizedMessage}", ex)
exitProcess(1)
......
......@@ -18,15 +18,15 @@
package org.memobase
import java.io.Closeable
import java.lang.Exception
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient
import org.memobase.fedora.FedoraClientImpl
import org.memobase.settings.SettingsLoader
import java.io.Closeable
import java.util.*
class Service(fileName: String = "app.yml") : Closeable {
......@@ -48,6 +48,7 @@ class Service(fileName: String = "app.yml") : Closeable {
private val settings = SettingsLoader(
listOf(
"isSimple",
"$FEDORA_PROPERTIES_PREFIX.internalBaseUrl",
"$FEDORA_PROPERTIES_PREFIX.externalBaseUrl",
"$FEDORA_PROPERTIES_PREFIX.username",
......@@ -60,6 +61,9 @@ class Service(fileName: String = "app.yml") : Closeable {
)
private val log: Logger = LogManager.getLogger("FedoraIngestService")
private val fedoraClient = createFedoraClient(settings.appSettings)
private val simpleIngester = SimpleIngester(fedoraClient)
private val isSimple = (settings.appSettings.getProperty("isSimple") ?: "false").toBoolean()
private var consumer: Consumer
private var producer: Producer
......@@ -78,17 +82,46 @@ class Service(fileName: String = "app.yml") : Closeable {
}
}
fun processRecords() {
private fun processRecords() {
for (record in consumer.fetchRecords()) {
val ingestReport = processRecord(record)
val ingestReport = if (isSimple)
processSingleEntity(record)
else
processRecord(record)
producer.sendReport(record.headers(), ingestReport)
}
}
/**
* If there is only a single named entity within the content data and no binaries are loaded.
*/
private fun processSingleEntity(record: ConsumerRecord<String, String>): Report {
val key = record.key()
val value = record.value()
if (key == null)
return Report("NoKey", ReportStatus.failure, "The key in message is null.")
if (value == null)
return Report(key, ReportStatus.failure, "The value in message is null.")
return try {
simpleIngester.ingest(key, value)
Report(
key, ReportStatus.success, "Successfully ingested message into Fedora repository."
)
} catch (ex: FcrepoOperationFailedException) {
log.error(ex.localizedMessage)
Report(
key, ReportStatus.failure, ex.localizedMessage
)
}
}
private fun processRecord(record: ConsumerRecord<String, String>): Report {
val ingester = Ingester(
settings.sftpSettings,
createFedoraClient(settings.appSettings),
fedoraClient,
settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl")
)
......
/*
* fedora-ingest-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.apache.logging.log4j.LogManager
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.fedora.FedoraClient
import org.memobase.fedora.RdfContentTypes
import java.net.URI
class SimpleIngester(
private val fedoraClient: FedoraClient
) {
private val log = LogManager.getLogger("SimpleIngester")
@Throws(FcrepoOperationFailedException::class)
fun ingest(id: String, content: String) {
fedoraClient.startTransaction().use { transaction ->
log.info("Begin transaction to ingest single entity with uri $id.")
transaction.createOrUpdateRdfResource(URI(id), content, RdfContentTypes.NTRIPLES)
transaction.commit()
log.info("Committed transaction for entity with uri $id.")
}
}
}
......@@ -4,6 +4,7 @@ sftp:
user: ${SFTP_USER:?env}
password: ${SFTP_PASSWORD:?env}
app:
isSimple: ${IS_SIMPLE:?env}
fedora:
internalBaseUrl: ${FEDORA_INTERNAL_BASE_URL:?env}
externalBaseUrl: ${FEDORA_EXTERNAL_BASE_URL:?env}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment