To mitigate brute-force attacks against GitLab accounts, we are moving all logins to edu-ID. Please remember to link your account with your edu-ID. After November 30, 2021, login will be possible only via edu-ID. Here you can find the instructions for linking your account.

If you don't have a SWITCH edu-ID, you can create one with the guide here.

Kind regards

ElasticSearchWrapper.kt 7.15 KB
Newer Older
1
2
3
4
5
package org.memobase.helpers

import org.apache.http.HttpHost
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchException
Jonas Waeber's avatar
Jonas Waeber committed
6
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest
7
8
9
import org.elasticsearch.action.search.ClearScrollRequest
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchScrollRequest
10
11
12
13
14
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.core.CountRequest
import org.elasticsearch.client.indices.GetIndexRequest
15
import org.elasticsearch.common.unit.TimeValue
16
import org.elasticsearch.index.query.QueryBuilders
17
18
19
20
21
22
23
24
import org.elasticsearch.index.query.QueryBuilders.termQuery
import org.elasticsearch.search.Scroll
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.memobase.model.FacetContainer
import java.net.SocketTimeoutException
import java.util.*
import org.elasticsearch.search.sort.SortBuilders

25

Jonas Waeber's avatar
Jonas Waeber committed
26
27
28
29
/**
 * This class facilitates a connection to the Elasticsearch cluster and offers convenience functions to retrieve
 * the necessary data.
 *
 * The client is created lazily: each query function first checks whether a client is cached and, if not,
 * calls [connect] and caches whatever it returns. When no client can be created the query functions log an
 * error and return a default value (`0` or an empty list) instead of executing the query.
 *
 * @param settings properties providing the Elasticsearch host, port, index name and the path to the
 * document type labels (read via the `KEYS.SettingsProps` keys).
 */
class ElasticSearchWrapper(settings: Properties) {
    private val log = LogManager.getLogger("ElasticSearchWrapper")
    private val host = settings.getProperty(KEYS.SettingsProps.elasticHost)
    private val port = settings.getProperty(KEYS.SettingsProps.elasticPort).toInt()
    private val documentsIndex = settings.getProperty(KEYS.SettingsProps.elasticIndex)
    private val documentTypeMapper = DocumentTypeMapper(settings.getProperty(KEYS.SettingsProps.documentTypeLabelsPath))
    private var client: RestHighLevelClient? = null

    /**
     * Establishes a connection to the cluster and ensures that the configured index is present. The configured
     * name may be an alias or an index directly.
     *
     * If the connection fails or neither an index nor an alias with the configured name exists, the problem is
     * logged, the freshly created client is closed again and `null` is returned.
     *
     * This function does not store the client; the query functions of this class cache the returned value.
     *
     * This makes it easier to test this service, and ensures that it keeps running even if elasticsearch is not
     * available.
     *
     * TODO: In the future find a better solution.
     *
     * @return a connected client, or `null` if the cluster could not be reached or the index/alias is missing.
     */
    fun connect(): RestHighLevelClient? {
        val candidate = RestHighLevelClient(
            RestClient.builder(
                HttpHost(host, port)
            )
        )
        val usable = try {
            val indexExists = candidate.indices().exists(GetIndexRequest(documentsIndex), RequestOptions.DEFAULT)
            val aliasExists = candidate.indices().existsAlias(GetAliasesRequest(documentsIndex), RequestOptions.DEFAULT)
            if (!indexExists && !aliasExists) {
                log.error("Could not find the index or alias defined in the configuration: $documentsIndex.")
                false
            } else {
                log.info("Successfully connected to index $documentsIndex. Ready to query.")
                true
            }
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            false
        } catch (ex: java.io.IOException) {
            // Covers socket timeouts as well as other I/O failures such as a refused connection.
            log.error(ex.localizedMessage)
            false
        }

        if (usable) {
            return candidate
        }
        // The client is not handed to anyone on the failure paths, so release it here.
        closeQuietly(candidate)
        return null
    }

