Service.kt 6.45 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
/*
Jonas Waeber's avatar
Jonas Waeber committed
2
 * fedora-ingest-service
Jonas Waeber's avatar
Jonas Waeber committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.memobase

21
22
23
24
import ch.memobase.exceptions.SftpClientException
import ch.memobase.reporting.ReportStatus
import ch.memobase.settings.SettingsLoader
import ch.memobase.sftp.SftpClient
25
import org.apache.kafka.clients.consumer.ConsumerRecord
Jonas Waeber's avatar
Jonas Waeber committed
26
27
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
28
import org.fcrepo.client.FcrepoOperationFailedException
Thomas Bernhart's avatar
Thomas Bernhart committed
29
import org.memobase.exceptions.MissingMimeTypeException
30
import org.memobase.fedora.FedoraClient
31
import org.memobase.fedora.FedoraClientImpl
32
33
import java.io.Closeable
import java.util.Properties
Jonas Waeber's avatar
Jonas Waeber committed
34

35
class Service(fileName: String = "app.yml") : Closeable {
36

37
38
    companion object {
        const val FEDORA_PROPERTIES_PREFIX = "fedora"
39
        const val CONSUMER_MAX_POLL_RECORDS = "10"
40
        const val CONSUMER_MAX_INTERVAL_MS = "3600000" // 3600000ms = 1h
41

42
43
44
        const val BINARY_FILE_URI_PATH = "binary"
        const val SFTP_PREFIX = "sftp:"

45
46
        fun createFedoraClient(appSettings: Properties): FedoraClient {
            return FedoraClientImpl.builder()
47
48
                .properties(appSettings, FEDORA_PROPERTIES_PREFIX)
                .build()
49
50
        }
    }
Jonas Waeber's avatar
Jonas Waeber committed
51

52
    private val settings = SettingsLoader(
53
54
55
56
57
58
59
60
61
62
63
        listOf(
            "isSimple",
            "$FEDORA_PROPERTIES_PREFIX.internalBaseUrl",
            "$FEDORA_PROPERTIES_PREFIX.externalBaseUrl",
            "$FEDORA_PROPERTIES_PREFIX.username",
            "$FEDORA_PROPERTIES_PREFIX.password"
        ),
        fileName,
        useProducerConfig = true,
        useConsumerConfig = true,
        readSftpSettings = true
64
    )
65

66
    private val log: Logger = LogManager.getLogger("FedoraIngestService")
67
68
69
    private val fedoraClient = createFedoraClient(settings.appSettings)
    private val simpleIngester = SimpleIngester(fedoraClient)
    private val isSimple = (settings.appSettings.getProperty("isSimple") ?: "false").toBoolean()
70
71
    private var consumer: Consumer
    private var producer: Producer
72
    private var sftpClient: SftpClient? = null
Jonas Waeber's avatar
Jonas Waeber committed
73

74
    init {
75
76
        val consumerSettings = settings.kafkaConsumerSettings
        consumerSettings.setProperty("max.poll.records", CONSUMER_MAX_POLL_RECORDS)
77
        consumerSettings.setProperty("max.poll.interval.ms", CONSUMER_MAX_INTERVAL_MS)
78
        consumer = Consumer(consumerSettings, settings.inputTopic)
79
        producer = Producer(settings.kafkaProducerSettings, settings.processReportTopic)
80
81
82
83
84
        log.info("Connected to Kafka cluster.")
        if (!isSimple) {
            sftpClient = SftpClient(settings.sftpSettings)
            log.info("Connected to sFTP server.")
        }
85
    }
86

Jonas Waeber's avatar
Jonas Waeber committed
87
    fun run() {
88
89
90
91
92
        while (true) {
            processRecords()
        }
    }

93
    fun processRecords() {
94
        for (record in consumer.fetchRecords()) {
95
96
97
98
            val ingestReport = if (isSimple)
                processSingleEntity(record)
            else
                processRecord(record)
99
            producer.sendReport(record.headers(), ingestReport)
100
101
102
        }
    }

103
104
105
106
107
108
109
110
    /**
     * If there is only a single named entity within the content data and no binaries are loaded.
     */
    private fun processSingleEntity(record: ConsumerRecord<String, String>): Report {
        val key = record.key()
        val value = record.value()

        if (key == null)
111
            return Report("NoKey", ReportStatus.fatal, "The key in message is null.")
112
113

        if (value == null)
114
            return Report(key, ReportStatus.fatal, "The value in message is null.")
115
116
117
118
119
120
121
122
123

        return try {
            simpleIngester.ingest(key, value)
            Report(
                key, ReportStatus.success, "Successfully ingested message into Fedora repository."
            )
        } catch (ex: FcrepoOperationFailedException) {
            log.error(ex.localizedMessage)
            Report(
124
                key, ReportStatus.fatal, ex.localizedMessage
125
126
127
128
            )
        }
    }

129
    private fun processRecord(record: ConsumerRecord<String, String>): Report {
130
131
132
133
134
135
136
        val ingester = sftpClient?.let {
            Ingester(
                it,
                fedoraClient,
                settings.appSettings.getProperty("$FEDORA_PROPERTIES_PREFIX.externalBaseUrl")
            )
        }
137
138

        return try {
139
            ingester?.ingest(record.key(), record.value())
140
            Report(
141
142
143
                id = record.key(),
                status = ReportStatus.success,
                message = ReportMessages.ingestedRecord(record.key())
144
            )
Thomas Bernhart's avatar
Thomas Bernhart committed
145
146
        } catch (ex: FcrepoOperationFailedException) {
            log.error("${ex.javaClass.canonicalName}:: ${ex.localizedMessage}", ex)
Jonas Waeber's avatar
Jonas Waeber committed
147
148
            Report(
                id = record.key(),
149
                status = ReportStatus.fatal,
Jonas Waeber's avatar
Jonas Waeber committed
150
                message = "Fedora Exception: ${ex.localizedMessage}"
151
            )
Thomas Bernhart's avatar
Thomas Bernhart committed
152
153
        } catch (ex: MissingMimeTypeException) {
            log.error("${ex.javaClass.canonicalName}:: ${ex.localizedMessage}", ex)
Jonas Waeber's avatar
Jonas Waeber committed
154
155
            Report(
                id = record.key(),
156
                status = ReportStatus.fatal,
Jonas Waeber's avatar
Jonas Waeber committed
157
158
                message = "Missing MimeType Exception: ${ex.localizedMessage}"
            )
Thomas Bernhart's avatar
Thomas Bernhart committed
159
160
        } catch (ex: SftpClientException) {
            log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
Jonas Waeber's avatar
Jonas Waeber committed
161
162
            Report(
                id = record.key(),
163
                status = ReportStatus.fatal,
Jonas Waeber's avatar
Jonas Waeber committed
164
165
                message = "SFTP Exception: ${ex.localizedMessage}"
            )
166
        } catch (ex: Exception) {
Thomas Bernhart's avatar
Thomas Bernhart committed
167
            log.error("${ex.javaClass.canonicalName}: ${ex.localizedMessage}", ex)
168
            Report(
169
                id = record.key(),
170
                status = ReportStatus.fatal,
Jonas Waeber's avatar
Jonas Waeber committed
171
                message = "Unknown Exception: ${ex.localizedMessage}"
172
            )
173
174
        }
    }
175
176
177
178

    override fun close() {
        consumer.close()
        producer.close()
179
        sftpClient?.close()
180
    }
Jonas Waeber's avatar
Jonas Waeber committed
181
}