/*
 * fedora-ingest-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21
import ch.memobase.exceptions.SftpClientException
22
23
import ch.memobase.fedora.FedoraClient
import ch.memobase.fedora.FedoraClientImpl
24
25
26
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
import ch.memobase.sftp.SftpClient
27
import java.io.Closeable
28
29
import java.io.IOException
import java.net.URISyntaxException
30
import java.util.Properties
31
import org.apache.kafka.clients.consumer.ConsumerRecord
Jonas Waeber's avatar
Jonas Waeber committed
32
33
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
34
import org.fcrepo.client.FcrepoOperationFailedException
Thomas Bernhart's avatar
Thomas Bernhart committed
35
import org.memobase.exceptions.MissingMimeTypeException
Jonas Waeber's avatar
Jonas Waeber committed
36

37
/**
 * Consumes records from a Kafka topic, ingests each one into a Fedora repository and
 * publishes one [Report] per record to the process report topic.
 *
 * The `isSimple` application setting selects the ingest path:
 * - `true`: every record is handed to [SimpleIngester]; no sFTP connection is opened.
 * - anything else: an [SftpClient] is created at start-up and every record is handed to an
 *   [Ingester] built from that client, the Fedora client and the external Fedora base URL.
 *
 * @param fileName name of the settings file passed to [SettingsLoader].
 */
class Service(fileName: String = "app.yml") : Closeable {

    companion object {
        const val FEDORA_PROPERTIES_PREFIX = "fedora"
        const val CONSUMER_MAX_POLL_RECORDS = "10"
        const val CONSUMER_MAX_INTERVAL_MS = "600000" // 600000ms = 10min
        const val BINARY_FILE_URI_PATH = "binary"
        const val SFTP_PREFIX = "sftp:"

        /**
         * Builds a [FedoraClient] from the application settings stored under the
         * [FEDORA_PROPERTIES_PREFIX] prefix.
         */
        fun createFedoraClient(appSettings: Properties): FedoraClient =
            FedoraClientImpl.builder()
                .properties(appSettings, FEDORA_PROPERTIES_PREFIX)
                .build()
    }

    private val settings = SettingsLoader(
        listOf(
            "isSimple",
            "$FEDORA_PROPERTIES_PREFIX.internalBaseUrl",
            "$FEDORA_PROPERTIES_PREFIX.externalBaseUrl",
            "$FEDORA_PROPERTIES_PREFIX.username",
            "$FEDORA_PROPERTIES_PREFIX.password"
        ),
        fileName,
        useProducerConfig = true,
        useConsumerConfig = true,
        readSftpSettings = true
    )

    private val log: Logger = LogManager.getLogger("FedoraIngestService")
    private val fedoraClient = createFedoraClient(settings.appSettings)
    private val simpleIngester = SimpleIngester(fedoraClient)
    private val isSimple = (settings.appSettings.getProperty("isSimple") ?: "false").toBoolean()

    // All three are assigned exactly once in the init block below.
    private val consumer: Consumer
    private val producer: Producer

    /** Only present when `isSimple` is false; the simple path never touches sFTP. */
    private val sftpClient: SftpClient?

    init {
        val consumerSettings = settings.kafkaConsumerSettings
        consumerSettings.setProperty("max.poll.records", CONSUMER_MAX_POLL_RECORDS)
        consumerSettings.setProperty("max.poll.interval.ms", CONSUMER_MAX_INTERVAL_MS)
        consumer = Consumer(consumerSettings, settings.inputTopic)
        producer = Producer(settings.kafkaProducerSettings, settings.processReportTopic)
        log.info("Connected to Kafka cluster.")
        sftpClient = if (isSimple) {
            null
        } else {
            SftpClient(settings.sftpSettings).also { log.info("Connected to sFTP server.") }
        }
    }

    /** Processes record batches forever; this function never returns normally. */
    fun run() {
        while (true) {
            processRecords()
        }
    }

    /**
     * Fetches one batch of records from the consumer, ingests each record via the path
     * selected by `isSimple`, and sends the resulting report with the record's headers.
     */
    fun processRecords() {
        for (record in consumer.fetchRecords()) {
            val ingestReport = if (isSimple) processSingleEntity(record) else processRecord(record)
            producer.sendReport(record.headers(), ingestReport)
        }
    }

    /**
     * If there is only a single named entity within the content data and no binaries are loaded.
     */
    private fun processSingleEntity(record: ConsumerRecord<String, String>): Report =
        ingestAndReport(
            record,
            successMessage = { "Successfully ingested message into Fedora repository." },
            ingest = { key, value -> simpleIngester.ingest(key, value) }
        )

    /**
     * Ingests a record through an [Ingester] backed by the sFTP client.
     *
     * A missing sFTP client is reported as a fatal failure instead of being skipped, so a
     * record can never be reported as ingested when nothing was written.
     */
    private fun processRecord(record: ConsumerRecord<String, String>): Report =
        ingestAndReport(
            record,
            successMessage = { key -> ReportMessages.ingestedRecord(key) },
            ingest = { key, value ->
                val client = checkNotNull(sftpClient) {
                    "sFTP client is not initialised; it is only created when isSimple is false."
                }
                // A fresh Ingester per record, as before.
                Ingester(
                    client,
                    fedoraClient,
                    settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl")
                ).ingest(key, value)
            }
        )

    /**
     * Shared ingest flow: validates the record, runs [ingest] and converts the outcome
     * into a [Report].
     *
     * @param record the consumed Kafka record.
     * @param successMessage produces the report message for a successful ingest; it is
     *   evaluated after [ingest] and inside the same failure handling.
     * @param ingest the actual ingest action, called with the non-null key and value.
     * @return a fatal report when the key or value is null or when [ingest] throws an
     *   [Exception]; otherwise a success report.
     */
    private fun ingestAndReport(
        record: ConsumerRecord<String, String>,
        successMessage: (key: String) -> String,
        ingest: (key: String, value: String) -> Unit
    ): Report {
        val key = record.key()
            ?: return Report("NoKey", ReportStatus.fatal, "The key in message is null.")
        val value = record.value()
            ?: return Report(key, ReportStatus.fatal, "The value in message is null.")

        return try {
            ingest(key, value)
            Report(id = key, status = ReportStatus.success, message = successMessage(key))
        } catch (ex: Exception) {
            failureReport(key, ex)
        }
    }

    /**
     * Logs [ex] with its class name and stack trace and maps it to a fatal [Report].
     *
     * The branch order mirrors the original catch order, so an exception that matches
     * several branches keeps the message prefix it had before.
     */
    private fun failureReport(key: String, ex: Exception): Report {
        log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
        val reason = when (ex) {
            is FcrepoOperationFailedException -> "Unable to write to Fedora"
            is IOException -> "IO error"
            is MissingMimeTypeException -> "Missing MimeType"
            is SftpClientException -> "Unable to read from SFTP"
            is URISyntaxException -> "Malformed URI"
            else -> "Unknown error"
        }
        return Report(
            id = key,
            status = ReportStatus.fatal,
            message = "$reason: ${ex.localizedMessage}"
        )
    }

    /**
     * Closes the consumer, the producer and, if present, the sFTP client.
     *
     * Each resource is closed even if closing an earlier one throws; the exception is
     * still propagated to the caller.
     */
    override fun close() {
        try {
            consumer.close()
        } finally {
            try {
                producer.close()
            } finally {
                sftpClient?.close()
            }
        }
    }
}