// ElasticsearchComponent.scala
/*
 * generic OAI Server - agnostic in relation to the data repository
 * initially created for memobase project
 *
 * Copyright (C) 2021  UB Basel
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 */


23
24
package modules

25
import java.util
26
27
import com.typesafe.config.Config
import org.elasticsearch.action.get.GetRequest
28
import org.elasticsearch.action.search.{SearchRequest, SearchResponse, SearchScrollRequest}
29
import org.elasticsearch.client.{RequestOptions, RestHighLevelClient}
30
import org.elasticsearch.common.unit.TimeValue
31
import org.elasticsearch.index.query.QueryBuilders
32
import org.elasticsearch.search.builder.SearchSourceBuilder
33
import org.swissbib.memobase.oai.common.util.{ESResumptionTokenHelper, ResumptionToken}
34
import org.swissbib.memobase.oai.runner.{GetRecordFailure, ResultList}
35
import play.Environment
36
import scala.jdk.CollectionConverters._
37
import scala.util.{Failure, Success, Try}
38
39
40

/**
 * Elasticsearch-backed implementation of the [[OaiRepository]] operations.
 *
 * Classes mixing this trait in provide the client and the name of the index
 * to query. `config` and `env` are required members but are not read by any
 * method defined in this trait.
 */
trait ElasticsearchComponent extends OaiRepository {
  /** Elasticsearch client; when it is `None` every repository call returns a `Failure`. */
  val client: Option[RestHighLevelClient]

  val config: Config
  val env: Environment
  //to do: another solution necessary in case  we decide to use several indices for OAI
  val index: String

  /** Number of hits requested per page; a shorter page marks the end of a harvest. */
  private def esPageSize: Int = 30

  /** Keep-alive requested for the scroll context on every search and scroll call. */
  private def esScrollKeepAlive: TimeValue = TimeValue.timeValueMinutes(3L)

  /**
   * Returns the configured client.
   *
   * @throws NoSuchElementException if no client is available (the same exception
   *                                type `Option.get` raised, with a usable message)
   */
  private def esClient: RestHighLevelClient =
    client.getOrElse(throw new NoSuchElementException("Elasticsearch client is not available"))

  /**
   * Lists one page of records.
   *
   * With a resumption token the scroll identified by the token is continued and
   * `from` / `until` are ignored. Without a token a new scrolling search is
   * started: a range query on `lastUpdatedDate` when at least one bound is given,
   * otherwise a match-all query.
   *
   * @param from            optional inclusive lower bound for `lastUpdatedDate`
   * @param until           optional inclusive upper bound for `lastUpdatedDate`
   * @param set             currently not evaluated
   * @param resumptionToken token of a previous page; its `subject` is used as scroll id
   * @param metadataPrefix  currently not evaluated
   * @return the page together with the token for the next page (if any), or a
   *         `Failure` wrapping whatever the client or the mapping threw
   */
  override def listRecords(from: Option[String],
                           until: Option[String],
                           set: Option[String],
                           resumptionToken: Option[ResumptionToken],
                           metadataPrefix: String): Try[ResultList] = Try {
    //todo - more / better / tested handling for different parameters
    val es = esClient

    val response: SearchResponse = resumptionToken match {
      case Some(token) =>
        val scrollRequest = new SearchScrollRequest(token.subject)
        scrollRequest.scroll(esScrollKeepAlive)
        es.scroll(scrollRequest, RequestOptions.DEFAULT)
      case None =>
        es.search(firstPageRequest(from, until), RequestOptions.DEFAULT)
    }

    toResultList(response)
  }

  /**
   * Builds the scrolling search request for the first page of a harvest.
   *
   * NOTE(review): the bounds are handed to Elasticsearch verbatim and have to
   * parse as `strict_date_time` - confirm that callers normalise date-only values.
   */
  private def firstPageRequest(from: Option[String], until: Option[String]): SearchRequest = {
    val source = new SearchSourceBuilder().size(esPageSize)

    if (from.isEmpty && until.isEmpty) {
      //only for the moment until we have better data
      source.query(QueryBuilders.matchAllQuery())
    } else {
      val range = QueryBuilders.rangeQuery("lastUpdatedDate").format("strict_date_time")
      from.foreach(lower => range.gte(lower))
      until.foreach(upper => range.lte(upper))
      source.query(range)
    }

    new SearchRequest().source(source).indices(index).scroll(esScrollKeepAlive)
  }

  /** Converts a search or scroll response into the page handed back to the OAI layer. */
  private def toResultList(response: SearchResponse): ResultList = {
    val hits = response.getHits.getHits

    //scroll_id is always the same -> for this specific context
    //condition to finish the fetching is to compare the length of the resultlist
    //compare: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-search-scrolling.html
    //otherwise we will get into an empty loop
    // A completely filled page still gets a token even when it happens to be the
    // last one; the follow-up request then yields an empty page without a token.
    val nextToken: Option[ResumptionToken] =
      if (hits.length < esPageSize) None
      else Option(response.getScrollId).map(ESResumptionTokenHelper(_))

    val records = hits.map(hit => toOaiContent(hit.getId, hit.getSourceAsMap)).toSeq

    ResultList(nextToken, records)
  }

  /**
   * Maps the `_source` of one document to an [[OAIContent]].
   *
   * A missing map, a missing key and an explicit `null` value are all treated
   * alike and replaced by the defaults `""`, `false` and `Nil`.
   *
   * @param docId  Elasticsearch document id
   * @param source `_source` of the document as returned by the client, may be `null`
   */
  private def toOaiContent(docId: String, source: util.Map[String, AnyRef]): OAIContent = {
    // java.util.Map#get answers null for an absent key as well as for a null value
    def field(name: String): Option[AnyRef] =
      Option(source).flatMap(fields => Option(fields.get(name)))

    def text(name: String): String = field(name).fold("")(_.toString)

    def flag(name: String): Boolean = field(name) match {
      case Some(value: java.lang.Boolean) => value.booleanValue()
      // tolerate a boolean that was indexed as the string "true" / "false"
      case Some(value: String)            => java.lang.Boolean.parseBoolean(value)
      case _                              => false
    }

    def texts(name: String): List[String] = field(name) match {
      case Some(values: util.Collection[_]) =>
        values.asScala.iterator.filter(_ != null).map(_.toString).toList
      // a field holding one value instead of an array becomes a one-element list
      case Some(single) => List(single.toString)
      case None         => Nil
    }

    OAIContent(docId,
      text("id"),
      text("document"),
      text("format"),
      flag("published"),
      texts("recordset"),
      texts("institution"),
      text("lastUpdatedDate"))
  }

  /**
   * Lists one page of identifiers.
   *
   * Delegates to [[listRecords]] with unchanged arguments, so the result
   * carries the complete records.
   */
  override def listIdentiers(from: Option[String],
                             until: Option[String],
                             set: Option[String],
                             resumptionToken: Option[ResumptionToken],
                             metadataPrefix: String): Try[ResultList] =
    listRecords(from,
                until,
                set,
                resumptionToken,
                metadataPrefix)

  /**
   * Fetches a single record by its Elasticsearch document id.
   *
   * @param identifier     id of the document in `index`
   * @param metadataPrefix currently not evaluated
   * @return the record, or a `Failure` holding `GetRecordFailure` when the
   *         response carries no source, or whatever the client threw
   */
  override def getRecord(identifier: String,
                         metadataPrefix: String): Try[OAIContent] = Try {
    val response = esClient.get(new GetRequest(index, identifier), RequestOptions.DEFAULT)

    if (response.isSourceEmpty) {
      throw GetRecordFailure("record was not found")
    } else {
      toOaiContent(response.getId, response.getSource)
    }
  }
}