ElasticSearchWrapper.kt 10.3 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * search-doc-service
 * Copyright (C) 2020 Memoriav
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18 19
package org.memobase.helpers

20 21
import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
22 23
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchException
Jonas Waeber's avatar
Jonas Waeber committed
24
import org.elasticsearch.action.get.GetRequest
25 26 27
import org.elasticsearch.action.search.ClearScrollRequest
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchScrollRequest
28 29 30
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.core.CountRequest
31 32 33 34 35
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilders.termQuery
import org.elasticsearch.search.Scroll
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.memobase.model.FacetContainer
Jonas Waeber's avatar
Jonas Waeber committed
36
import org.memobase.model.LanguageContainer
37

38

Jonas Waeber's avatar
Jonas Waeber committed
39 40 41 42
/**
 * This class facilitates a connection to the Elasticsearch cluster and offers convenience functions to retrieve
 * the necessary data.
 *
 * All lookups are best-effort: an [ElasticsearchException] raised by the client is logged and a neutral fallback
 * (0, an empty list, or [LanguageContainer.EMPTY]) is returned instead. Other exceptions thrown by the client are
 * not caught and propagate to the caller.
 *
 * @param client The Elasticsearch high level REST client used for every request.
 * @param translationMappers Maps document type values to [FacetContainer]s.
 * @param documentsIndex Index queried for the documents belonging to a record set.
 * @param institutionIndex Index holding the institution documents (looked up by id).
 * @param recordSetIndex Index holding the record set documents (looked up by id).
 */
