Due to a scheduled upgrade to version 14.10, GitLab will be unavailable on Monday 30.05., from 19:00 until 20:00.

Commit 20c2cbc5 authored by Günter Hipler
Browse files

included rules to define which datasets or institutions are allowed for

OAI export
parent 3f9dd3ad
Pipeline #25564 passed with stages
in 8 minutes and 59 seconds
{
"isHeldBy": [],
"isPartOf": ["apf-001","apf-002"]
}
\ No newline at end of file
...@@ -5,6 +5,7 @@ app: ...@@ -5,6 +5,7 @@ app:
#reuseStatementLabelsPath: "/home/swissbib/environment/code/repositories/memoriav/gitlab/services/postprocessing/rico-edm-transformer/configs/reuse_statements/labels.csv" #reuseStatementLabelsPath: "/home/swissbib/environment/code/repositories/memoriav/gitlab/services/postprocessing/rico-edm-transformer/configs/reuse_statements/labels.csv"
isocodemapping: ${ISOCODE_MAPPING:?system} isocodemapping: ${ISOCODE_MAPPING:?system}
institutionscoordinates: ${INSTITUTIONS_COORDINATES:?system} institutionscoordinates: ${INSTITUTIONS_COORDINATES:?system}
exportDefinitionRules: ${EXPORT_DEFINITION_RULES:?system}
elastic: elastic:
host: ${ELASTIC_HOST:?system} host: ${ELASTIC_HOST:?system}
port: ${ELASTIC_PORT:?system} port: ${ELASTIC_PORT:?system}
......
...@@ -44,17 +44,20 @@ class KafkaTopology extends Logging { ...@@ -44,17 +44,20 @@ class KafkaTopology extends Logging {
reportingTopic: String, reportingTopic: String,
appSettings: Properties appSettings: Properties
): Topology = { ): Topology = {
val builder = new StreamsBuilder val builder = new StreamsBuilder
val edmBuilder = new EDM val edmBuilder = new EDM
val source = builder.stream[String, String](topicIn) val source = builder.stream[String, String](topicIn)
//val Array(noDigitalObject, noLocator, noPhoto, isPhoto) = source //val Array(noDigitalObject, noLocator, noPhoto, isPhoto) = source
//we have to discuss, which documents should be delivered to Europeana //we have to discuss, which documents should be delivered to Europeana
val Array(noDigitalObject, noLocator, isEDMDeliverable) = source val Array(noOaiExport, isEDMDeliverable) = source
.branch( .branch(
(_, v) => hasNoDigitalObject(v), (_, v) => notDefinedForOaiExport(v),
(_, v) => hasNoLocator(v),
(_, _) => true (_, _) => true
) )
...@@ -75,12 +78,13 @@ class KafkaTopology extends Logging { ...@@ -75,12 +78,13 @@ class KafkaTopology extends Logging {
reportEDMCreationFailure(noEDM, reportingTopic) reportEDMCreationFailure(noEDM, reportingTopic)
reportIgnoredRecord( reportIgnoredRecord(
noLocator, noOaiExport,
reportingTopic, reportingTopic,
"Digital object has no locator" "record not defined for oai export"
) )
/*
reportIgnoredRecord( reportIgnoredRecord(
noDigitalObject, noDigitalObject,
reportingTopic, reportingTopic,
...@@ -88,6 +92,8 @@ class KafkaTopology extends Logging { ...@@ -88,6 +92,8 @@ class KafkaTopology extends Logging {
) )
*/
/* /*
......
...@@ -21,7 +21,10 @@ ...@@ -21,7 +21,10 @@
package ch.memobase.rico2edm package ch.memobase.rico2edm
import scala.util.Try import scala.util.Try
import ch.memobase.rico2edm.edm.Extractors import ch.memobase.rico2edm.edm.{EDM, Extractors}
import ch.memobase.rico2edm.utils.Helper
/**
  * Allow-list describing which records may be delivered through the OAI export.
  *
  * @param institutions identifiers of institutions whose records are exportable
  *                     (read from the "isHeldBy" array of the rules file)
  * @param dataSets     identifiers of record sets whose records are exportable
  *                     (read from the "isPartOf" array of the rules file)
  */
final case class OaiExportRules(institutions: List[String], dataSets: List[String])
object KafkaTopologyUtils { object KafkaTopologyUtils {
...@@ -45,5 +48,18 @@ object KafkaTopologyUtils { ...@@ -45,5 +48,18 @@ object KafkaTopologyUtils {
.flatMap(dO => Try(Extractors.imageResourceId(dO).get)) .flatMap(dO => Try(Extractors.imageResourceId(dO).get))
.isFailure .isFailure
/**
  * Decides whether a record has to be excluded from the OAI export.
  *
  * A record is exportable when at least one identifier derived from its "isPartOf"
  * references is accepted by `Helper.isPartOfSet`, or at least one identifier derived
  * from its "heldBy" references is accepted by `Helper.isPartOfInstitution`.
  *
  * Extraction problems (no JSON graph, no record, unexpected structure) are treated as
  * "not defined for export" instead of being thrown, in line with the other predicates
  * of this object that answer via `Try(...).isFailure`. This keeps the predicate total
  * when it is used to branch a stream of raw messages.
  *
  * @param msgVal raw message value that `Extractors.jsonGraph` is able to read
  * @return `true` if the record must not be exported (or could not be analysed),
  *         `false` if it matches the configured export rules
  */
def notDefinedForOaiExport(msgVal: String): Boolean =
  Try {
    val graph = Extractors.jsonGraph(msgVal).get
    val record = Extractors.record(graph.arr).get

    // Identifiers of all record sets / institutions referenced under the given key.
    def identifiers(key: String): List[String] =
      Extractors.recordSetOrInstitution(record)(key) map { EDM.getInstitutionOrRecordsetIdent }

    // `||` short-circuits: institutions are only inspected when no record set matched.
    !(Helper.isPartOfSet(identifiers("isPartOf")) ||
      Helper.isPartOfInstitution(identifiers("heldBy")))
  }.getOrElse(true)
} }
...@@ -48,7 +48,8 @@ object Main extends Logging { ...@@ -48,7 +48,8 @@ object Main extends Logging {
Keys.ELASTIC_PORT, Keys.ELASTIC_PORT,
Keys.INSTITUTION_INDEX, Keys.INSTITUTION_INDEX,
Keys.RECORDSET_INDEX, Keys.RECORDSET_INDEX,
Keys.ELASTIC_CLUSTERNAME Keys.ELASTIC_CLUSTERNAME,
Keys.EXPORT_DEFINITION_RULES,
).asJava, ).asJava,
"app.yml", "app.yml",
false, false,
...@@ -56,6 +57,8 @@ object Main extends Logging { ...@@ -56,6 +57,8 @@ object Main extends Logging {
false, false,
false false
) )
val streams = new KafkaStreams( val streams = new KafkaStreams(
topology.build( topology.build(
settings.getInputTopic, settings.getInputTopic,
...@@ -69,6 +72,9 @@ object Main extends Logging { ...@@ -69,6 +72,9 @@ object Main extends Logging {
val shutdownGracePeriodMs = 10000 val shutdownGracePeriodMs = 10000
Helper.initOaiExportRules(settings.getAppSettings)
Helper.initLanguageCodeMapping(settings.getAppSettings) Helper.initLanguageCodeMapping(settings.getAppSettings)
Helper.initInstitutionsCoordinateMapping(settings.getAppSettings) Helper.initInstitutionsCoordinateMapping(settings.getAppSettings)
ElasticSearchClientWrapper(settings.getAppSettings) ElasticSearchClientWrapper(settings.getAppSettings)
......
...@@ -21,10 +21,14 @@ ...@@ -21,10 +21,14 @@
package ch.memobase.rico2edm.utils package ch.memobase.rico2edm.utils
import ch.memobase.rico2edm.OaiExportRules
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.util.zip.Deflater import java.util.zip.Deflater
import java.util.{Properties, HashMap => JHashMap} import java.util.{Properties, HashMap => JHashMap}
import scala.language.reflectiveCalls import scala.language.reflectiveCalls
import scala.util.{Failure, Success, Try}
import ujson.{Obj => JObj}
object Helper { object Helper {
...@@ -32,6 +36,9 @@ object Helper { ...@@ -32,6 +36,9 @@ object Helper {
private var institutionsCoordinates: Option[JHashMap[String,(String,String)]] = None private var institutionsCoordinates: Option[JHashMap[String,(String,String)]] = None
private var exportRules: Option[OaiExportRules] = None
def compress(data: Array[Byte]): Array[Byte] = { def compress(data: Array[Byte]): Array[Byte] = {
...@@ -54,6 +61,26 @@ object Helper { ...@@ -54,6 +61,26 @@ object Helper {
} }
/**
  * Loads the OAI export rules and stores them in `exportRules`.
  *
  * The rules file is located via the `Keys.EXPORT_DEFINITION_RULES` property and must
  * contain a JSON object with two string arrays: "isHeldBy" (institutions) and
  * "isPartOf" (record sets).
  *
  * @param props application settings holding the path of the rules file
  * @throws Exception if the property is not set, or if the file content is not a JSON
  *                   object with the two expected string arrays (the original cause
  *                   is attached)
  */
def initOaiExportRules(props: Properties): Unit = {
  // Fail with a descriptive message instead of a bare NullPointerException.
  val rulesPath = Option(props.get(Keys.EXPORT_DEFINITION_RULES))
    .map(_.toString)
    .getOrElse(
      throw new Exception(
        s"didn't get any OAI export rules: property '${Keys.EXPORT_DEFINITION_RULES}' is not set"
      )
    )

  using(io.Source.fromFile(rulesPath)) { source =>
    val content = source.getLines().mkString("\n")

    // Parsing AND field access are guarded, so a non-object root, a missing key or a
    // non-string entry is reported the same way as unparsable JSON.
    val parsedRules = Try {
      val json = ujson.read(content).obj
      OaiExportRules(
        institutions = json("isHeldBy").arr.toList.map(_.str),
        dataSets = json("isPartOf").arr.toList.map(_.str)
      )
    }

    parsedRules match {
      case Success(rules) => exportRules = Some(rules)
      case Failure(exception) =>
        throw new Exception("didn't get any OAI export rules", exception)
    }
  }
}
def initLanguageCodeMapping(props: Properties):Unit = { def initLanguageCodeMapping(props: Properties):Unit = {
...@@ -101,6 +128,14 @@ object Helper { ...@@ -101,6 +128,14 @@ object Helper {
} }
} }
/**
  * Tells whether at least one of the given record set identifiers is listed in the
  * `dataSets` of the loaded export rules. Always `false` while no rules are loaded.
  */
val isPartOfSet: List[String] => Boolean =
  candidates =>
    exportRules.exists(rules => candidates.exists(item => rules.dataSets.contains(item)))
/**
  * Tells whether at least one of the given institution identifiers is listed in the
  * `institutions` of the loaded export rules. Always `false` while no rules are loaded.
  */
val isPartOfInstitution: List[String] => Boolean =
  candidates =>
    exportRules.exists(rules => candidates.exists(item => rules.institutions.contains(item)))
def getInstitutionCoord(institutionId:String):Option[(String,String)] = { def getInstitutionCoord(institutionId:String):Option[(String,String)] = {
institutionsCoordinates match { institutionsCoordinates match {
case Some(coords) if coords.containsKey(institutionId) => case Some(coords) if coords.containsKey(institutionId) =>
......
...@@ -30,6 +30,8 @@ object Keys { ...@@ -30,6 +30,8 @@ object Keys {
val ELASTIC_PORT = "elastic.port" val ELASTIC_PORT = "elastic.port"
val ELASTIC_CLUSTERNAME = "elastic.clustername" val ELASTIC_CLUSTERNAME = "elastic.clustername"
val EXPORT_DEFINITION_RULES = "exportDefinitionRules"
val VIMEO_KEY = "VIMEO" val VIMEO_KEY = "VIMEO"
......
{
"isHeldBy": ["ati"],
"isPartOf": ["apf-001","apf-002"]
}
\ No newline at end of file
/*
* rico2edm
* Copyright (C) 2021 UB Basel
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
package ch.memobase.rico2edm.edm
import ch.memobase.Utils.loadFile
import ch.memobase.rico2edm.KafkaTopologyUtils
import ch.memobase.rico2edm.utils.{Helper, Keys}
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import java.util.Properties
/**
  * Exercises `KafkaTopologyUtils.notDefinedForOaiExport` against three fixture records
  * after loading the export rules from the test resources.
  */
class GeneralSpec extends AnyFunSuite with Matchers {

  // Fixture records; loaded lazily so a missing file only fails the test that needs it.
  private lazy val apf001 = loadFile("src/test/resources/apf-001-1280.json")
  private lazy val noapf = loadFile("src/test/resources/rico.duration.json")
  private lazy val definedInstitution =
    loadFile("src/test/resources/rawIndent.no.contributor.json")

  test("is part of europeana export") {
    val ruleSettings = new Properties
    ruleSettings.put(Keys.EXPORT_DEFINITION_RULES, "src/test/resources/exportrules/rules.json")
    Helper.initOaiExportRules(ruleSettings)

    // Expected to be accepted by the rules (presumably via its record set — confirm against rules.json).
    KafkaTopologyUtils.notDefinedForOaiExport(apf001) shouldBe false
    // Expected to match neither a listed record set nor a listed institution.
    KafkaTopologyUtils.notDefinedForOaiExport(noapf) shouldBe true
    // Expected to be accepted by the rules (presumably via its institution — confirm against rules.json).
    KafkaTopologyUtils.notDefinedForOaiExport(definedInstitution) shouldBe false
  }
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment