/*
 * fedora-ingest-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21
22
import java.io.Closeable
import java.util.Properties
23
import org.apache.kafka.clients.consumer.ConsumerRecord
Jonas Waeber's avatar
Jonas Waeber committed
24
25
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
26
import org.fcrepo.client.FcrepoOperationFailedException
Thomas Bernhart's avatar
Thomas Bernhart committed
27
28
import org.memobase.exceptions.MissingMimeTypeException
import org.memobase.exceptions.SftpClientException
29
import org.memobase.fedora.FedoraClient
30
import org.memobase.fedora.FedoraClientImpl
Jonas Waeber's avatar
Jonas Waeber committed
31
import org.memobase.settings.SettingsLoader
32
import org.memobase.sftp.SftpClient
Jonas Waeber's avatar
Jonas Waeber committed
33

34
/**
 * Consumes records from Kafka, ingests each one into a Fedora repository and publishes one
 * processing [Report] per consumed record.
 *
 * The `isSimple` application setting selects the mode:
 * - simple mode: every record is handed to [SimpleIngester]; no sFTP connection is opened.
 * - full mode: every record is handed to an [Ingester] backed by an [SftpClient].
 *
 * @param fileName settings file name passed to [SettingsLoader].
 */
class Service(fileName: String = "app.yml") : Closeable {

    companion object {
        const val DIGITAL_OBJECT_TYPE = "digitalObject"
        const val FEDORA_PROPERTIES_PREFIX = "fedora"
        const val CONSUMER_MAX_POLL_RECORDS = "10"
        const val CONSUMER_MAX_INTERVAL_MS = "3600000" // 3600000ms = 1h

        const val BINARY_FILE_URI_PATH = "binary"
        const val SFTP_PREFIX = "sftp:"

        /**
         * Builds a [FedoraClient] from the entries of [appSettings] under the
         * [FEDORA_PROPERTIES_PREFIX] prefix.
         */
        fun createFedoraClient(appSettings: Properties): FedoraClient =
            FedoraClientImpl.builder()
                .properties(appSettings, FEDORA_PROPERTIES_PREFIX)
                .build()
    }

    private val settings = SettingsLoader(
        listOf(
            "isSimple",
            "$FEDORA_PROPERTIES_PREFIX.internalBaseUrl",
            "$FEDORA_PROPERTIES_PREFIX.externalBaseUrl",
            "$FEDORA_PROPERTIES_PREFIX.username",
            "$FEDORA_PROPERTIES_PREFIX.password"
        ),
        fileName,
        useProducerConfig = true,
        useConsumerConfig = true,
        readSftpSettings = true
    )

    private val log: Logger = LogManager.getLogger("FedoraIngestService")

    private val fedoraClient = createFedoraClient(settings.appSettings)
    private val simpleIngester = SimpleIngester(fedoraClient)
    private val isSimple = (settings.appSettings.getProperty("isSimple") ?: "false").toBoolean()

    // Assigned exactly once, in `init` below.
    private val consumer: Consumer
    private val producer: Producer

    /** Opened only in full (non-simple) mode; `null` in simple mode. */
    private val sftpClient: SftpClient?

    init {
        val consumerSettings = settings.kafkaConsumerSettings
        // Small batches plus a long poll interval: a batch of slow ingests must not exceed
        // max.poll.interval.ms, otherwise Kafka considers the consumer failed and rebalances.
        consumerSettings.setProperty("max.poll.records", CONSUMER_MAX_POLL_RECORDS)
        consumerSettings.setProperty("max.poll.interval.ms", CONSUMER_MAX_INTERVAL_MS)
        consumer = Consumer(consumerSettings, settings.inputTopic)
        producer = Producer(settings.kafkaProducerSettings, settings.processReportTopic)
        log.info("Connected to Kafka cluster.")

        sftpClient = if (isSimple) {
            null
        } else {
            SftpClient(settings.sftpSettings).also { log.info("Connected to sFTP server.") }
        }
    }

    /**
     * Processes records forever. Never returns normally; it only exits when an exception
     * escapes [processRecords] (e.g. from fetching records or sending reports).
     */
    fun run() {
        while (true) {
            processRecords()
        }
    }

