To mitigate brute-force attacks against GitLab accounts, we are moving all logins to edu-ID. We would like to remind you to link your account with your edu-ID. After November 30, 2021, login will be possible only via edu-ID. Here you can find the instructions for linking your account.

If you don't have a SWITCH edu-ID, you can create one with this guide here

kind regards

This Server has been upgraded to GitLab release 14.2.6

ElasticSearchWrapper.kt 8.61 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
 * search-doc-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18
19
package org.memobase.helpers

20
21
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
22
23
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchException
Jonas Waeber's avatar
Jonas Waeber committed
24
import org.elasticsearch.action.get.GetRequest
25
26
27
import org.elasticsearch.action.search.ClearScrollRequest
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchScrollRequest
28
29
30
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.core.CountRequest
31
32
33
34
35
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilders.termQuery
import org.elasticsearch.search.Scroll
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.memobase.model.FacetContainer
Jonas Waeber's avatar
Jonas Waeber committed
36
import org.memobase.model.LanguageContainer
37

38

Jonas Waeber's avatar
Jonas Waeber committed
39
40
41
42
/**
 * This class facilitates a connection to the Elasticsearch cluster and offers convenience functions to retrieve
 * the necessary data.
 */
43
44
class ElasticSearchWrapper(
    /** High-level REST client every request of this wrapper is sent through. */
    private val client: RestHighLevelClient,
    /** Used to turn a document type value into its [FacetContainer]. */
    private val translationMappers: TranslationMappers,
    /** Name of the index that is queried for documents. */
    private val documentsIndex: String,
    /** Name of the index institution records are fetched from. */
    private val institutionIndex: String,
    /** Name of the index record set records are fetched from. */
    private val recordSetIndex: String
) {
    private val log = LogManager.getLogger("ElasticSearchWrapper")
    private val klaxon = Klaxon()

    /**
     * Counts the number of documents attached to a specific record set.
     *
     * The count is obtained with a term query on the field `recordSet.facet`.
     *
     * @param recordSetIdentifier The memobase identifier of the record set.
     *
     * @return Number of documents. Counts larger than [Int.MAX_VALUE] are reported as [Int.MAX_VALUE];
     * `0` is returned when Elasticsearch reports an error.
     */
    fun countNumberOfDocuments(recordSetIdentifier: String): Int {
        log.info("Counting documents for record set $recordSetIdentifier from index $documentsIndex.")
        val request = CountRequest(documentsIndex)
        request.query(termQuery("recordSet.facet", recordSetIdentifier))
        return try {
            val response = client.count(request, RequestOptions.DEFAULT)
            // Long.toInt() wraps around silently, so clamp before narrowing.
            val count = response.count.coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
            log.info("Found $count documents for record set $recordSetIdentifier from index $documentsIndex.")
            count
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            0
        }
    }

    /**
     * Collects all the document types present in the index for a specific record set.
     *
     * The matching documents are paged through with the scroll API. Only the fields `id` and `type.filter`
     * are fetched. Every distinct `type.filter` value is translated once, in order of first appearance.
     * Documents whose source cannot be parsed are logged and skipped.
     *
     * @param recordSetIdentifier The memobase identifier of the record set.
     * @param queryField Name of the field the identifier is stored in.
     *
     * @return A list of unique document type facet containers, or an empty list when Elasticsearch
     * reports an error.
     */
    fun getDocumentTypesFromRecords(recordSetIdentifier: String, queryField: String): List<FacetContainer> {
        return try {
            log.info("Attempting to load document type for $recordSetIdentifier in field $queryField from index $documentsIndex.")
            val resultFacets = mutableListOf<FacetContainer>()
            val typeSet = mutableSetOf<String>()
            val scroll = Scroll(TimeValue.timeValueMinutes(1L))

            val searchSourceBuilder = SearchSourceBuilder()
            searchSourceBuilder.fetchSource(
                arrayOf(
                    "id", "type.filter"
                ), emptyArray<String>()
            )
            searchSourceBuilder.query(termQuery(queryField, recordSetIdentifier))

            val searchRequest = SearchRequest(documentsIndex)
            searchRequest.scroll(scroll)
            searchRequest.source(searchSourceBuilder)

            var scrollId: String? = null
            try {
                var searchResponse = client.search(searchRequest, RequestOptions.DEFAULT)
                scrollId = searchResponse.scrollId
                var searchHits = searchResponse.hits.hits

                while (searchHits != null && searchHits.isNotEmpty()) {
                    // Consume the current page BEFORE requesting the next one; otherwise the hits of
                    // the initial search response would never be looked at.
                    for (hit in searchHits) {
                        val source = hit.sourceAsString
                        try {
                            val document = klaxon.parse<DocumentResponseSource>(source)
                            if (document != null) {
                                val type = document.type.filter
                                if (!typeSet.contains(type)) {
                                    resultFacets.add(translationMappers.getDocumentType(type))
                                    typeSet.add(type)
                                }
                            }
                        } catch (ex: KlaxonException) {
                            log.error("Unable to parse $source from index $documentsIndex.")
                        }
                    }

                    val scrollRequest = SearchScrollRequest(scrollId)
                    scrollRequest.scroll(scroll)
                    searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT)
                    scrollId = searchResponse.scrollId
                    searchHits = searchResponse.hits.hits
                }
            } finally {
                // Release the scroll context even if paging failed half way through.
                clearScrollQuietly(scrollId)
            }

            log.info("Found the following types $typeSet for record set $recordSetIdentifier from index $documentsIndex.")
            resultFacets
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            emptyList()
        }
    }

    /**
     * Loads the institution record with the given identifier and wraps the content of its `name` field.
     *
     * @param identifier Identifier of the institution record in the institution index.
     *
     * @return A facet container carrying the institution names. When the record does not exist, has no
     * `name` field, or Elasticsearch reports an error, the container holds [LanguageContainer.EMPTY].
     */
    fun getInstitutionName(identifier: String): FacetContainer {
        return try {
            log.debug("Attempting to retrieve institution record.")
            val response = client.get(GetRequest(institutionIndex, identifier), RequestOptions.DEFAULT)
            if (!response.isExists) {
                log.error("Could not find institution $identifier in index $institutionIndex.")
                unnamedInstitution(identifier)
            } else {
                val names = response.sourceAsMap["name"]
                if (names == null) {
                    log.error("Found institution for id $identifier in index $institutionIndex, but could not extract name.")
                    unnamedInstitution(identifier)
                } else {
                    log.debug("Retrieval of institution names was successful.")
                    FacetContainer(
                        LanguageContainer.fromMap(names),
                        identifier,
                        emptyList()
                    )
                }
            }
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            unnamedInstitution(identifier)
        }
    }

    /**
     * Loads the record set record with the given identifier and converts the content of its `name` field.
     *
     * @param identifier Identifier of the record set record in the record set index.
     *
     * @return The record set names, or [LanguageContainer.EMPTY] when the record does not exist, has no
     * `name` field, or Elasticsearch reports an error.
     */
    fun getRecordSetName(identifier: String): LanguageContainer {
        return try {
            log.debug("Attempting to retrieve record set document from $recordSetIndex.")
            val response = client.get(GetRequest(recordSetIndex, identifier), RequestOptions.DEFAULT)
            if (!response.isExists) {
                log.error("Could not find record set $identifier in index $recordSetIndex.")
                LanguageContainer.EMPTY
            } else {
                val names = response.sourceAsMap["name"]
                if (names == null) {
                    log.error("Found record set for id $identifier in $recordSetIndex, but could not retrieve names in field 'name'.")
                    LanguageContainer.EMPTY
                } else {
                    log.debug("Retrieval of record set names was successful.")
                    LanguageContainer.fromMap(names)
                }
            }
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            LanguageContainer.EMPTY
        }
    }

    /** Builds the fallback facet used whenever no institution name could be obtained. */
    private fun unnamedInstitution(identifier: String): FacetContainer =
        FacetContainer(
            LanguageContainer.EMPTY,
            identifier,
            emptyList()
        )

    /**
     * Clears the given scroll context on a best-effort basis: a failure is only logged, so it can neither
     * discard results that were already collected nor replace an exception that is already propagating.
     */
    private fun clearScrollQuietly(scrollId: String?) {
        if (scrollId == null) return
        try {
            val clearScrollRequest = ClearScrollRequest()
            clearScrollRequest.addScrollId(scrollId)
            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT)
        } catch (ex: ElasticsearchException) {
            log.warn("Unable to clear scroll context on index $documentsIndex.", ex)
        } catch (ex: java.io.IOException) {
            log.warn("Unable to clear scroll context on index $documentsIndex.", ex)
        }
    }
}