Commit 3541d049 authored by Silvia Witzig's avatar Silvia Witzig
Browse files

Merge branch 'master' of...

Merge branch 'master' of gitlab.switch.ch:memoriav/memobase-2020/services/postprocessing/rico-edm-transformer
parents e745638a e95fd176
Pipeline #24448 passed with stages
in 7 minutes and 13 seconds
...@@ -15,10 +15,28 @@ lazy val root = (project in file(".")) ...@@ -15,10 +15,28 @@ lazy val root = (project in file("."))
assemblyJarName in assembly := "app.jar", assemblyJarName in assembly := "app.jar",
mainClass in assembly := Some("ch.memobase.rico2edm.Main"), mainClass in assembly := Some("ch.memobase.rico2edm.Main"),
test in assembly := {}, test in assembly := {},
assemblyExcludedJars in assembly := {
val cp = (fullClasspath in assembly).value
cp filter { f =>
//f.data.getName.contains("spark-core") ||
f.data.getName == "jcl-over-slf4j-1.7.30.jar"
}
},
assemblyMergeStrategy in assembly := { assemblyMergeStrategy in assembly := {
case "log4j.properties" => MergeStrategy.first case "log4j.properties" => MergeStrategy.first
case "module-info.class" => MergeStrategy.discard case "module-info.class" => MergeStrategy.discard
case "log4j2.xml" => MergeStrategy.first case "log4j2.xml" => MergeStrategy.first
/*
case PathList("org", "apache", "commons", "logging", "impl", "SimpleLog.class") => MergeStrategy.first
case PathList("org", "apache", "commons", "logging", "impl", "SimpleLog$1.class") => MergeStrategy.first
case PathList("org", "apache", "commons", "logging", "impl", "NoOpLog.class") => MergeStrategy.first
case PathList("apache", "commons", "logging", "LogFactory.class") => MergeStrategy.first
case PathList("org", "apache", "commons", "logging", "LogFactory.class") => MergeStrategy.first
case PathList("org", "apache", "commons", "logging", "LogConfigurationException.class") => MergeStrategy.first
case PathList("org", "apache", "commons", "logging", "Log.class") => MergeStrategy.first
*/
case x => case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x) oldStrategy(x)
...@@ -46,7 +64,8 @@ lazy val root = (project in file(".")) ...@@ -46,7 +64,8 @@ lazy val root = (project in file("."))
scalaTest % Test, scalaTest % Test,
rdf4jmodel, rdf4jmodel,
rioapi, rioapi,
rioapixml rioapixml,
elasticsearch
//rioapiturtle, //rioapiturtle,
//rioapijsonld, //rioapijsonld,
//rioapintriples //rioapintriples
......
aag,47.38787196, 8.049455603
abe,46.95089, 7.43706
acj,47.41728, 7.07422
adg,46.8542, 9.5382
afz,47.375219, 8.545961
agl,47.04153, 9.06705
apf,46.5199, 6.6332
atd,46.186235, 8.732027
ati,46.192846, 9.0132
avl,46.52511, 6.62441
azh,47.39077, 8.512219
baa,45.86725, 8.98326
bab,47.551869, 7.589958
bar,46.940555555, 7.446388888
baz,47.37237, 8.54582
bbb,46.9473, 7.4483
bcf,46.8066, 7.15597
bfl,46.92833333, 7.45166667
bmf,47.5567059, 7.578635
bpu,46.9902, 6.9311
bvc,47.1042, 6.82634
cde,46.19309, 9.01245
cdt,46.19908, 6.13594
cag,46.1992, 6.1376
cic,46.2274, 6.1373
clg,46.87994, 8.64185
clu,47.10016, 6.8239
csa,46.60208, 6.53612
fad,46.44019, 8.93677
fer,46.1314399, 8.802462
fgr,46.85014, 9.53323
fpc,46.194709, 9.024187
fss,47.4958, 8.7383
gvs,46.88057, 8.64469
hgk,47.53307, 7.61098
hsl,47.07143, 8.27772
ias,46.52378, 6.58423
ikg,46.85111, 9.533846
ikr,46.22169, 6.12565
kak,47.3941882, 8.0587246
kbg,46.849522222, 9.533855555
kek,47.12683, 8.75293
khz,47.370278, 8.548056
klu,47.04138, 8.310911
kmm,47.06376, 7.09299
lfg,47.11458, 8.38545
lkb,46.94049, 7.44218
lmz,47.379166666, 8.539722222
maa,47.3944147, 8.0452584
mav,46.938154, 7.394621
mcl,46.0043, 8.95327
mdl,46.38, 6.24018
meg,46.197797222, 6.137313888
mel,46.509824, 6.632767
mem,46.1736, 8.81054
mfk,46.94186, 7.45004
mgb,46.6167, 7.058596
mgz,47.39077, 8.512219
mhl,46.5221, 6.63491
mov,46.94134, 7.436
mws,46.23168, 7.35853
raf,47.50045, 8.72527
rkk,47.37968, 8.52745
rra,47.37845, 8.52948
rti,46.00373, 8.9512
rtr,46.8520381, 9.5344336
rts,46.2, 6.083333333
rxb,47.53496, 7.59477
sap,46.949135, 7.436426111
sbb,47.47540096, 8.205857926
sik,47.362485, 8.555264
snb,46.941444, 7.449667
snp,46.00605, 8.9399
son,46.94814, 7.45241
soz,47.366827777, 8.547530555
srf,47.40146, 8.53547
sts,47.6973, 8.6337185
swi,46.943487611, 7.473449888
ubb,47.5594, 7.5812
vks,47.052777777, 8.335833333
zbz,47.374166666, 8.545277777
zem,46.96684, 7.45468
\ No newline at end of file
...@@ -16,7 +16,7 @@ Export des topic ...@@ -16,7 +16,7 @@ Export des topic
kafkacat -C -b mb-ka1:9092 -t fedora-output-json-records -K '\t' -o beginning | gzip > fedora-output1.json.gz kafkacat -C -b mb-ka1:9092 -t fedora-output-json-records -K '\t' -o beginning | gzip > fedora-output1.json.gz
Import des topic Import des topic
docker run --rm -v /home/swissbib/environment/code/repositories/memoriav/gitlab/services/postprocessing/rico-edm-transformer/data:/data -it --network host edenhill/kafkacat:1.6.0 -P -b VPN:9092 -t fedora-output-json-records -K '\t' -l /data/fedora-output1.json docker run --rm -v /home/swissbib/environment/code/repositories/memoriav/gitlab/services/postprocessing/rico-edm-transformer/data:/data -it --network host edenhill/kafkacat:1.6.0 -P -b VPN:9092 -t fedora-output-json-records1 -K '\t' -l /data/fedora-output-20210409.json
......
...@@ -25,6 +25,7 @@ object Dependencies { ...@@ -25,6 +25,7 @@ object Dependencies {
lazy val scalatestV = "3.1.2" lazy val scalatestV = "3.1.2"
lazy val jenaV = "3.17.0" lazy val jenaV = "3.17.0"
lazy val rdf4jV = "3.5.1" lazy val rdf4jV = "3.5.1"
val esVersion = "7.3.1"
//lazy val iiifApis = "de.digitalcollections.iiif" % "iiif-apis" % "0.3.8" //lazy val iiifApis = "de.digitalcollections.iiif" % "iiif-apis" % "0.3.8"
//lazy val iiifPresentationApi = //lazy val iiifPresentationApi =
...@@ -48,6 +49,8 @@ object Dependencies { ...@@ -48,6 +49,8 @@ object Dependencies {
lazy val rioapi = "org.eclipse.rdf4j" % "rdf4j-rio-api" % rdf4jV lazy val rioapi = "org.eclipse.rdf4j" % "rdf4j-rio-api" % rdf4jV
lazy val rioapixml = "org.eclipse.rdf4j" % "rdf4j-rio-rdfxml" % rdf4jV lazy val rioapixml = "org.eclipse.rdf4j" % "rdf4j-rio-rdfxml" % rdf4jV
lazy val elasticsearch = "org.elasticsearch.client" % "elasticsearch-rest-high-level-client" % esVersion
//lazy val rioapiturtle = "org.eclipse.rdf4j" % "rdf4j-rio-turtle" % rdf4jV //lazy val rioapiturtle = "org.eclipse.rdf4j" % "rdf4j-rio-turtle" % rdf4jV
//lazy val rioapijsonld = "org.eclipse.rdf4j" % "rdf4j-rio-jsonld" % rdf4jV //lazy val rioapijsonld = "org.eclipse.rdf4j" % "rdf4j-rio-jsonld" % rdf4jV
//lazy val rioapintriples = "org.eclipse.rdf4j" % "rdf4j-rio-ntriples" % rdf4jV //lazy val rioapintriples = "org.eclipse.rdf4j" % "rdf4j-rio-ntriples" % rdf4jV
......
...@@ -4,12 +4,15 @@ app: ...@@ -4,12 +4,15 @@ app:
#accessTermLabelsPath: "/home/swissbib/environment/code/repositories/memoriav/gitlab/services/postprocessing/rico-edm-transformer/configs/access_terms/labels.csv" #accessTermLabelsPath: "/home/swissbib/environment/code/repositories/memoriav/gitlab/services/postprocessing/rico-edm-transformer/configs/access_terms/labels.csv"
#reuseStatementLabelsPath: "/home/swissbib/environment/code/repositories/memoriav/gitlab/services/postprocessing/rico-edm-transformer/configs/reuse_statements/labels.csv" #reuseStatementLabelsPath: "/home/swissbib/environment/code/repositories/memoriav/gitlab/services/postprocessing/rico-edm-transformer/configs/reuse_statements/labels.csv"
isocodemapping: ${ISOCODE_MAPPING:?system} isocodemapping: ${ISOCODE_MAPPING:?system}
# elastic: institutionscoordinates: ${INSTITUTIONS_COORDINATES:?system}
# host: ${ELASTIC_HOST:?system} elastic:
# port: ${ELASTIC_PORT:?system} host: ${ELASTIC_HOST:?system}
# documentsIndex: ${DOCUMENTS_INDEX:?system} port: ${ELASTIC_PORT:?system}
# institutionIndex: ${INSTITUTION_INDEX:?system} clustername: ${ELASTIC_CLUSTERNAME:?system}
# recordSetIndex: ${RECORD_SET_INDEX:?system} #documentsIndex: ${DOCUMENTS_INDEX:?system}
institutionIndex: ${INSTITUTION_INDEX:?system}
recordSetIndex: ${RECORD_SET_INDEX:?system}
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
package ch.memobase.rico2edm package ch.memobase.rico2edm
import ch.memobase.rico2edm.utils.Helper import ch.memobase.rico2edm.utils.{ElasticSearchClientWrapper, Helper, Keys}
import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.scala.Logging import org.apache.logging.log4j.scala.Logging
import org.memobase.settings.SettingsLoader import org.memobase.settings.SettingsLoader
...@@ -41,7 +41,15 @@ object Main extends Logging { ...@@ -41,7 +41,15 @@ object Main extends Logging {
"accessTermLabelsPath", "accessTermLabelsPath",
"reuseStatementLabelsPath" "reuseStatementLabelsPath"
).asJava,*/ ).asJava,*/
List("isocodemapping").asJava, List(
Keys.INSTITUTIONS_COORDINATES_MAPPING,
Keys.LANGUAGE_ISO_CODE,
Keys.ELASTIC_HOST,
Keys.ELASTIC_PORT,
Keys.INSTITUTION_INDEX,
Keys.RECORDSET_INDEX,
Keys.ELASTIC_CLUSTERNAME
).asJava,
"app.yml", "app.yml",
false, false,
true, true,
...@@ -60,7 +68,10 @@ object Main extends Logging { ...@@ -60,7 +68,10 @@ object Main extends Logging {
) )
val shutdownGracePeriodMs = 10000 val shutdownGracePeriodMs = 10000
Helper.initEnrichementMapping(settings.getAppSettings)
Helper.initLanguageCodeMapping(settings.getAppSettings)
Helper.initInstitutionsCoordinateMapping(settings.getAppSettings)
ElasticSearchClientWrapper(settings.getAppSettings)
logger.trace("Starting stream processing") logger.trace("Starting stream processing")
Try( Try(
......
...@@ -23,7 +23,7 @@ package ch.memobase.rico2edm.edm ...@@ -23,7 +23,7 @@ package ch.memobase.rico2edm.edm
import ch.memobase.rico2edm.edm import ch.memobase.rico2edm.edm
import ch.memobase.rico2edm.edm.subjects.{Aggregation, Concept, ModelXMLTransformer, Place, ProvidedCHO, TimeSpan, WebResource} import ch.memobase.rico2edm.edm.subjects.{Aggregation, Concept, ModelXMLTransformer, Place, ProvidedCHO, TimeSpan, WebResource}
import ch.memobase.rico2edm.utils.Helper import ch.memobase.rico2edm.utils.{ElasticSearchClientWrapper, Helper}
import java.time.format.DateTimeFormatter import java.time.format.DateTimeFormatter
import scala.collection.mutable import scala.collection.mutable
...@@ -72,9 +72,9 @@ class EDM { ...@@ -72,9 +72,9 @@ class EDM {
val esObject = ModelXMLTransformer(model = choExtraction.obj.getModel, val esObject = ModelXMLTransformer(model = choExtraction.obj.getModel,
id = shortRecordId, id = shortRecordId,
recordset = Extractors.recordSetOrInstitution(record)("isPartOf") recordset = Extractors.recordSetOrInstitution(record.get)("isPartOf")
.map( ident => EDM.getInstitutionOrRecordsetIdent(ident)), .map( ident => EDM.getInstitutionOrRecordsetIdent(ident)),
institution = Extractors.recordSetOrInstitution(record)("heldBy") institution = Extractors.recordSetOrInstitution(record.get)("heldBy")
.map( identInstitution => EDM.getInstitutionOrRecordsetIdent(identInstitution)) .map( identInstitution => EDM.getInstitutionOrRecordsetIdent(identInstitution))
) )
...@@ -179,6 +179,27 @@ class EDM { ...@@ -179,6 +179,27 @@ class EDM {
Extractors.edmType(record.value) Extractors.edmType(record.value)
.foreach(c => cho.addEdmType(Some(c))) .foreach(c => cho.addEdmType(Some(c)))
Extractors
.recordSetOrInstitution(record.value)("heldBy")
.foreach(c => Helper.getInstitutionCoord(
EDM.getInstitutionOrRecordsetIdent(c)
).map(indexValue =>
cho.addCurrentLocation(Some(indexValue))))
Extractors
.recordSetOrInstitution(record.value)("isPartOf")
.foreach(c => ElasticSearchClientWrapper.getRecordsetName(
EDM.getInstitutionOrRecordsetIdent(c)
).map(indexValue =>
cho.addIsPartOf(Some(indexValue))))
/*
recordset = Extractors.recordSetOrInstitution(record.get)("isPartOf")
.map( ident => EDM.getInstitutionOrRecordsetIdent(ident)),
*/
ExtractionResult(cho) ExtractionResult(cho)
...@@ -272,6 +293,14 @@ class EDM { ...@@ -272,6 +293,14 @@ class EDM {
//fixed value for provider //fixed value for provider
aggregation.addProvider(Some("Memoriav")) aggregation.addProvider(Some("Memoriav"))
Extractors
.recordSetOrInstitution(record.value)("heldBy")
.foreach(c => ElasticSearchClientWrapper.getHeldBy(
EDM.getInstitutionOrRecordsetIdent(c)
).map(indexValue =>
aggregation.addDataProvider(Some(indexValue))))
Option(ExtractionResult(aggregation)) Option(ExtractionResult(aggregation))
} else { } else {
......
...@@ -111,13 +111,13 @@ object Extractors { ...@@ -111,13 +111,13 @@ object Extractors {
"https://www.ica.org/standards/RiC/ontology#Record" "https://www.ica.org/standards/RiC/ontology#Record"
) )
private val isHttpIdentifier = "^http.*".r private val isHttpIdentifier = "^http.*".r
val recordSetOrInstitution: Try[mutable.LinkedHashMap[String, JValue]] => String => List[String] = val recordSetOrInstitution: mutable.LinkedHashMap[String, JValue] => String => List[String] =
record => record =>
property => { property => {
val idents = if (stringValue(record.get)(property).isDefined) { val idents = if (stringValue(record)(property).isDefined) {
List(stringValue(record.get)(property).get) List(stringValue(record)(property).get)
} else if (arrayValue(record.get)(property).isDefined) { } else if (arrayValue(record)(property).isDefined) {
arrayValue(record.get)(property).get.map(_.str).toList arrayValue(record)(property).get.map(_.str).toList
} else { } else {
List.empty[String] List.empty[String]
} }
......
...@@ -42,7 +42,7 @@ object ModelXMLTransformer { ...@@ -42,7 +42,7 @@ object ModelXMLTransformer {
recordset: List[String], recordset: List[String],
institution: List[String], institution: List[String],
published: Boolean = true, published: Boolean = true,
format: String = "EDM", format: String = "edm",
): String = { ): String = {
val sOut = new StringWriter val sOut = new StringWriter
......
...@@ -95,6 +95,13 @@ class ProvidedCHO (val id: String) { ...@@ -95,6 +95,13 @@ class ProvidedCHO (val id: String) {
def addEdmType(edmtype:Option[String]): Unit = def addEdmType(edmtype:Option[String]): Unit =
edmtype.map(t => model.add(iri(id),EDMVocab.TYPE,factory.createLiteral(t))) edmtype.map(t => model.add(iri(id),EDMVocab.TYPE,factory.createLiteral(t)))
def addCurrentLocation(edmCurrentLocation:Option[(String,String)]): Unit =
edmCurrentLocation.map(t => model.add(iri(id),EDMVocab.CURRENT_LOCATION,
factory.createLiteral(s"${t._1} / ${t._2}")))
def addIsPartOf(dcTermsPartOf:Option[String]): Unit =
dcTermsPartOf.map(t => model.add(iri(id),DCTERMS.IS_PART_OF,factory.createLiteral(t)))
def getModel: Model = model def getModel: Model = model
...@@ -187,6 +194,12 @@ class Aggregation(private val id: String) { ...@@ -187,6 +194,12 @@ class Aggregation(private val id: String) {
} }
def addDataProvider(edmDataProvider: Option[String]): Unit = {
edmDataProvider.map(a =>
model.add(iri(id),EDMVocab.DATA_PROVIDER,factory.createLiteral(a))
)
}
def addEDMObjectNoFoto(edmObjectNoFoto: Option[String]): Unit = { def addEDMObjectNoFoto(edmObjectNoFoto: Option[String]): Unit = {
edmObjectNoFoto.map(a => { edmObjectNoFoto.map(a => {
if (identValue.matches(edmObjectNoFoto.get)) { if (identValue.matches(edmObjectNoFoto.get)) {
......
...@@ -45,6 +45,8 @@ object EDMVocab extends VocabularyFactory("http://www.europeana.eu/schemas/edm/" ...@@ -45,6 +45,8 @@ object EDMVocab extends VocabularyFactory("http://www.europeana.eu/schemas/edm/"
val IS_SHOWN_BY: IRI = getIri("isShownBy") val IS_SHOWN_BY: IRI = getIri("isShownBy")
val OBJECT: IRI = getIri("object") val OBJECT: IRI = getIri("object")
val PROVIDER: IRI = getIri("provider") val PROVIDER: IRI = getIri("provider")
val CURRENT_LOCATION: IRI = getIri("currentLocation")
val DATA_PROVIDER: IRI = getIri("dataProvider")
} }
......
/*
* rico2edm
* Copyright (C) 2021 UB Basel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase.rico2edm.utils
import com.typesafe.scalalogging.Logger
import org.apache.http.{Header, HttpHost}
import org.apache.http.message.BasicHeader
import org.apache.logging.log4j.scala.Logging
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import java.util
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
import scala.jdk.CollectionConverters._
import java.util.{ArrayList => JArrayList}
import java.util.{HashMap => JHashMap}
/**
 * Thin wrapper around an Elasticsearch [[RestHighLevelClient]] that resolves
 * human-readable (German) display names for institutions and record sets.
 *
 * @param client  the underlying high-level REST client (owned by the caller)
 * @param indices maps the `Keys.*_INDEX` configuration keys to concrete index names
 */
class ElasticSearchClientWrapper private (val client: RestHighLevelClient, val indices: Map[String,String]) {

  // Shape of the "name" field in the ES documents: language code -> list of labels.
  type ESStringList = JHashMap[String,JArrayList[String]]

  /**
   * Fetches document `id` from the index configured under `indexKey` (falling back
   * to `defaultIndex`) and returns its first German name label, or "unknown" when
   * the "name" field or the "de" entry is missing or empty.
   *
   * BUGFIX: the previous fallback `new JArrayList[String]{"unknown"}` built an
   * anonymous subclass whose body was a *discarded* string expression — the list
   * was empty, so `.get(0)` threw IndexOutOfBoundsException instead of returning
   * "unknown". `headOption.getOrElse("unknown")` gives the intended default.
   *
   * @throws Exception wrapped in the returned Try when no document exists for `id`
   */
  private def lookupGermanName(indexKey: String, defaultIndex: String, id: String, what: String): Try[String] = Try {
    val getRequest = new GetRequest(indices.getOrElse(indexKey, defaultIndex), id)
    val getResponse = client.get(getRequest, RequestOptions.DEFAULT)
    if (getResponse.isSourceEmpty) {
      throw new Exception(s"$what with identifier $id not found ")
    } else {
      val hit = getResponse.getSource.asScala
      val names = hit.getOrElse("name", new ESStringList()).asInstanceOf[ESStringList].asScala
      names.get("de")
        .flatMap(labels => labels.asScala.headOption)
        .getOrElse("unknown")
    }
  }

  // Resolves the display name of an institution document.
  private def getInstitutionNameById(id: String): Try[String] =
    lookupGermanName(Keys.INSTITUTION_INDEX, "institutions-v4", id, "institution")

  // Resolves the display name of a record-set document.
  private def getRecordSetName(recordSetId: String): Try[String] =
    lookupGermanName(Keys.RECORDSET_INDEX, "record-sets-v4", recordSetId, "recordset")
}
/**
 * Companion / singleton access point for the ES client wrapper.
 *
 * Holds one shared, mutable client reference that is set once via [[apply]]
 * and then consulted by the lookup helpers. Deliberately not referentially
 * transparent — "not really functional, but fits our needs in the best way".
 */
object ElasticSearchClientWrapper extends Logging {

  // Shared instance; None until apply() has connected successfully.
  private var client: Option[ElasticSearchClientWrapper] = None
  // NOTE(review): unused in this object — kept for source compatibility; candidate for removal.
  private val identValue = "^https.*".r

  /**
   * Connects to Elasticsearch using the given settings and installs the
   * shared wrapper instance.
   *
   * @param props application settings; must provide ELASTIC_HOST, ELASTIC_PORT,
   *              ELASTIC_CLUSTERNAME and the two index names
   * @return true when the client was initialised, false on failure (logged)
   */
  def apply(props: Properties): Boolean =
    connect(props) match {
      case Success(esClient) =>
        client = Option(new ElasticSearchClientWrapper(esClient,
          Map(
            Keys.RECORDSET_INDEX -> props.get(Keys.RECORDSET_INDEX).toString,
            Keys.INSTITUTION_INDEX -> props.get(Keys.INSTITUTION_INDEX).toString
          )
        ))
        true
      case Failure(exception) =>
        logger.error(s"error initializing ${exception}")
        false
    }

  /**
   * Resolves the display name of the institution with the given id.
   * Returns None when the client is not initialised or the lookup fails (logged).
   */
  def getHeldBy(id: String): Option[String] =
    client.flatMap { wrapper =>
      wrapper.getInstitutionNameById(id) match {
        case Success(instTitle) => Some(instTitle)
        case Failure(exception) =>
          logger.error(s"error trying to get institution title: $exception")
          None
      }
    }

  /**
   * Resolves the display name of the record set with the given id.
   * Returns None when the client is not initialised or the lookup fails (logged).
   */
  def getRecordsetName(idRecordSet: String): Option[String] =
    client.flatMap { wrapper =>
      wrapper.getRecordSetName(idRecordSet) match {
        case Success(instTitle) => Some(instTitle)
        case Failure(exception) =>
          logger.error(s"error trying to get recordset title: $exception")
          None
      }
    }

  /**
   * Builds a RestHighLevelClient from the configured hosts (semicolon-separated),
   * port (falls back to 8080 when unparsable) and cluster-name header.
   */
  private def connect(props: Properties): Try[RestHighLevelClient] = Try {
    val configuredPort = props.get(Keys.ELASTIC_PORT).toString.toIntOption.getOrElse(8080)
    // map over the host list instead of mutating an ArrayBuffer in a foreach
    val hosts = props.get(Keys.ELASTIC_HOST).toString
      .split(";")
      .map(host => new HttpHost(host, configuredPort))
    val headers = Array(new BasicHeader("cluster.name", props.get(Keys.ELASTIC_CLUSTERNAME).toString)
      .asInstanceOf[Header])
    new RestHighLevelClient(RestClient.builder(hosts : _*).setDefaultHeaders(headers))
  }
}
...@@ -30,6 +30,10 @@ object Helper { ...@@ -30,6 +30,10 @@ object Helper {
private var isoLanguageCodes: Option[JHashMap[String,String]] = None private var isoLanguageCodes: Option[JHashMap[String,String]] = None
private var institutionsCoordinates: Option[JHashMap[String,(String,String)]] = None
def compress(data: Array[Byte]): Array[Byte] = { def compress(data: Array[Byte]): Array[Byte] = {
val deflater = new Deflater() val deflater = new Deflater()
deflater.setInput(data)