Commit 4e970c5e authored by Günter Hipler's avatar Günter Hipler
Browse files

basic wrapper for ES

head stands with dependencies conflicts
parent bc07a893
......@@ -15,10 +15,28 @@ lazy val root = (project in file("."))
assemblyJarName in assembly := "app.jar",
mainClass in assembly := Some("ch.memobase.rico2edm.Main"),
test in assembly := {},
assemblyExcludedJars in assembly := {
val cp = (fullClasspath in assembly).value
cp filter { f =>
//f.data.getName.contains("spark-core") ||
f.data.getName == "jcl-over-slf4j-1.7.30.jar"
}
},
assemblyMergeStrategy in assembly := {
case "log4j.properties" => MergeStrategy.first
case "module-info.class" => MergeStrategy.discard
case "log4j2.xml" => MergeStrategy.first
/*
case PathList("org", "apache", "commons", "logging", "impl", "SimpleLog.class") => MergeStrategy.first
case PathList("org", "apache", "commons", "logging", "impl", "SimpleLog$1.class") => MergeStrategy.first
case PathList("org", "apache", "commons", "logging", "impl", "NoOpLog.class") => MergeStrategy.first
case PathList("apache", "commons", "logging", "LogFactory.class") => MergeStrategy.first
case PathList("org", "apache", "commons", "logging", "LogFactory.class") => MergeStrategy.first
case PathList("org", "apache", "commons", "logging", "LogConfigurationException.class") => MergeStrategy.first
case PathList("org", "apache", "commons", "logging", "Log.class") => MergeStrategy.first
*/
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
......@@ -46,7 +64,8 @@ lazy val root = (project in file("."))
scalaTest % Test,
rdf4jmodel,
rioapi,
rioapixml
rioapixml,
elasticsearch
//rioapiturtle,
//rioapijsonld,
//rioapintriples
......
......@@ -25,6 +25,7 @@ object Dependencies {
lazy val scalatestV = "3.1.2"
lazy val jenaV = "3.17.0"
lazy val rdf4jV = "3.5.1"
val esVersion = "7.3.1"
//lazy val iiifApis = "de.digitalcollections.iiif" % "iiif-apis" % "0.3.8"
//lazy val iiifPresentationApi =
......@@ -48,6 +49,8 @@ object Dependencies {
lazy val rioapi = "org.eclipse.rdf4j" % "rdf4j-rio-api" % rdf4jV
lazy val rioapixml = "org.eclipse.rdf4j" % "rdf4j-rio-rdfxml" % rdf4jV
lazy val elasticsearch = "org.elasticsearch.client" % "elasticsearch-rest-high-level-client" % esVersion
//lazy val rioapiturtle = "org.eclipse.rdf4j" % "rdf4j-rio-turtle" % rdf4jV
//lazy val rioapijsonld = "org.eclipse.rdf4j" % "rdf4j-rio-jsonld" % rdf4jV
//lazy val rioapintriples = "org.eclipse.rdf4j" % "rdf4j-rio-ntriples" % rdf4jV
......
......@@ -4,12 +4,14 @@ app:
#accessTermLabelsPath: "/home/swissbib/environment/code/repositories/memoriav/gitlab/services/postprocessing/rico-edm-transformer/configs/access_terms/labels.csv"
#reuseStatementLabelsPath: "/home/swissbib/environment/code/repositories/memoriav/gitlab/services/postprocessing/rico-edm-transformer/configs/reuse_statements/labels.csv"
isocodemapping: ${ISOCODE_MAPPING:?system}
# elastic:
# host: ${ELASTIC_HOST:?system}
# port: ${ELASTIC_PORT:?system}
# documentsIndex: ${DOCUMENTS_INDEX:?system}
# institutionIndex: ${INSTITUTION_INDEX:?system}
# recordSetIndex: ${RECORD_SET_INDEX:?system}
elastic:
host: ${ELASTIC_HOST:?system}
port: ${ELASTIC_PORT:?system}
clustername: ${ELASTIC_CLUSTERNAME:?system}
#documentsIndex: ${DOCUMENTS_INDEX:?system}
institutionIndex: ${INSTITUTION_INDEX:?system}
recordSetIndex: ${RECORD_SET_INDEX:?system}
......
......@@ -20,7 +20,7 @@
package ch.memobase.rico2edm
import ch.memobase.rico2edm.utils.Helper
import ch.memobase.rico2edm.utils.{ElasticSearchClientWrapper, Helper, Keys}
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.scala.Logging
import org.memobase.settings.SettingsLoader
......@@ -41,7 +41,14 @@ object Main extends Logging {
"accessTermLabelsPath",
"reuseStatementLabelsPath"
).asJava,*/
List("isocodemapping").asJava,
List(
Keys.LANGUAGE_ISO_CODE,
Keys.ELASTIC_HOST,
Keys.ELASTIC_PORT,
Keys.INSTITUTION_INDEX,
Keys.RECORDSET_INDEX,
Keys.ELASTIC_CLUSTERNAME
).asJava,
"app.yml",
false,
true,
......@@ -60,7 +67,9 @@ object Main extends Logging {
)
val shutdownGracePeriodMs = 10000
Helper.initEnrichementMapping(settings.getAppSettings)
ElasticSearchClientWrapper(settings.getAppSettings)
logger.trace("Starting stream processing")
Try(
......
/*
* rico2edm
* Copyright (C) 2021 UB Basel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase.rico2edm.utils
import com.typesafe.scalalogging.Logger
import org.apache.http.{Header, HttpHost}
import org.apache.http.message.BasicHeader
import org.apache.logging.log4j.scala.Logging
import org.elasticsearch.client.{RestClient, RestHighLevelClient}
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
class ElasticSearchClientWrapper private (val client: RestHighLevelClient, val indices: Map[String,String]) {
}
object ElasticSearchClientWrapper extends Logging{
private var client: Option[ElasticSearchClientWrapper] = None
//not really functional - but fits our needs actually in the best way
def apply(props: Properties): Boolean =
connect(props) match {
case Success(esClient) =>
client = Option(new ElasticSearchClientWrapper(esClient,
Map(
Keys.RECORDSET_INDEX -> props.get(Keys.RECORDSET_INDEX).toString,
Keys.INSTITUTION_INDEX -> props.get(Keys.INSTITUTION_INDEX).toString
)
))
true
case Failure(exception) =>
logger.error(s"error initializing ${exception}")
false
}
private def connect(props: Properties) = Try {
val hosts = new ArrayBuffer[HttpHost]
val configuredHosts = props.get(Keys.ELASTIC_HOST).toString.split(";")
val configuredPort = props.get(Keys.ELASTIC_PORT).toString.toIntOption.getOrElse(8080)
configuredHosts.foreach(
value => {
hosts += new HttpHost(value, configuredPort)
}
)
val headers = Array(new BasicHeader("cluster.name", props.get(Keys.ELASTIC_CLUSTERNAME).toString)
.asInstanceOf[Header])
new RestHighLevelClient(RestClient.builder(hosts.toArray : _*).setDefaultHeaders(headers))
}
}
......@@ -23,5 +23,11 @@ package ch.memobase.rico2edm.utils
object Keys {
val LANGUAGE_ISO_CODE = "isocodemapping"
val INSTITUTION_INDEX = "elastic.institutionIndex"
val RECORDSET_INDEX = "elastic.recordSetIndex"
val ELASTIC_HOST = "elastic.host"
val ELASTIC_PORT = "elastic.port"
val ELASTIC_CLUSTERNAME = "elastic.clustername"
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment