Commit cd752478 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Add record sets search doc generation.

parent ad271354
...@@ -64,6 +64,7 @@ dependencies { ...@@ -64,6 +64,7 @@ dependencies {
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils // https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
testCompile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: kafkaV testCompile group: 'org.apache.kafka', name: 'kafka-streams-test-utils', version: kafkaV
testImplementation "io.mockk:mockk:1.10.0"
} }
compileKotlin { compileKotlin {
......
...@@ -30,8 +30,13 @@ spec: ...@@ -30,8 +30,13 @@ spec:
volumeMounts: volumeMounts:
- name: instituion-type-labels - name: instituion-type-labels
mountPath: "/configs/institution_types/" mountPath: "/configs/institution_types/"
- name: document-type-labels
mountPath: "/configs/document_types/"
volumes: volumes:
- name: instituion-type-labels - name: instituion-type-labels
configMap: configMap:
name: "{{ .Values.instutionTypeLabels }}" name: "{{ .Values.instutionTypeLabels }}"
- name: document-type-labels
configMap:
name: "{{ .Values.documentTypeLabels }}"
restartPolicy: Always restartPolicy: Always
...@@ -13,5 +13,6 @@ inputTopic: search-doc-input-documents ...@@ -13,5 +13,6 @@ inputTopic: search-doc-input-documents
reportingTopic: postprocessing-reporting reportingTopic: postprocessing-reporting
instutionTypeLabels: institution-type-labels instutionTypeLabels: institution-type-labels
documentTypeLabels: document-type-labels
mediaServerUrl: https://media.memobase.k8s.unibas.ch/memo/ mediaServerUrl: https://media.memobase.k8s.unibas.ch/memo/
\ No newline at end of file
...@@ -29,10 +29,12 @@ import org.apache.kafka.streams.Topology ...@@ -29,10 +29,12 @@ import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.memobase.helpers.Default import org.memobase.helpers.ElasticSearchWrapper
import org.memobase.helpers.JSON import org.memobase.helpers.JSON
import org.memobase.helpers.KEYS import org.memobase.helpers.KEYS
import org.memobase.model.DocumentsSearchDoc import org.memobase.model.DocumentsSearchDoc
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.RecordSetSearchDoc
import org.memobase.model.Schema import org.memobase.model.Schema
class KafkaTopology(private val settings: SettingsLoader) { class KafkaTopology(private val settings: SettingsLoader) {
...@@ -45,6 +47,10 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -45,6 +47,10 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val institutionSearchDoc = private val institutionSearchDoc =
InstitutionSearchDocBuilder(appSettings.getProperty(KEYS.SettingsProps.institutionTypeLabelsPath), appSettings) InstitutionSearchDocBuilder(appSettings.getProperty(KEYS.SettingsProps.institutionTypeLabelsPath), appSettings)
private val elasticSearchWrapper = ElasticSearchWrapper(settings.appSettings)
private val recordSetSearchDocBuilder =
RecordSetSearchDocBuilder(elasticSearchWrapper)
private val jsonWriter = ObjectMapper().registerKotlinModule().writer() private val jsonWriter = ObjectMapper().registerKotlinModule().writer()
fun build(): Topology { fun build(): Topology {
...@@ -83,7 +89,7 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -83,7 +89,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name) Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
) )
} catch (ex: InvalidInputException) { } catch (ex: InvalidInputException) {
Pair(Default.institutionSearchDoc, Report( Pair(InstitutionSearchDoc.DEFAULT, Report(
readOnlyKey, readOnlyKey,
ReportStatus.warning, ReportStatus.warning,
ex.localizedMessage, ex.localizedMessage,
...@@ -92,6 +98,23 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -92,6 +98,23 @@ class KafkaTopology(private val settings: SettingsLoader) {
} }
outputStreams(institutionStream) outputStreams(institutionStream)
val recordSetStream = branchedStream[2]
.mapValues { readOnlyKey, value ->
try {
Pair(
recordSetSearchDocBuilder.transform(readOnlyKey, value),
Report(readOnlyKey, ReportStatus.success, "Transformed message into search doc.", Service.name)
)
} catch (ex: InvalidInputException) {
Pair(RecordSetSearchDoc.DEFAULT, Report(
readOnlyKey,
ReportStatus.warning,
ex.localizedMessage,
Service.name))
}
}
outputStreams(recordSetStream)
branchedStream[3] branchedStream[3]
.mapValues { readOnlyKey, value -> .mapValues { readOnlyKey, value ->
Report( Report(
...@@ -111,7 +134,7 @@ class KafkaTopology(private val settings: SettingsLoader) { ...@@ -111,7 +134,7 @@ class KafkaTopology(private val settings: SettingsLoader) {
.to(reportTopic) .to(reportTopic)
stream stream
.filterNot { _, value -> value.second.status == "FAILURE" } .filterNot { _, value -> value.second.status == ReportStatus.fatal }
.mapValues { value -> value.first } .mapValues { value -> value.first }
.mapValues { value -> .mapValues { value ->
val out = StringWriter() val out = StringWriter()
......
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.Date
import org.memobase.helpers.ElasticSearchWrapper
import org.memobase.helpers.Extract
import org.memobase.helpers.KEYS
import org.memobase.model.FacetContainer
import org.memobase.model.LanguageContainer
import org.memobase.model.RecordSetSearchDoc
import org.memobase.model.Schema
import org.memobase.model.IntegerRange
class RecordSetSearchDocBuilder(private val elasticSearchWrapper: ElasticSearchWrapper) {
private val log = LogManager.getLogger("RecordSetSearchDocBuilder")
fun transform(key: String, input: Map<String, JsonObject>): Schema {
val recordSet =
input["recordSet"] ?: throw InvalidInputException("No recordSet entity found in message $key.")
val identifiers = mutableListOf<JsonObject>()
input.values.forEach {
when {
it[KEYS.ricoType] == KEYS.IdentifierType.main -> {
identifiers.add(it)
}
}
}
val name = extractLanguageContainer(recordSet[KEYS.title], "NoNameFound")
val description = extractLanguageContainer(recordSet[KEYS.descriptiveNote], "NoDescriptionFound")
val id = Extract.extractIdValue(identifiers, KEYS.IdentifierType.main) ?: "NoIdentifierFound"
val institution = recordSet[KEYS.heldBy] as String?
if (institution != null) {
// TODO:
}
return RecordSetSearchDoc(
recordSetId = id,
isPublished = recordSet[KEYS.isPublished].let {
when (it) {
is String -> it.toBoolean()
else -> {
log.error("Found no isPublished property on record set $key. Set to false.")
false
}
}
},
scopeAndContent = description,
periodOfTimeAsYear = IntegerRange(1920, 2020),
institution = FacetContainer(LanguageContainer.placeholder("NoNameInstitution"), filter = institution, facet = emptyList()),
supportedByMemoriav = recordSet[KEYS.sponsoredBy] != null,
name = name,
documentType = elasticSearchWrapper.getDocumentTypesFromRecords(id, KEYS.QueryFields.recordSetFacet),
keyVisualLink = recordSet[KEYS.wikidataImage].let { if (it != null) it as String else "NoKeyVisualLinkDefined" },
numberOfDocuments = elasticSearchWrapper.countNumberOfDocuments(id),
lastUpdatedDate = Date.now,
languageOfMetadata = FacetContainer(LanguageContainer.placeholder("Deutsch"), filter = null, facet = emptyList())
)
}
private fun extractLanguageContainer(value: Any?, placeholder: String): LanguageContainer {
return Extract.languageContainer("record set", value).let { items ->
when {
items.isEmpty() -> {
LanguageContainer.placeholder(placeholder)
}
items.size == 1 -> {
items[0]
}
else -> {
items.reduce { acc, languageContainer -> acc.merge(languageContainer) }
}
}
}
}
}
...@@ -33,6 +33,7 @@ class Service(file: String = "app.yml") { ...@@ -33,6 +33,7 @@ class Service(file: String = "app.yml") {
val settings = SettingsLoader( val settings = SettingsLoader(
listOf( listOf(
KEYS.SettingsProps.institutionTypeLabelsPath, KEYS.SettingsProps.institutionTypeLabelsPath,
KEYS.SettingsProps.documentTypeLabelsPath,
KEYS.SettingsProps.mediaUrl, KEYS.SettingsProps.mediaUrl,
KEYS.SettingsProps.elasticHost, KEYS.SettingsProps.elasticHost,
KEYS.SettingsProps.elasticPort, KEYS.SettingsProps.elasticPort,
......
...@@ -4,5 +4,5 @@ import java.time.LocalDateTime ...@@ -4,5 +4,5 @@ import java.time.LocalDateTime
import java.time.format.DateTimeFormatter import java.time.format.DateTimeFormatter
object Date { object Date {
val now = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME) val now: String = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)
} }
\ No newline at end of file
package org.memobase.helpers
import org.memobase.model.*
object Default {
val institutionSearchDoc = InstitutionSearchDoc(
"UnknownId",
false,
LanguageContainer.EMPTY,
LanguageContainer.EMPTY,
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
"",
"",
0,
0,
"",
""
)
}
\ No newline at end of file
package org.memobase.helpers
import org.memobase.model.FacetContainer
class DocumentTypeMapper(path: String) {
private val labels = LoadFile.readLabelFile(path)
fun getValue(code: String): FacetContainer {
return labels[code] ?: FacetContainer.DEFAULT
}
}
\ No newline at end of file
package org.memobase.helpers package org.memobase.helpers
import java.util.Properties
import org.apache.http.HttpHost import org.apache.http.HttpHost
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchException import org.elasticsearch.ElasticsearchException
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest
import org.elasticsearch.action.search.ClearScrollRequest
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchScrollRequest
import org.elasticsearch.client.RequestOptions import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestClient import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.core.CountRequest import org.elasticsearch.client.core.CountRequest
import org.elasticsearch.client.indices.GetIndexRequest import org.elasticsearch.client.indices.GetIndexRequest
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilders import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.index.query.QueryBuilders.termQuery
import org.elasticsearch.search.Scroll
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.memobase.model.FacetContainer
import java.net.SocketTimeoutException
import java.util.*
import org.elasticsearch.search.sort.SortBuilders
/** /**
* This class facilitates a connection to the Elasticsearch cluster and offers convenience functions to retrieve * This class facilitates a connection to the Elasticsearch cluster and offers convenience functions to retrieve
...@@ -21,7 +32,8 @@ class ElasticSearchWrapper(settings: Properties) { ...@@ -21,7 +32,8 @@ class ElasticSearchWrapper(settings: Properties) {
private val host = settings.getProperty(KEYS.SettingsProps.elasticHost) private val host = settings.getProperty(KEYS.SettingsProps.elasticHost)
private val port = settings.getProperty(KEYS.SettingsProps.elasticPort).toInt() private val port = settings.getProperty(KEYS.SettingsProps.elasticPort).toInt()
private val documentsIndex = settings.getProperty(KEYS.SettingsProps.elasticIndex) private val documentsIndex = settings.getProperty(KEYS.SettingsProps.elasticIndex)
private val client = connect() private val documentTypeMapper = DocumentTypeMapper(settings.getProperty(KEYS.SettingsProps.documentTypeLabelsPath))
private var client: RestHighLevelClient? = null
/** /**
* Establishes a connection to the client and ensures, that the index is present. The index may be an alias * Establishes a connection to the client and ensures, that the index is present. The index may be an alias
...@@ -38,16 +50,17 @@ class ElasticSearchWrapper(settings: Properties) { ...@@ -38,16 +50,17 @@ class ElasticSearchWrapper(settings: Properties) {
* *
* TODO: In the future find a better solution. * TODO: In the future find a better solution.
*/ */
private fun connect(): RestHighLevelClient? { fun connect(): RestHighLevelClient? {
return try { return try {
val c = RestHighLevelClient( val c = RestHighLevelClient(
RestClient.builder( RestClient.builder(
HttpHost(host, port) HttpHost(host, port)
)) )
)
val indexExists = c.indices().exists(GetIndexRequest(documentsIndex), RequestOptions.DEFAULT)
val aliasExists = c.indices().existsAlias(GetAliasesRequest(documentsIndex), RequestOptions.DEFAULT)
if (!c.indices().exists(GetIndexRequest(documentsIndex), RequestOptions.DEFAULT) || if (!indexExists && !aliasExists) {
!c.indices().existsAlias(GetAliasesRequest(documentsIndex), RequestOptions.DEFAULT)) {
log.error("Could not find the index or alias defined in the configuration: $documentsIndex.") log.error("Could not find the index or alias defined in the configuration: $documentsIndex.")
null null
} else { } else {
...@@ -57,6 +70,9 @@ class ElasticSearchWrapper(settings: Properties) { ...@@ -57,6 +70,9 @@ class ElasticSearchWrapper(settings: Properties) {
} catch (ex: ElasticsearchException) { } catch (ex: ElasticsearchException) {
log.error(ex.detailedMessage) log.error(ex.detailedMessage)
null null
} catch (ex: SocketTimeoutException) {
log.error(ex.localizedMessage)
null
} }
} }
...@@ -64,6 +80,9 @@ class ElasticSearchWrapper(settings: Properties) { ...@@ -64,6 +80,9 @@ class ElasticSearchWrapper(settings: Properties) {
* Counts the number of documents attached to a specific record set. * Counts the number of documents attached to a specific record set.
*/ */
fun countNumberOfDocuments(recordSetIdentifier: String): Int { fun countNumberOfDocuments(recordSetIdentifier: String): Int {
if (client == null)
connect()
return client.let { return client.let {
if (it == null) { if (it == null) {
log.error("Not connected to an index. Count is at zero! Restart service to retry connection.") log.error("Not connected to an index. Count is at zero! Restart service to retry connection.")
...@@ -71,12 +90,13 @@ class ElasticSearchWrapper(settings: Properties) { ...@@ -71,12 +90,13 @@ class ElasticSearchWrapper(settings: Properties) {
} else { } else {
val request = CountRequest(documentsIndex) val request = CountRequest(documentsIndex)
request.query( request.query(
QueryBuilders.termQuery( termQuery(
"recordSet.facet", recordSetIdentifier "recordSet.facet", recordSetIdentifier
) )
) )
try { try {
val response = it.count(request, RequestOptions.DEFAULT val response = it.count(
request, RequestOptions.DEFAULT
) )
response.count.toInt() response.count.toInt()
} catch (ex: ElasticsearchException) { } catch (ex: ElasticsearchException) {
...@@ -86,4 +106,69 @@ class ElasticSearchWrapper(settings: Properties) { ...@@ -86,4 +106,69 @@ class ElasticSearchWrapper(settings: Properties) {
} }
} }
} }
/**
* Counts the number of documents attached to a specific record set.
*/
fun getDocumentTypesFromRecords(recordSetIdentifier: String, queryField: String): List<FacetContainer> {
if (client == null)
client = connect()
return client.let {
if (it == null) {
log.error("Could not connect to elasticsearch. Try again.")
emptyList()
} else {
try {
val resultFacets = mutableListOf<FacetContainer>()
val typeSet = mutableSetOf<String>()
val scroll = Scroll(TimeValue.timeValueMinutes(1L))
val searchRequest = SearchRequest(documentsIndex)
searchRequest.scroll(scroll)
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.fetchSource(
arrayOf(
"id", "type"
), emptyArray<String>()
)
searchSourceBuilder.query(
termQuery(
queryField, recordSetIdentifier
)
)
searchRequest.source(searchSourceBuilder)
var searchResponse = it.search(searchRequest, RequestOptions.DEFAULT)
var scrollId = searchResponse.scrollId
var searchHits = searchResponse.hits.hits
while (searchHits != null && searchHits.isNotEmpty()) {
val scrollRequest = SearchScrollRequest(scrollId)
scrollRequest.scroll(scroll)
searchResponse = it.scroll(scrollRequest, RequestOptions.DEFAULT)
scrollId = searchResponse.scrollId
searchHits = searchResponse.hits.hits
for (hit in searchHits) {
val type = hit.sourceAsMap["type"]
if (type != null) {
type as String
if (!typeSet.contains(type)) {
resultFacets.add(documentTypeMapper.getValue(type))
typeSet.add(type)
}
}
}
}
val clearScrollRequest = ClearScrollRequest()
clearScrollRequest.addScrollId(scrollId)
it.clearScroll(clearScrollRequest, RequestOptions.DEFAULT)
resultFacets
} catch (ex: ElasticsearchException) {
log.error(ex.detailedMessage)
emptyList<FacetContainer>()
}
}
}
}
} }
\ No newline at end of file
...@@ -41,7 +41,7 @@ object JSON { ...@@ -41,7 +41,7 @@ object JSON {
if (it[atType] == NS.rico + Record) { if (it[atType] == NS.rico + Record) {
Pair(record, it) Pair(record, it)
} else if (it[atType] == NS.rico + RecordSet) { } else if (it[atType] == NS.rico + RecordSet) {
Pair(record, it) Pair(recordSet, it)
} else if (it[atType] == NS.rico + CorporateBody && it[type] == memobaseInstitution) { } else if (it[atType] == NS.rico + CorporateBody && it[type] == memobaseInstitution) {
Pair(institution, it) Pair(institution, it)
} else { } else {
......
...@@ -20,6 +20,7 @@ package org.memobase.helpers ...@@ -20,6 +20,7 @@ package org.memobase.helpers
object KEYS { object KEYS {
object SettingsProps { object SettingsProps {
const val documentTypeLabelsPath = "documentTypeLabelsPath"
const val mediaUrl = "media.url" const val mediaUrl = "media.url"
const val institutionTypeLabelsPath = "institutionTypeLabelsPath" const val institutionTypeLabelsPath = "institutionTypeLabelsPath"
const val elasticHost = "elastic.host" const val elasticHost = "elastic.host"
...@@ -70,6 +71,7 @@ object KEYS { ...@@ -70,6 +71,7 @@ object KEYS {
// namespace rda: // namespace rda:
const val placeOfCapture = "P60556" const val placeOfCapture = "P60556"
const val producer = "P60441" const val producer = "P60441"
const val sponsoredBy = "P60451"
// namespace skos: // namespace skos:
const val prefLabel = "prefLabel" const val prefLabel = "prefLabel"
...@@ -100,6 +102,11 @@ object KEYS { ...@@ -100,6 +102,11 @@ object KEYS {
const val missingLabelIt = "GALATEO MANCANTE" const val missingLabelIt = "GALATEO MANCANTE"
const val missingLabelEn = "MISSING LABEL" const val missingLabelEn = "MISSING LABEL"
object QueryFields {
const val recordSetFacet = "recordSet.facet"
const val institutionFacet = "institution.facet"
}
object TitleTypes { object TitleTypes {
const val main = "main" const val main = "main"
const val series = "series" const val series = "series"
......
...@@ -2,6 +2,7 @@ package org.memobase.model ...@@ -2,6 +2,7 @@ package org.memobase.model
import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.annotation.JsonInclude
import org.memobase.helpers.Date
@JsonInclude(JsonInclude.Include.NON_EMPTY) @JsonInclude(JsonInclude.Include.NON_EMPTY)
data class InstitutionSearchDoc( data class InstitutionSearchDoc(
...@@ -30,4 +31,26 @@ data class InstitutionSearchDoc( ...@@ -30,4 +31,26 @@ data class InstitutionSearchDoc(
val numberOfDocuments: Int, val numberOfDocuments: Int,
val teaserColor: String, val teaserColor: String,
val teaserColorComputed: String val teaserColorComputed: String
) : Schema(institutionId) ) : Schema(institutionId) {
\ No newline at end of file companion object {
val DEFAULT = InstitutionSearchDoc(
"DefaultInsitutionId",
false,
LanguageContainer.EMPTY,
LanguageContainer.EMPTY,
emptyList(),
emptyList(),
emptyList(),
emptyList(),