ElasticSearchWrapper.kt 6.94 KB
Newer Older
1
2
package org.memobase.helpers

Jonas Waeber's avatar
Jonas Waeber committed
3
4
import java.io.IOException
import java.net.SocketTimeoutException
import java.util.Properties
import org.apache.http.HttpHost
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest
import org.elasticsearch.action.search.ClearScrollRequest
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchScrollRequest
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.core.CountRequest
import org.elasticsearch.client.indices.GetIndexRequest
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilders.termQuery
import org.elasticsearch.search.Scroll
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.memobase.model.FacetContainer

23

Jonas Waeber's avatar
Jonas Waeber committed
24
25
26
27
/**
 * This class facilitates a connection to the Elasticsearch cluster and offers convenience functions to retrieve
 * the necessary data.
 *
 * The client is created on the first query and cached for later queries. If the connection cannot be
 * established, the query returns an empty result (0 or an empty list) and the next query tries to connect again.
 *
 * @param settings Service configuration. It is read for the Elasticsearch host, port, index (or alias) name and
 * the path of the document type labels.
 */
class ElasticSearchWrapper(settings: Properties) {
    private val log = LogManager.getLogger("ElasticSearchWrapper")
    private val host = settings.getProperty(KEYS.SettingsProps.elasticHost)
    private val port = settings.getProperty(KEYS.SettingsProps.elasticPort).toInt()
    private val documentsIndex = settings.getProperty(KEYS.SettingsProps.elasticIndex)

    private val documentTypeMapper = DocumentTypeMapper(settings.getProperty(KEYS.SettingsProps.documentTypeLabelsPath))

    // Cached connection; stays null until a connection attempt succeeds.
    private var client: RestHighLevelClient? = null

    /**
     * Establishes a connection to the client and ensures, that the index is present. The index may be an alias
     * or an index directly.
     *
     * If the index/alias is missing or the existence check fails, the freshly created client is closed so that
     * its underlying HTTP resources are not leaked.
     *
     * @return Elasticsearch Client, or null if the connection could not be verified.
     */
    private fun connect(): RestHighLevelClient? {
        val c = RestHighLevelClient(
            RestClient.builder(
                HttpHost(host, port)
            )
        )
        val ready = try {
            val indexExists = c.indices().exists(GetIndexRequest(documentsIndex), RequestOptions.DEFAULT)
            val aliasExists = c.indices().existsAlias(GetAliasesRequest(documentsIndex), RequestOptions.DEFAULT)
            if (!indexExists && !aliasExists) {
                log.error("Could not find the index or alias defined in the configuration: $documentsIndex.")
                false
            } else {
                log.info("Successfully connected to index $documentsIndex. Ready to query.")
                true
            }
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            false
        } catch (ex: IOException) {
            // Covers SocketTimeoutException as well as ConnectException and other transport failures.
            log.error(ex.localizedMessage)
            false
        }
        if (ready) {
            return c
        }
        closeQuietly(c)
        return null
    }

    /**
     * Returns the cached client, attempting a connection first if none has been established yet.
     *
     * @return The connected client, or null if the connection attempt failed.
     */
    private fun getOrConnectClient(): RestHighLevelClient? {
        if (client == null) {
            client = connect()
        }
        return client
    }

    /**
     * Closes a client that will not be used, logging (rather than propagating) a failure to close it.
     */
    private fun closeQuietly(c: RestHighLevelClient) {
        try {
            c.close()
        } catch (ex: IOException) {
            log.warn("Failed to close the Elasticsearch client: ${ex.localizedMessage}")
        }
    }

    /**
     * Counts the number of documents attached to a specific record set.
     *
     * @param recordSetIdentifier The memobase identifier of the record set (matched against `recordSet.facet`).
     *
     * @return Number of Documents, or 0 if no connection could be established or the request failed.
     */
    fun countNumberOfDocuments(recordSetIdentifier: String): Int {
        val esClient = getOrConnectClient()
        if (esClient == null) {
            log.error("Not connected to an index. Count is at zero! Restart service to retry connection.")
            return 0
        }
        val request = CountRequest(documentsIndex)
        request.query(
            termQuery(
                "recordSet.facet", recordSetIdentifier
            )
        )
        return try {
            esClient.count(request, RequestOptions.DEFAULT).count.toInt()
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            0
        } catch (ex: IOException) {
            log.error(ex.localizedMessage)
            0
        }
    }

    /**
     * Collects all the document types present in the index for a specific record set.
     *
     * @param recordSetIdentifier: The memobase identifier of the record set.
     * @param queryField: Name of the field the identifier is stored in.
     *
     * @return A list of unique document type facet containers, in the order their types were first encountered.
     * Empty if no connection could be established or the request failed.
     */
    fun getDocumentTypesFromRecords(recordSetIdentifier: String, queryField: String): List<FacetContainer> {
        val esClient = getOrConnectClient()
        if (esClient == null) {
            log.error("Could not connect to elasticsearch. Try again.")
            return emptyList()
        }
        return try {
            collectDocumentTypes(esClient, recordSetIdentifier, queryField)
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            emptyList()
        } catch (ex: IOException) {
            log.error(ex.localizedMessage)
            emptyList()
        }
    }

    /**
     * Scrolls through every document matching `queryField == recordSetIdentifier` and maps each distinct
     * `type` value to its facet container. Every page, including the first one returned by the initial
     * search, is processed.
     */
    private fun collectDocumentTypes(
        esClient: RestHighLevelClient,
        recordSetIdentifier: String,
        queryField: String
    ): List<FacetContainer> {
        val keepAlive = Scroll(TimeValue.timeValueMinutes(1L))
        val searchSourceBuilder = SearchSourceBuilder()
        searchSourceBuilder.fetchSource(arrayOf("id", "type"), emptyArray<String>())
        searchSourceBuilder.query(termQuery(queryField, recordSetIdentifier))
        val searchRequest = SearchRequest(documentsIndex)
        searchRequest.scroll(keepAlive)
        searchRequest.source(searchSourceBuilder)

        // Insertion-ordered map keyed by the raw type value: each type is mapped once and the result keeps
        // the order in which types were first seen.
        val facetsByType = LinkedHashMap<String, FacetContainer>()

        var searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT)
        var scrollId = searchResponse.scrollId
        try {
            var searchHits = searchResponse.hits.hits
            while (searchHits != null && searchHits.isNotEmpty()) {
                for (hit in searchHits) {
                    // NOTE(review): non-string values (e.g. a multi-valued field) are skipped instead of
                    // crashing with a ClassCastException — confirm that `type` is always single-valued.
                    val type = hit.sourceAsMap["type"] as? String ?: continue
                    facetsByType.getOrPut(type) { documentTypeMapper.getValue(type) }
                }
                val scrollRequest = SearchScrollRequest(scrollId)
                scrollRequest.scroll(keepAlive)
                searchResponse = esClient.scroll(scrollRequest, RequestOptions.DEFAULT)
                scrollId = searchResponse.scrollId
                searchHits = searchResponse.hits.hits
            }
        } finally {
            // Release the server-side scroll context even if paging failed part-way.
            clearScrollQuietly(esClient, scrollId)
        }
        return facetsByType.values.toList()
    }

    /**
     * Best-effort release of a scroll context. A failure is logged but does not discard results that were
     * already collected.
     */
    private fun clearScrollQuietly(esClient: RestHighLevelClient, scrollId: String?) {
        if (scrollId == null) {
            return
        }
        try {
            val clearScrollRequest = ClearScrollRequest()
            clearScrollRequest.addScrollId(scrollId)
            esClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT)
        } catch (ex: ElasticsearchException) {
            log.warn("Failed to clear scroll context: ${ex.detailedMessage}")
        } catch (ex: IOException) {
            log.warn("Failed to clear scroll context: ${ex.localizedMessage}")
        }
    }
}