ElasticSearchWrapper.kt 7.65 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
 * search-doc-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18
19
package org.memobase.helpers

Jonas Waeber's avatar
Jonas Waeber committed
20
21
import java.net.SocketTimeoutException
import java.util.Properties
22
23
24
import org.apache.http.HttpHost
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchException
Jonas Waeber's avatar
Jonas Waeber committed
25
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest
26
27
28
import org.elasticsearch.action.search.ClearScrollRequest
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchScrollRequest
29
30
31
32
33
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.core.CountRequest
import org.elasticsearch.client.indices.GetIndexRequest
34
35
36
37
38
39
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilders.termQuery
import org.elasticsearch.search.Scroll
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.memobase.model.FacetContainer

40

Jonas Waeber's avatar
Jonas Waeber committed
41
42
43
44
/**
 * This class facilitates a connection to the Elasticsearch cluster and offers convenience functions to retrieve
 * the necessary data.
 *
 * The client is created lazily on the first query. If establishing the connection fails, the failure is logged and
 * the next query attempts to connect again.
 *
 * @param settings Service configuration providing the Elasticsearch host, port, index (or alias) name and the path
 * to the document type labels handed to [DocumentTypeMapper].
 */
class ElasticSearchWrapper(settings: Properties) {
    private val log = LogManager.getLogger("ElasticSearchWrapper")
    private val host = settings.getProperty(KEYS.SettingsProps.elasticHost)
    private val port = settings.getProperty(KEYS.SettingsProps.elasticPort).toInt()
    private val documentsIndex = settings.getProperty(KEYS.SettingsProps.elasticIndex)

    private val documentTypeMapper = DocumentTypeMapper(settings.getProperty(KEYS.SettingsProps.documentTypeLabelsPath))
    private var client: RestHighLevelClient? = null

    /**
     * Returns the cached client, connecting first if there is none yet. A successful connection is cached so later
     * calls reuse it; a failed one is not, so the next call retries.
     *
     * @return The connected client, or null if no connection could be established.
     */
    private fun connectedClient(): RestHighLevelClient? {
        val current = client
        if (current != null) return current
        return connect().also { client = it }
    }

    /**
     * Establishes a connection to the cluster and ensures that the configured index is present. The configured
     * name may refer to an alias or directly to an index.
     *
     * If the index/alias is missing or a request fails, the freshly created client is closed again so that no
     * HTTP resources leak, and null is returned.
     *
     * @return Elasticsearch client, or null if the connection could not be established.
     */
    private fun connect(): RestHighLevelClient? {
        val candidate = RestHighLevelClient(
            RestClient.builder(
                HttpHost(host, port)
            )
        )
        val result = try {
            val indexExists = candidate.indices().exists(GetIndexRequest(documentsIndex), RequestOptions.DEFAULT)
            val aliasExists = candidate.indices().existsAlias(GetAliasesRequest(documentsIndex), RequestOptions.DEFAULT)

            if (!indexExists && !aliasExists) {
                log.error("Could not find the index or alias defined in the configuration: $documentsIndex.")
                null
            } else {
                log.info("Successfully connected to index $documentsIndex. Ready to query.")
                candidate
            }
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            null
        } catch (ex: java.io.IOException) {
            // Covers SocketTimeoutException as well as refused/unreachable connections.
            log.error(ex.localizedMessage)
            null
        }
        if (result == null) closeQuietly(candidate)
        return result
    }

    /**
     * Closes [toClose], logging instead of propagating a failure, since this only runs on cleanup paths.
     */
    private fun closeQuietly(toClose: RestHighLevelClient) {
        try {
            toClose.close()
        } catch (ex: java.io.IOException) {
            log.warn("Failed to close Elasticsearch client: ${ex.localizedMessage}")
        }
    }