class ElasticSearchWrapper(
    private val client: RestHighLevelClient,
    private val translationMappers: TranslationMappers,
    private val documentsIndex: String,
    private val institutionIndex: String,
    private val recordSetIndex: String
) {
    private val log = LogManager.getLogger("ElasticSearchWrapper")
    private val klaxon = Klaxon()

    /**
     * Counts the number of documents attached to a specific record set.
     *
     * @param recordSetIdentifier The memobase identifier of the record set, matched against `recordSet.facet`.
     *
     * @return Number of documents, or 0 if the request failed with an [ElasticsearchException]. Counts larger than
     * [Int.MAX_VALUE] are capped at [Int.MAX_VALUE] instead of overflowing.
     */
    fun countNumberOfDocuments(recordSetIdentifier: String): Int {
        log.info("Counting documents for record set $recordSetIdentifier from index $documentsIndex.")
        val request = CountRequest(documentsIndex)
        request.query(termQuery("recordSet.facet", recordSetIdentifier))
        return try {
            val response = client.count(request, RequestOptions.DEFAULT)
            // The count is reported as a Long; a plain toInt() would silently wrap around for very large values.
            val count = response.count.coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
            log.info("Found $count documents for record set $recordSetIdentifier from index $documentsIndex.")
            count
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            0
        }
    }

    /**
     * Collects all the document types present in the index for a specific record set.
     *
     * All matching documents are paged through with a scroll (1 minute keep-alive). The scroll context is released
     * afterwards, also when paging fails part-way.
     *
     * @param recordSetIdentifier: The memobase identifier of the record set.
     * @param queryField: Name of the field the identifier is stored in.
     *
     * @return A list of unique document type facet containers, in order of first appearance. Returns an empty list
     * if a request fails with an [ElasticsearchException]. Documents whose source cannot be parsed are logged and
     * skipped.
     */
    fun getDocumentTypesFromRecords(recordSetIdentifier: String, queryField: String): List<FacetContainer> {
        return try {
            log.info("Attempting to load document type for $recordSetIdentifier in field $queryField from index $documentsIndex.")
            val resultFacets = mutableListOf<FacetContainer>()
            val typeSet = mutableSetOf<String>()
            val scroll = Scroll(TimeValue.timeValueMinutes(1L))

            val searchSourceBuilder = SearchSourceBuilder()
            // Only the fields needed to determine the document type are fetched.
            searchSourceBuilder.fetchSource(arrayOf("id", "type.filter"), emptyArray<String>())
            searchSourceBuilder.query(termQuery(queryField, recordSetIdentifier))

            val searchRequest = SearchRequest(documentsIndex)
            searchRequest.scroll(scroll)
            searchRequest.source(searchSourceBuilder)

            var searchResponse = client.search(searchRequest, RequestOptions.DEFAULT)
            var scrollId = searchResponse.scrollId
            try {
                var searchHits = searchResponse.hits.hits
                while (!searchHits.isNullOrEmpty()) {
                    for (hit in searchHits) {
                        val source = hit.sourceAsString
                        try {
                            val document = klaxon.parse<DocumentResponseSource>(source)
                            // Translate each distinct type only once.
                            if (document != null && document.type.filter !in typeSet) {
                                resultFacets.add(translationMappers.getDocumentType(document.type.filter))
                                typeSet.add(document.type.filter)
                            }
                        } catch (ex: KlaxonException) {
                            log.error("Unable to parse $source from index $documentsIndex: ${ex.message}")
                        }
                    }
                    val scrollRequest = SearchScrollRequest(scrollId)
                    scrollRequest.scroll(scroll)
                    searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT)
                    scrollId = searchResponse.scrollId
                    searchHits = searchResponse.hits.hits
                }
            } finally {
                // Release the scroll on every path; otherwise a failure while paging would leave the
                // context open on the cluster until its keep-alive runs out.
                releaseScroll(scrollId)
            }
            log.info("Found the following types $typeSet for record set $recordSetIdentifier from index $documentsIndex.")
            resultFacets
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            emptyList()
        }
    }

    /**
     * Releases the server-side scroll context identified by [scrollId]; does nothing when [scrollId] is null.
     *
     * An [ElasticsearchException] during cleanup is only logged, so that it neither masks an exception raised
     * while scrolling nor discards results that were already collected.
     */
    private fun releaseScroll(scrollId: String?) {
        if (scrollId == null) return
        try {
            val clearScrollRequest = ClearScrollRequest()
            clearScrollRequest.addScrollId(scrollId)
            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT)
        } catch (ex: ElasticsearchException) {
            log.warn("Unable to clear scroll context for index $documentsIndex: ${ex.detailedMessage}")
        }
    }

    /**
     * Retrieves the names of an institution.
     *
     * @param identifier The id of the institution document in the institution index.
     *
     * @return A [FacetContainer] built from the document's `name` field. If the document does not exist, has no
     * `name` field, or the request fails with an [ElasticsearchException], the container holds
     * [LanguageContainer.EMPTY] instead.
     */
    fun getInstitutionName(identifier: String): FacetContainer {
        return try {
            log.debug("Attempting to retrieve institution record.")
            val request = GetRequest(institutionIndex, identifier)
            val response = client.get(request, RequestOptions.DEFAULT)
            if (response.isExists) {
                val names = response.sourceAsMap["name"]
                if (names == null) {
                    log.error("Found institution for id $identifier in index $institutionIndex, but could not extract name.")
                    emptyInstitution(identifier)
                } else {
                    log.debug("Retrieval of institution names was successful.")
                    FacetContainer(
                        LanguageContainer.fromMap(names),
                        identifier,
                        emptyList()
                    )
                }
            } else {
                log.error("Could not find institution $identifier in index $institutionIndex.")
                emptyInstitution(identifier)
            }
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            emptyInstitution(identifier)
        }
    }

    /** Fallback institution facet used when no names could be retrieved for [identifier]. */
    private fun emptyInstitution(identifier: String): FacetContainer =
        FacetContainer(
            LanguageContainer.EMPTY,
            identifier,
            emptyList()
        )

    /**
     * Retrieves the names of a record set.
     *
     * @param identifier The id of the record set document in the record set index.
     *
     * @return The names read from the document's `name` field, or [LanguageContainer.EMPTY] if the document does
     * not exist, has no `name` field, or the request fails with an [ElasticsearchException].
     */
    fun getRecordSetName(identifier: String): LanguageContainer {
        return try {
            log.debug("Attempting to retrieve record set document from $recordSetIndex.")
            val request = GetRequest(recordSetIndex, identifier)
            val response = client.get(request, RequestOptions.DEFAULT)
            if (response.isExists) {
                val names = response.sourceAsMap["name"]
                if (names == null) {
                    log.error("Found record set for id $identifier in $recordSetIndex, but could not retrieve names in field 'name'.")
                    LanguageContainer.EMPTY
                } else {
                    log.debug("Retrieval of record set names was successful.")
                    LanguageContainer.fromMap(names)
                }
            } else {
                log.error("Could not find record set $identifier in index $recordSetIndex.")
                LanguageContainer.EMPTY
            }
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            LanguageContainer.EMPTY
        }
    }

    /**
     * Retrieves the institutions stored in the field `<extraType>Institution` of a record set document.
     *
     * @param identifier The id of the record set document in the record set index.
     * @param extraType Prefix of the field to read; the field name is `"${extraType}Institution"`.
     *
     * @return One [FacetContainer] per list entry if the field holds a list, a single one if it holds a map.
     * Returns an empty list if the document or field is missing, the field has any other shape, or the request
     * fails with an [ElasticsearchException].
     */
    fun getExtraInstitutionsFromRecordSet(identifier: String, extraType: String): List<FacetContainer> {
        return try {
            log.debug("Attempting to retrieve record set document from $recordSetIndex.")
            val request = GetRequest(recordSetIndex, identifier)
            val response = client.get(request, RequestOptions.DEFAULT)
            if (response.isExists) {
                val field = "${extraType}Institution"
                when (val value = response.sourceAsMap[field]) {
                    is List<*> -> {
                        log.debug("Retrieval of '$field' was successful.")
                        value.map { FacetContainer.fromMap(it) }
                    }
                    null -> {
                        log.error("Found record set for id $identifier in $recordSetIndex, but could not retrieve names in field '$field'.")
                        emptyList()
                    }
                    is Map<*, *> -> {
                        log.debug("Retrieval of '$field' was successful.")
                        listOf(FacetContainer.fromMap(value))
                    }
                    else -> {
                        log.error("Found record set for id $identifier in $recordSetIndex, but could not retrieve names in field '$field'.")
                        emptyList()
                    }
                }
            } else {
                log.error("Could not find record set $identifier in index $recordSetIndex.")
                emptyList()
            }
        } catch (ex: ElasticsearchException) {
            log.error(ex.detailedMessage)
            emptyList()
        }
    }
}