Commit 1b42a14b authored by Jonas Waeber's avatar Jonas Waeber
Browse files

[WIP] institution search doc implementation

parent ab38a2f1
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.JsonObject
import org.apache.logging.log4j.LogManager
import org.memobase.builders.*
import org.memobase.helpers.*
import org.memobase.model.EnrichedDigitalMetadata
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.Schema
import org.memobase.model.SearchDoc
import org.memobase.rdf.NS
class InstitutionSearchDocBuilder {
private val log = LogManager.getLogger("InstitutionSearchDocBuilder")
fun transform(input: Map<String, JsonObject>): Schema {
return InstitutionSearchDoc(
)
}
}
......@@ -25,9 +25,14 @@ import java.io.StringWriter
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Predicate
import org.apache.logging.log4j.LogManager
import org.memobase.helpers.Default
import org.memobase.helpers.JSON
import org.memobase.model.InstitutionSearchDoc
import org.memobase.model.Report
import org.memobase.model.Schema
import org.memobase.model.SearchDoc
import org.memobase.settings.SettingsLoader
......@@ -37,38 +42,61 @@ class KafkaTopology(private val settings: SettingsLoader) {
private val reportTopic = settings.processReportTopic
private val searchDocTransform = SearchDocTransform(settings.appSettings.getProperty(KEYS.mediaUrlPropName))
private val institutionSearchDoc = InstitutionSearchDocBuilder()
fun build(): Topology {
val builder = StreamsBuilder()
val stream = builder.stream<String, String>(settings.inputTopic)
val transformedStream = stream
val branchedStream = stream
.flatMapValues { value -> JSON.parse(value) }
.mapValues { value -> JSON.unpack(value) }
.branch(
Predicate { _, value -> value.containsKey(JSON.record) },
Predicate { _, value -> value.containsKey(JSON.institution) },
Predicate { _, value -> value.containsKey(JSON.recordSet) },
Predicate { _, _ -> true }
)
val recordStream = branchedStream[0]
.mapValues { readOnlyKey, value ->
try {
Pair(searchDocTransform.transform(value), Report(readOnlyKey, "SUCCESS", "Transformed message into search doc."))
} catch (ex: InvalidInputException) {
Pair(Default.searchDoc, Report(readOnlyKey, "FAILURE", ex.localizedMessage))
}
}
outputStreams(recordStream)
val institutionStream = branchedStream[1]
.mapValues { readOnlyKey, value ->
try {
Pair(transformJson(value), Report(readOnlyKey, "SUCCESS", "Transformed message into search doc."))
Pair(institutionSearchDoc.transform(value), Report(readOnlyKey, "SUCCESS", "Transformed message into search doc."))
} catch (ex: InvalidInputException) {
Pair(null, Report(readOnlyKey, "FAILURE", ex.localizedMessage))
Pair(Default.institutionSearchDoc, Report(readOnlyKey, "FAILURE", ex.localizedMessage))
}
}
transformedStream
outputStreams(institutionStream)
return builder.build()
}
private fun outputStreams(stream: KStream<String, Pair<Schema, Report>>) {
stream
.map { _, value -> KeyValue(value.second.id, value.second.toJson()) }
.to(reportTopic)
transformedStream
stream
.filterNot { _, value -> value.second.status == "FAILURE" }
.map { _, value -> KeyValue(value.first?.id, value.first) }
.map { _, value -> KeyValue(value.first.id, value.first) }
.mapValues { value ->
val writer = StringWriter()
ObjectMapper().registerKotlinModule().writeValue(writer, value)
writer.toString()
}
.to(settings.outputTopic)
return builder.build()
}
private fun transformJson(input: Map<String, JsonObject>): SearchDoc {
return searchDocTransform.transform(input)
}
}
......@@ -23,12 +23,13 @@ import org.apache.logging.log4j.LogManager
import org.memobase.builders.*
import org.memobase.helpers.*
import org.memobase.model.EnrichedDigitalMetadata
import org.memobase.model.Schema
import org.memobase.model.SearchDoc
import org.memobase.rdf.NS
class SearchDocTransform(private val mediaUrl: String) {
private val log = LogManager.getLogger("SearchDocTransform")
fun transform(input: Map<String, JsonObject>): SearchDoc {
fun transform(input: Map<String, JsonObject>): Schema {
val record = input["record"] ?: throw InvalidInputException("No record defined in the message.")
val digitalObject =
......@@ -116,8 +117,6 @@ class SearchDocTransform(private val mediaUrl: String) {
}
}
val recordIdentifiers = Filter.entitiesByProperty("identifiedBy", record, input)
val recordTitles = Filter.entitiesByProperty("hasTitle", record, input)
val recordLanguages = Filter.entitiesByProperty("hasLanguage", record, input)
......@@ -186,7 +185,7 @@ class SearchDocTransform(private val mediaUrl: String) {
},
sameAs = Extract.listOfStrings(record["sameAs"]),
abstract = Extract.languageContainer("abstract", record["abstract"]),
id = id,
recordId = id,
institution = Meta.extractInstitution(record),
recordSet = Meta.extractRecordSet(record),
descriptiveNote = Extract.languageContainer("descriptiveNote", record["descriptiveNote"]),
......
package org.memobase.helpers
import org.memobase.model.*
object Default {
val institutionSearchDoc = InstitutionSearchDoc(
"UnknownId",
false,
LanguageContainer(emptyList(), emptyList(), emptyList(), emptyList()),
LanguageContainer(emptyList(), emptyList(), emptyList(), emptyList()),
emptyList(),
emptyList(),
"",
0,
0
)
val searchDoc = SearchDoc(
emptyList(),
emptyList(),
emptyList(),
"",
"",
"",
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
FacettedContainer(LanguageContainer(emptyList(), emptyList(), emptyList(), emptyList()),"", emptyList()),
false,
emptyList(),
emptyList(),
"",
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
EnrichedDigitalMetadata(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
emptyList(),
false,
SuggestContainer(emptyList(), emptyList(), emptyList(), emptyList())
)
}
\ No newline at end of file
......@@ -8,20 +8,36 @@ import java.io.StringReader
object JSON {
private const val graph = "@graph"
private const val atId = "@id"
const val atType = "@type" // rdf:type
const val type = "type" // rico:type
private const val Record = "Record"
private const val RecordSet = "RecordSet"
private const val CorporateBody = "CorporateBody"
private const val memobaseInstitution = "memobaseInstitution"
const val institution = "institution"
const val record = "record"
const val recordSet = "recordSet"
private val klaxon = Klaxon()
fun parse(data: String): List<JsonObject> {
val result = Klaxon().parseJsonObject(StringReader(data))
val result = klaxon.parseJsonObject(StringReader(data))
return listOf(result)
}
fun unpack(input: JsonObject): Map<String, JsonObject> {
val graph = input["@graph"] as JsonArray<JsonObject>
val graph = input[graph] as JsonArray<JsonObject>
return graph.map {
if (it["@type"] == NS.rico + "Record") {
Pair("record", it)
if (it[atType] == NS.rico + Record) {
Pair(record, it)
} else if (it[atType] == NS.rico + RecordSet) {
Pair(record, it)
} else if (it[atType] == NS.rico + CorporateBody && it[type] == memobaseInstitution) {
Pair(institution, it)
} else {
Pair(it["@id"] as String, it)
Pair(it[atId] as String, it)
}
}.toMap()
}
......
package org.memobase.model
import com.fasterxml.jackson.annotation.JsonInclude
@JsonInclude(JsonInclude.Include.NON_EMPTY)
data class InstitutionSearchDoc(
val institutionId: String,
val published: Boolean,
// Full Text Search
val name: LanguageContainer,
// Facettes
val canton: LanguageContainer,
val type: List<LanguageContainer>,
val documentType: List<LanguageContainer>,
// Search Result Display
val keyVisualLink: String,
val numberOfRecordSets: Int,
val numberOfDocuments: Int
) : Schema(institutionId)
\ No newline at end of file
package org.memobase.model
import com.fasterxml.jackson.annotation.JsonInclude
@JsonInclude(JsonInclude.Include.NON_EMPTY)
data class RecordSet(
val recordSetIdentifier: String,
val isSupported: Boolean,
val documentType: List<LanguageContainer>,
val timePeriod: TimePeriod,
val keyVisualLink: String,
val institution: LanguageContainer,
val institutionId: String,
val languages: List<LanguageContainer>,
val description: LanguageContainer,
val numberOfDocuments: Int,
val latestUpdate: String
) : Schema(recordSetIdentifier)
package org.memobase.model
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.io.StringWriter
@JsonInclude(JsonInclude.Include.NON_EMPTY)
open class Schema(val id: String) {
private val writer = ObjectMapper().registerKotlinModule().writer()
fun toJson(): String {
val stringWriter = StringWriter()
writer.writeValue(stringWriter, this)
return writer.toString()
}
}
\ No newline at end of file
......@@ -30,7 +30,7 @@ data class SearchDoc(
// Datatype properties
val type: String,
val sourceID: String,
val id: String,
val recordId: String,
val abstract: List<LanguageContainer>,
val descriptiveNote: List<LanguageContainer>,
val scopeAndContent: List<LanguageContainer>,
......@@ -113,4 +113,4 @@ data class SearchDoc(
// auto complete source
val suggest: SuggestContainer
)
) : Schema(recordId)
package org.memobase.model
import com.fasterxml.jackson.annotation.JsonInclude
@JsonInclude(JsonInclude.Include.NON_EMPTY)
data class TimePeriod(
val start: String,
val end: String
)
\ No newline at end of file
......@@ -17,19 +17,21 @@
*/
package org.memobase
import java.io.File
import java.nio.charset.Charset
import java.util.stream.Stream
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.logging.log4j.LogManager
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.memobase.params.TestParam
import java.io.File
import java.nio.charset.Charset
import java.util.stream.Stream
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class IntegrationTest {
......@@ -84,4 +86,44 @@ class IntegrationTest {
"key"
)
)
@Test
fun `integration test institution`() {
val service = Service("kafkaTest1.yml")
val testDriver = TopologyTestDriver(service.topology, service.settings.kafkaStreamsSettings)
val factory = ConsumerRecordFactory(
StringSerializer(), StringSerializer()
)
testDriver.pipeInput(
factory.create(
service.settings.inputTopic, "https://memobase.ch/institution/RadioX", readFile("institution/input.json")
)
)
val record = testDriver.readOutput(
service.settings.outputTopic,
StringDeserializer(),
StringDeserializer()
)
val report = testDriver.readOutput(
service.settings.processReportTopic,
StringDeserializer(),
StringDeserializer()
)
val reportKey = report.key()
val reportValue = report.value()
val value = record.value()
val key = record.key()
assertAll("",
{
assertThat(value)
.isEqualTo(readFile("institution/output.json"))
},
{ assertThat(key).isEqualTo("https://memobase.ch/institution/RadioX") }
)
}
}
{
"@graph": [
{
"@id": "https://memobase.ch/institution/RadioX",
"@type": "https://www.ica.org/standards/RiC/ontology#CorporateBody",
"eventType": "Create",
"P2699": "http://online-archiv-radiox.com",
"P31": [
"http://www.wikidata.org/entity/Q327333",
"http://www.wikidata.org/entity/Q166118"
],
"P856": "https://radiox.ch/",
"P968": "radiox@test.com",
"descriptiveNote": [
{
"@language": "de",
"@value": "<p>Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem.</p>"
},
{
"@language": "it",
"@value": "<p>test_description&nbsp;&nbsp;(IT)</p>"
},
{
"@language": "fr",
"@value": "<p>test_description&nbsp;(FR)</p>"
}
],
"hasLocation": [
"https://memobase.ch/institution/RadioX#genid3e44aabe-29bf-45cc-ae00-372817c7b84d",
"https://memobase.ch/institution/RadioX#genidb4d72c27-a269-4ef6-8693-c24661be214f"
],
"identifiedBy": [
"https://memobase.ch/institution/RadioX#genid87a391fc-a46f-4811-a5b5-7cae8447d929",
"https://memobase.ch/institution/RadioX#genid87dcf55a-a587-46b2-a69a-f85e638ff94f"
],
"name": [
{
"@language": "de",
"@value": "RadioX"
},
{
"@language": "fr",
"@value": "Test institution Oana (FR)"
},
{
"@language": "it",
"@value": "Test institution Oana (IT)"
}
],
"type": "memobaseInstitution"
},
{
"@id": "https://memobase.ch/institution/RadioX#genid3287b9c5-4efe-499d-9384-2a1059ae9ddf",
"@type": "https://www.ica.org/standards/RiC/ontology#Place",
"sameAs": "http://www.wikidata.org/entity/Q69030",
"name": [
{
"@language": "it",
"@value": "Münchenstein"
},
{
"@language": "fr",
"@value": "Münchenstein"
},
{
"@language": "de",
"@value": "Münchenstein"
}
],
"type": "municipality"
},
{
"@id": "https://memobase.ch/institution/RadioX#genid3e44aabe-29bf-45cc-ae00-372817c7b84d",
"@type": "https://www.ica.org/standards/RiC/ontology#Place",
"P131": [
"https://memobase.ch/institution/RadioX#genid74e3ce4c-42fb-4af6-8380-a2fd324c5342",
"https://memobase.ch/institution/RadioX#genid40fd3abd-77ae-47e9-9c07-c3e894db5210"
],
"P17": "http://www.wikidata.org/entity/Q39",
"P281": "3005",
"P625": "46.9531243, 7.434256",
"P6375": "Helvetiapl. 5",
"P669": "Helvetiapl.",
"P670": "5"
},
{
"@id": "https://memobase.ch/institution/RadioX#genid40fd3abd-77ae-47e9-9c07-c3e894db5210",
"@type": "https://www.ica.org/standards/RiC/ontology#Place",
"sameAs": "http://www.wikidata.org/entity/Q11911",
"name": [
{
"@language": "it",
"@value": "Berne"
},
{
"@language": "fr",
"@value": "Berne"
},
{
"@language": "de",
"@value": "Bern"
}
],
"type": "canton"
},
{
"@id": "https://memobase.ch/institution/RadioX#genid74e3ce4c-42fb-4af6-8380-a2fd324c5342",
"@type": "https://www.ica.org/standards/RiC/ontology#Place",
"name": "Bern",
"type": "municipality"
},
{
"@id": "https://memobase.ch/institution/RadioX#genid7558b5df-79a2-49bd-987d-a7ccb938a232",
"@type": "https://www.ica.org/standards/RiC/ontology#Place",
"sameAs": "http://www.wikidata.org/entity/Q12172",
"name": [
{
"@language": "it",
"@value": "Bâle-Ville"
},
{
"@language": "fr",
"@value": "Bâle-Ville"
},
{
"@language": "de",
"@value": "Basel-Stadt"
}
],
"type": "canton"
},
{
"@id": "https://memobase.ch/institution/RadioX#genid87a391fc-a46f-4811-a5b5-7cae8447d929",
"@type": "https://www.ica.org/standards/RiC/ontology#Identifier",
"identifier": "RadioX",
"type": "main"
},
{
"@id": "https://memobase.ch/institution/RadioX#genid87dcf55a-a587-46b2-a69a-f85e638ff94f",
"@type": "https://www.ica.org/standards/RiC/ontology#Identifier",
"identifier": "test_radio_x",
"type": "oldMemobase"
},
{
"@id": "https://memobase.ch/institution/RadioX#genidb4d72c27-a269-4ef6-8693-c24661be214f",
"@type": "https://www.ica.org/standards/RiC/ontology#Place",