    /**
     * Processes every record returned by one call to [Consumer.fetchRecords]. Each record is
     * ingested according to the configured mode, and its [Report] is sent together with the
     * record's headers.
     */
    fun processRecords() {
        for (record in consumer.fetchRecords()) {
            val ingestReport = if (isSimple) processSingleEntity(record) else processRecord(record)
            producer.sendReport(record.headers(), ingestReport)
        }
    }

    /**
     * Simple mode: the record holds a single named entity that is handed to [SimpleIngester]
     * unchanged; no binaries are loaded.
     *
     * A missing key or value, or any [Exception] raised during ingest, becomes a failure
     * [Report] instead of propagating. One bad record therefore cannot stop the consumer loop.
     */
    private fun processSingleEntity(record: ConsumerRecord<String, String>): Report {
        val key = record.key() ?: return missingKeyReport()
        val value = record.value() ?: return missingValueReport(key)

        return try {
            simpleIngester.ingest(key, value)
            Report(
                id = key,
                status = ReportStatus.success,
                message = "Successfully ingested message into Fedora repository."
            )
        } catch (ex: FcrepoOperationFailedException) {
            // Pass the throwable so the stack trace is not lost.
            log.error(ex.localizedMessage, ex)
            Report(
                id = key,
                status = ReportStatus.failure,
                // localizedMessage may be null; never hand null to the report.
                message = ex.localizedMessage ?: ex.toString()
            )
        } catch (ex: Exception) {
            log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
            Report(
                id = key,
                status = ReportStatus.failure,
                message = "Unknown Exception: ${ex.localizedMessage}"
            )
        }
    }

    /**
     * Full mode: ingests the record through an [Ingester] that uses the sFTP client and the
     * Fedora client.
     *
     * A missing key or value, or any [Exception] (including a missing sFTP client), becomes a
     * failure [Report] instead of propagating.
     */
    private fun processRecord(record: ConsumerRecord<String, String>): Report {
        val key = record.key() ?: return missingKeyReport()
        val value = record.value() ?: return missingValueReport(key)

        return try {
            // In full mode `init` always opens the sFTP client. If that invariant is ever
            // broken, fail the record rather than silently reporting a success.
            val sftp = checkNotNull(sftpClient) { "sFTP client is not initialised (isSimple=$isSimple)." }
            Ingester(
                sftp,
                fedoraClient,
                settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl")
            ).ingest(key, value)
            Report(
                id = key,
                status = ReportStatus.success,
                message = ReportMessages.ingestedRecord(key)
            )
        } catch (ex: FcrepoOperationFailedException) {
            log.error("${ex.javaClass.canonicalName}:: ${ex.localizedMessage}", ex)
            Report(
                id = key,
                status = ReportStatus.failure,
                message = "Fedora Exception: ${ex.localizedMessage}"
            )
        } catch (ex: MissingMimeTypeException) {
            log.error("${ex.javaClass.canonicalName}:: ${ex.localizedMessage}", ex)
            Report(
                id = key,
                status = ReportStatus.failure,
                message = "Missing MimeType Exception: ${ex.localizedMessage}"
            )
        } catch (ex: SftpClientException) {
            log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
            Report(
                id = key,
                status = ReportStatus.failure,
                message = "SFTP Exception: ${ex.localizedMessage}"
            )
        } catch (ex: Exception) {
            log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
            Report(
                id = key,
                status = ReportStatus.failure,
                message = "Unknown Exception: ${ex.localizedMessage}"
            )
        }
    }

    /** Failure report for a record without a key; "NoKey" stands in for the missing id. */
    private fun missingKeyReport(): Report =
        Report(id = "NoKey", status = ReportStatus.failure, message = "The key in message is null.")

    /** Failure report for a record whose key is present but whose value is null. */
    private fun missingValueReport(key: String): Report =
        Report(id = key, status = ReportStatus.failure, message = "The value in message is null.")

    /**
     * Closes the Kafka consumer, the Kafka producer and, in full mode, the sFTP client.
     * Each later resource is still closed when closing an earlier one throws.
     */
    override fun close() {
        try {
            consumer.close()
        } finally {
            try {
                producer.close()
            } finally {
                sftpClient?.close()
            }
        }
    }
}