    /**
     * Counts the number of documents attached to a specific record set.
     *
     * @param recordSetIdentifier value matched against the `recordSet.facet` field.
     * @return the number of matching documents, or `0` if no client is available or the count request
     * fails with an [ElasticsearchException].
     */
    fun countNumberOfDocuments(recordSetIdentifier: String): Int {
        val esClient = cachedOrNewClient()
        if (esClient == null) {
            log.error("Not connected to an index. Count is at zero! Restart service to retry connection.")
            return 0
        }

        val request = CountRequest(documentsIndex)
        request.query(
            termQuery(
                "recordSet.facet", recordSetIdentifier
            )
        )
        return try {
            esClient.count(request, RequestOptions.DEFAULT).count.toInt()
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            0
        }
    }

    /**
     * Collects the distinct document types of all documents matching a term query and maps each of them to
     * a [FacetContainer] via the document type mapper.
     *
     * The documents are read with a scroll search that only fetches the `id` and `type` source fields. Every
     * page, including the one returned by the initial search, is inspected. Hits without a string `type`
     * value are skipped. Each distinct type appears once in the result, in order of first occurrence.
     *
     * @param recordSetIdentifier value the [queryField] has to match.
     * @param queryField name of the field used in the term query.
     * @return one facet per distinct type, or an empty list if no client is available or a request fails
     * with an [ElasticsearchException].
     */
    fun getDocumentTypesFromRecords(recordSetIdentifier: String, queryField: String): List<FacetContainer> {
        val esClient = cachedOrNewClient()
        if (esClient == null) {
            log.error("Could not connect to elasticsearch. Try again.")
            return emptyList()
        }

        val scroll = Scroll(TimeValue.timeValueMinutes(1L))
        val searchSourceBuilder = SearchSourceBuilder().apply {
            fetchSource(
                arrayOf(
                    "id", "type"
                ), emptyArray<String>()
            )
            query(
                termQuery(
                    queryField, recordSetIdentifier
                )
            )
        }
        val searchRequest = SearchRequest(documentsIndex).apply {
            scroll(scroll)
            source(searchSourceBuilder)
        }

        // Tracked outside the try block so the scroll context can be released on every exit path.
        var scrollId: String? = null
        return try {
            val resultFacets = mutableListOf<FacetContainer>()
            val seenTypes = mutableSetOf<String>()

            var searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT)
            scrollId = searchResponse.scrollId
            var searchHits = searchResponse.hits.hits

            while (searchHits != null && searchHits.isNotEmpty()) {
                // Handle the current page before asking for the next one, otherwise the first page is lost.
                for (hit in searchHits) {
                    val type = hit.sourceAsMap["type"] as? String ?: continue
                    if (seenTypes.add(type)) {
                        resultFacets.add(documentTypeMapper.getValue(type))
                    }
                }

                val scrollRequest = SearchScrollRequest(scrollId).apply { scroll(scroll) }
                searchResponse = esClient.scroll(scrollRequest, RequestOptions.DEFAULT)
                scrollId = searchResponse.scrollId
                searchHits = searchResponse.hits.hits
            }
            resultFacets
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            emptyList()
        } finally {
            scrollId?.let { releaseScroll(esClient, it) }
        }
    }

    /**
     * Returns the cached client, trying to create and cache one via [connect] if none is cached yet.
     */
    private fun cachedOrNewClient(): RestHighLevelClient? {
        if (client == null) {
            client = connect()
        }
        return client
    }

    /**
     * Asks the cluster to drop the scroll context [scrollId]. Failures are only logged, so that cleanup
     * never replaces the result or the exception of the surrounding query.
     */
    private fun releaseScroll(esClient: RestHighLevelClient, scrollId: String) {
        try {
            val clearScrollRequest = ClearScrollRequest()
            clearScrollRequest.addScrollId(scrollId)
            esClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT)
        } catch (ex: ElasticsearchException) {
            log.warn("Could not clear scroll context: ${ex.detailedMessage}")
        } catch (ex: java.io.IOException) {
            log.warn("Could not clear scroll context: ${ex.localizedMessage}")
        }
    }

    /**
     * Closes a client that will not be used, logging instead of propagating a failure to close.
     */
    private fun closeQuietly(unused: RestHighLevelClient) {
        try {
            unused.close()
        } catch (ex: java.io.IOException) {
            log.warn("Could not close elasticsearch client: ${ex.localizedMessage}")
        }
    }
}