    /**
     * Counts the number of documents attached to a specific record set.
     *
     * @param recordSetIdentifier The memobase identifier of the record set, matched against `recordSet.facet`.
     *
     * @return Number of documents, saturated at [Int.MAX_VALUE]. Returns 0 if no connection could be established
     * or the request failed (the failure is logged).
     */
    fun countNumberOfDocuments(recordSetIdentifier: String): Int {
        val esClient = connectedClient()
        if (esClient == null) {
            log.error("Not connected to an index. Count is at zero! The connection will be retried on the next request.")
            return 0
        }
        val request = CountRequest(documentsIndex).apply {
            query(termQuery("recordSet.facet", recordSetIdentifier))
        }
        return try {
            val response = esClient.count(request, RequestOptions.DEFAULT)
            // The API reports a Long; saturate instead of silently wrapping on narrowing.
            response.count.coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            0
        } catch (ex: java.io.IOException) {
            log.error(ex.localizedMessage)
            0
        }
    }

    /**
     * Collects all the document types present in the index for a specific record set.
     *
     * All matching documents are visited with the scroll API (including the first page returned by the initial
     * search). Each distinct `type` value is mapped through [DocumentTypeMapper] once, in order of first occurrence.
     * Documents without a string `type` field are skipped.
     *
     * @param recordSetIdentifier The memobase identifier of the record set.
     * @param queryField Name of the field the identifier is stored in.
     *
     * @return A list of unique document type facet containers. Returns an empty list if no connection could be
     * established or a search request failed (the failure is logged).
     */
    fun getDocumentTypesFromRecords(recordSetIdentifier: String, queryField: String): List<FacetContainer> {
        val esClient = connectedClient()
        if (esClient == null) {
            log.error("Could not connect to elasticsearch. Try again.")
            return emptyList()
        }

        val keepAlive = Scroll(TimeValue.timeValueMinutes(1L))
        val searchRequest = SearchRequest(documentsIndex)
        searchRequest.scroll(keepAlive)
        searchRequest.source(
            SearchSourceBuilder()
                .fetchSource(arrayOf("id", "type"), emptyArray<String>())
                .query(termQuery(queryField, recordSetIdentifier))
        )

        val resultFacets = mutableListOf<FacetContainer>()
        val seenTypes = mutableSetOf<String>()
        var scrollId: String? = null
        return try {
            var response = esClient.search(searchRequest, RequestOptions.DEFAULT)
            scrollId = response.scrollId
            var hits = response.hits.hits

            while (hits != null && hits.isNotEmpty()) {
                // Process the current page BEFORE fetching the next one, so the initial page is not lost.
                for (hit in hits) {
                    val type = hit.sourceAsMap?.get("type") as? String ?: continue
                    if (seenTypes.add(type)) {
                        resultFacets.add(documentTypeMapper.getValue(type))
                    }
                }
                val scrollRequest = SearchScrollRequest(scrollId)
                scrollRequest.scroll(keepAlive)
                response = esClient.scroll(scrollRequest, RequestOptions.DEFAULT)
                scrollId = response.scrollId
                hits = response.hits.hits
            }
            resultFacets
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            emptyList<FacetContainer>()
        } catch (ex: java.io.IOException) {
            log.error(ex.localizedMessage)
            emptyList<FacetContainer>()
        } finally {
            clearScrollQuietly(esClient, scrollId)
        }
    }

    /**
     * Releases the server-side scroll context identified by [scrollId], if any. Failures are only logged, so that
     * already collected results are not discarded because of a cleanup error.
     */
    private fun clearScrollQuietly(esClient: RestHighLevelClient, scrollId: String?) {
        if (scrollId == null) return
        val clearScrollRequest = ClearScrollRequest()
        clearScrollRequest.addScrollId(scrollId)
        try {
            esClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT)
        } catch (ex: ElasticsearchException) {
            log.warn("Failed to clear scroll context: ${ex.detailedMessage}")
        } catch (ex: java.io.IOException) {
            log.warn("Failed to clear scroll context: ${ex.localizedMessage}")
        }
    }
}