/*
 * fedora-ingest-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

import ch.memobase.exceptions.SftpClientException
import ch.memobase.fedora.FedoraClient
import ch.memobase.fedora.FedoraClientImpl
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
import ch.memobase.sftp.SftpClient
import java.io.Closeable
import java.io.IOException
import java.net.URISyntaxException
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.fcrepo.client.FcrepoOperationFailedException
import org.memobase.exceptions.MissingMimeTypeException

/**
 * Ingest service: reads records from the configured Kafka input topic, ingests each one into a
 * Fedora repository and hands one [Report] per record to [Producer.sendReport].
 *
 * The mode is chosen by the `isSimple` application setting:
 * - simple (`isSimple: true`): each record is ingested with [SimpleIngester] and no sFTP connection is opened;
 * - full (default): each record is ingested with an [Ingester] that also receives the sFTP client.
 *
 * @param fileName settings file handed to [SettingsLoader]; defaults to `app.yml`.
 */
class Service(fileName: String = "app.yml") : Closeable {

    private val settings = SettingsLoader(
        listOf(
            "isSimple",
            "$FEDORA_PROPERTIES_PREFIX.internalBaseUrl",
            "$FEDORA_PROPERTIES_PREFIX.externalBaseUrl",
            "$FEDORA_PROPERTIES_PREFIX.username",
            "$FEDORA_PROPERTIES_PREFIX.password"
        ),
        fileName,
        useProducerConfig = true,
        useConsumerConfig = true,
        readSftpSettings = true
    )

    private val log: Logger = LogManager.getLogger("FedoraIngestService")

    private val fedoraClient = createFedoraClient(settings.appSettings)
    private val simpleIngester = SimpleIngester(fedoraClient)

    // A missing `isSimple` setting selects the full (non-simple) mode.
    private val isSimple = (settings.appSettings.getProperty("isSimple") ?: "false").toBoolean()

    private val consumer: Consumer
    private val producer: Producer

    // Connected in init only when isSimple is false; null in simple mode.
    private val sftpClient: SftpClient?

    init {
        val consumerSettings = settings.kafkaConsumerSettings
        consumerSettings.setProperty("max.poll.records", CONSUMER_MAX_POLL_RECORDS)
        // NOTE(review): presumably raised to 1h because a batch of ingests can be slow — confirm.
        consumerSettings.setProperty("max.poll.interval.ms", CONSUMER_MAX_INTERVAL_MS)
        consumer = Consumer(consumerSettings, settings.inputTopic)
        producer = Producer(settings.kafkaProducerSettings, settings.processReportTopic)
        log.info("Connected to Kafka cluster.")

        sftpClient = if (isSimple) {
            null
        } else {
            SftpClient(settings.sftpSettings).also { log.info("Connected to sFTP server.") }
        }
    }

    /** Processes record batches forever; this only exits if [processRecords] throws. */
    fun run() {
        while (true) {
            processRecords()
        }
    }

    /**
     * Ingests every record returned by one [Consumer.fetchRecords] call and sends a report for each,
     * together with the record's headers.
     */
    fun processRecords() {
        for (record in consumer.fetchRecords()) {
            val report = if (isSimple) processSingleEntity(record) else processRecord(record)
            producer.sendReport(record.headers(), report)
        }
    }

    /**
     * Ingests [record] with [SimpleIngester]. Used when there is only a single named entity within the
     * content data and no binaries are loaded.
     */
    private fun processSingleEntity(record: ConsumerRecord<String, String>): Report =
        ingestWithReport(record) { key, value ->
            simpleIngester.ingest(key, value)
            Report(key, ReportStatus.success, "Successfully ingested message into Fedora repository.")
        }

    /**
     * Ingests [record] with an [Ingester] built from the sFTP client, the Fedora client and the
     * `fedora.externalBaseUrl` setting.
     */
    private fun processRecord(record: ConsumerRecord<String, String>): Report =
        ingestWithReport(record) { key, value ->
            // Previously a missing client silently skipped ingestion yet reported success; now it fails.
            val client = checkNotNull(sftpClient) { "No sFTP client available for a non-simple ingest." }
            Ingester(
                client,
                fedoraClient,
                settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl")
            ).ingest(key, value)
            Report(
                id = key,
                status = ReportStatus.success,
                message = ReportMessages.ingestedRecord(key)
            )
        }

    /**
     * Shared record handling for both ingest modes.
     *
     * Returns a fatal report when the key or the value of [record] is null. Otherwise runs [ingest] and
     * returns its report. Any [Exception] thrown by [ingest] is logged and turned into a fatal report.
     */
    private fun ingestWithReport(
        record: ConsumerRecord<String, String>,
        ingest: (key: String, value: String) -> Report
    ): Report {
        val key: String = record.key()
            ?: return Report("NoKey", ReportStatus.fatal, "The key in message is null.")
        val value: String = record.value()
            ?: return Report(key, ReportStatus.fatal, "The value in message is null.")

        return try {
            ingest(key, value)
        } catch (ex: Exception) {
            log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
            Report(id = key, status = ReportStatus.fatal, message = failureMessage(ex))
        }
    }

    /**
     * Maps an ingest failure to its report message.
     *
     * The project-specific exception types are checked before [IOException], so their dedicated
     * message is used even if they turn out to be [IOException] subclasses.
     */
    private fun failureMessage(ex: Throwable): String = when (ex) {
        is FcrepoOperationFailedException -> "Unable to write to Fedora: ${ex.localizedMessage}"
        is MissingMimeTypeException -> "Missing MimeType: ${ex.localizedMessage}"
        is SftpClientException -> "Unable to read from SFTP: ${ex.localizedMessage}"
        is IOException -> "IO error: ${ex.localizedMessage}"
        is URISyntaxException -> "Malformed URI: ${ex.localizedMessage}"
        else -> "Unknown error: ${ex.localizedMessage}"
    }

    /**
     * Closes the Kafka consumer, the Kafka producer and (if connected) the sFTP client.
     *
     * Every close is attempted even if an earlier one throws. The first failure is rethrown afterwards.
     */
    override fun close() {
        closeAll(
            { consumer.close() },
            { producer.close() },
            { sftpClient?.close() }
        )
    }

    /**
     * Runs every action in [closers], continuing past failures. Afterwards it rethrows the first
     * [Exception] raised, with any later ones attached through [Throwable.addSuppressed].
     */
    private fun closeAll(vararg closers: () -> Unit) {
        var failure: Exception? = null
        for (closer in closers) {
            try {
                closer()
            } catch (ex: Exception) {
                val first = failure
                if (first == null) failure = ex else first.addSuppressed(ex)
            }
        }
        failure?.let { throw it }
    }

    companion object {
        const val FEDORA_PROPERTIES_PREFIX = "fedora"
        const val CONSUMER_MAX_POLL_RECORDS = "10"
        const val CONSUMER_MAX_INTERVAL_MS = "3600000" // 3600000ms = 1h

        const val BINARY_FILE_URI_PATH = "binary"
        const val SFTP_PREFIX = "sftp:"

        /** Builds a [FedoraClient] from the `fedora.*` entries of [appSettings]. */
        fun createFedoraClient(appSettings: Properties): FedoraClient {
            return FedoraClientImpl.builder()
                .properties(appSettings, FEDORA_PROPERTIES_PREFIX)
                .build()
        }
    }
}