Commit f8d2d96a authored by Jonas Waeber's avatar Jonas Waeber

Initial commit

parent 72dd3787
......@@ -35,7 +35,7 @@ dependencies {
// https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client
//compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.1.0'
implementation 'org.memobase:memobase-service-utilities:1.2.1'
implementation 'org.memobase:memobase-service-utilities:1.4.0'
// Logging Framework
implementation "org.apache.logging.log4j:log4j-api:${log4jV}"
......@@ -50,6 +50,10 @@ dependencies {
// YAML Parser
implementation 'org.snakeyaml:snakeyaml-engine:2.1'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.11.+'
implementation "com.fasterxml.jackson.module:jackson-module-kotlin:2.11.+"
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.11.+'
// JSON Parser
implementation 'com.beust:klaxon:5.2'
// Compression
......
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import kotlin.system.exitProcess
import org.apache.logging.log4j.LogManager
/**
 * Command-line entry point of the application.
 *
 * Creates a [Service] with its default configuration and runs it. Any [Exception] that escapes is
 * printed to standard error, logged, and turned into process exit code 1.
 */
class App {
    companion object {
        private val log = LogManager.getLogger("App")

        /**
         * Starts the service and blocks until it returns or fails.
         *
         * @param args command-line arguments; they are not read.
         */
        @JvmStatic
        fun main(args: Array<String>) {
            try {
                Service().run()
            } catch (failure: Exception) {
                failure.printStackTrace()
                log.error("Stopping application due to error: ${failure.message}")
                exitProcess(1)
            }
        }
    }
}
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
/**
 * Builds hierarchical facet values (century level and decade level) from normalized date strings.
 *
 * Produced formats:
 * - century level: `0~<n>.Jahrhundert~`
 * - decade level: `1~<n>.Jahrhundert~<start>-<end>#`
 *
 * The century number is `year / 100 + 1`. A decade is labelled `<d + 1>-<d + 10>` where `d` is the
 * year rounded down to a multiple of ten, so 1950 is reported as `1951-1960`.
 *
 * All arithmetic is done on integers instead of re-parsing the generated labels, so years below
 * 0900 no longer fail, and a range whose end lies before its start terminates instead of looping.
 */
object DateFacetBuilder {
    private const val separator = "~"
    private const val terminator = "#"
    private const val level_1 = "0"
    private const val level_2 = "1"
    private const val centuryName = "Jahrhundert"

    // Compiled once; decides whether the part after "/" starts with a four-digit year.
    private val untilYearPattern = Regex("\\d{4}.*")

    /**
     * Builds the century and decade facet for a single date.
     *
     * @param date a date string whose first four characters are the year.
     * @return a two-element list: the century facet followed by the decade facet.
     * @throws StringIndexOutOfBoundsException if [date] has fewer than four characters.
     * @throws NumberFormatException if the first four characters are not an integer.
     */
    fun buildFromNormalizedSingleDate(date: String): List<String> {
        val year = parseYear(date)
        return listOf(centuryFacet(year / 100), decadeFacet(decadeStart(year)))
    }

    /**
     * Builds the facets for every century and decade touched by a date range.
     *
     * The range is written as `<from>/<until>`. When there is no `/`, or the part after the first
     * `/` does not start with four digits, only the start year is used. The result lists all
     * century facets first (ascending) and then all decade facets (ascending).
     *
     * A range whose end year lies before its start year yields only the facets of the start year.
     *
     * @param date a range string whose first four characters are the start year.
     * @return century facets followed by decade facets.
     * @throws StringIndexOutOfBoundsException if [date] has fewer than four characters.
     * @throws NumberFormatException if the first four characters are not an integer.
     */
    fun buildFromNormalizedDateRange(date: String): List<String> {
        val fromYear = parseYear(date)
        val untilYear = date.split("/").getOrNull(1)
            ?.takeIf { untilYearPattern.matches(it) }
            ?.let { parseYear(it) }
            ?: fromYear

        val firstCentury = fromYear / 100
        // maxOf keeps the progression non-empty and finite when the range is reversed.
        val lastCentury = maxOf(firstCentury, untilYear / 100)
        val firstDecade = decadeStart(fromYear)
        val lastDecade = maxOf(firstDecade, decadeStart(untilYear))

        val centuryFacets = (firstCentury..lastCentury).map { centuryFacet(it) }
        val decadeFacets = (firstDecade..lastDecade step 10).map { decadeFacet(it) }
        return centuryFacets + decadeFacets
    }

    /** Reads the leading four characters of [date] as a year. */
    private fun parseYear(date: String): Int = date.substring(0, 4).toInt()

    /** Rounds [year] down to the multiple of ten that identifies its decade. */
    private fun decadeStart(year: Int): Int = year - year % 10

    /** Label of the century with zero-based index [century] (`year / 100`). */
    private fun centuryLabel(century: Int): String = "${century + 1}.$centuryName"

    /** Century-level facet for the zero-based century index [century]. */
    private fun centuryFacet(century: Int): String =
        "$level_1$separator${centuryLabel(century)}$separator"

    /** Decade-level facet for the decade identified by [start] (a multiple of ten). */
    private fun decadeFacet(start: Int): String =
        "$level_2$separator${centuryLabel(start / 100)}$separator${start + 1}-${start + 10}$terminator"
}
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
/**
 * Checked-style exception that carries only a message.
 *
 * NOTE(review): presumably raised when the mapping configuration is invalid; no throw site is
 * visible in this part of the code base, so confirm against the callers.
 *
 * @param message description of the problem, forwarded to [Exception].
 */
class InvalidMappingException(message: String) : Exception(message)
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.JsonArray
import com.beust.klaxon.JsonObject
import com.beust.klaxon.Klaxon
import java.io.StringReader
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
/**
 * Defines the Kafka Streams topology of the service.
 *
 * Each message read from `settings.inputTopic` is parsed as a JSON object, its `@graph` array is
 * indexed, the result is handed to [SearchDocTransform] and the produced JSON string is written to
 * `settings.outputTopic`.
 *
 * @param settings loaded settings; the `mapping` application property is passed to
 * [SearchDocTransform].
 */
class KafkaTopology(private val settings: SettingsLoader) {
    private val log = LogManager.getLogger("StreamsProcessing")
    private val searchDocTransform = SearchDocTransform(settings.appSettings.getProperty("mapping"))

    /**
     * Assembles the processing pipeline.
     *
     * @return the topology reading from the input topic and writing to the output topic.
     */
    fun build(): Topology {
        val builder = StreamsBuilder()
        val stream = builder.stream<String, String>(settings.inputTopic)
        stream
            .flatMapValues { value -> parseJson(value) }
            .mapValues { value -> unpackJson(value) }
            .mapValues { value -> transformJson(value) }
            .mapValues { value -> value.toJsonString() }
            .to(settings.outputTopic)
        return builder.build()
    }

    /**
     * Parses a message payload as a JSON object.
     *
     * @param data raw message value.
     * @return a single-element list with the parsed object, or an empty list when the payload
     * cannot be parsed as a JSON object, so that `flatMapValues` drops the message.
     */
    private fun parseJson(data: String): List<JsonObject> {
        return try {
            listOfNotNull(Klaxon().parseJsonObject(StringReader(data)))
        } catch (ex: RuntimeException) {
            // The parser signals malformed input (or a non-object top-level value) by throwing.
            // Without this handler a single bad message would terminate the stream thread.
            log.error("Could not parse message as JSON object: ${ex.message}")
            emptyList()
        }
    }

    /**
     * Indexes the entries of the `@graph` array.
     *
     * An entry whose `@type` equals `rico:Record` is stored under the key `record`; every other
     * entry is stored under its `@id`. Later entries replace earlier ones with the same key.
     *
     * @param input parsed message; must contain an `@graph` array of objects.
     * @return the graph entries keyed as described above.
     */
    private fun unpackJson(input: JsonObject): Map<String, JsonObject> {
        @Suppress("UNCHECKED_CAST")
        val graph = input["@graph"] as JsonArray<JsonObject>
        return graph.map { node ->
            if (node["@type"] == "rico:Record") {
                Pair("record", node)
            } else {
                Pair(node["@id"] as String, node)
            }
        }.toMap()
    }

    /** Delegates to [SearchDocTransform.transform]. */
    private fun transformJson(input: Map<String, JsonObject>): JsonObject {
        return searchDocTransform.transform(input)
    }
}
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import com.beust.klaxon.JsonArray
import com.beust.klaxon.JsonObject
import com.beust.klaxon.json
import org.apache.logging.log4j.LogManager
/**
 * Turns the indexed graph of a record into a flat search document.
 *
 * The transformation rules are loaded once from [mapping] through `YamlLoader`.
 *
 * @param mapping value handed to `YamlLoader` to load the transform configuration.
 */
class SearchDocTransform(mapping: String) {
    private val log = LogManager.getLogger("SearchDocTransform")
    private val transformConfig = YamlLoader(mapping).load()

    /**
     * Builds the search document for one record.
     *
     * Every property of the entry stored under `record` is looked up in the transform
     * configuration; properties without a configured transform contribute nothing.
     *
     * @param input graph entries keyed by id, with the record itself under the key `record`.
     * @return a JSON object made of all produced field/value pairs; empty when there is no record.
     */
    fun transform(input: Map<String, JsonObject>): JsonObject {
        val fields = input["record"]
            ?.mapNotNull { entry -> transformEntry(input, entry.key, entry.value) }
            ?.flatten()
            .orEmpty()
            .filterNotNull()
        return json { obj(fields.asIterable()) }
    }

    /**
     * Applies the transform configured for [key] to [value].
     *
     * @return the produced pairs (a literal that cannot be extracted yields a `null` element), or
     * `null` when a referenced entity is not part of [input].
     */
    private fun transformEntry(
        input: Map<String, JsonObject>,
        key: String,
        value: Any?
    ): List<Pair<String, Any>?>? {
        return when (val transform = transformConfig.getTransform(key)) {
            null -> emptyList()
            is LiteralTransform -> listOf(extractPair(value, transform.target))
            is EntityTransform -> resolveEntities(input, value, transform)
        }
    }

    /**
     * Resolves the entity reference(s) in [value] against [input] and transforms each entity found.
     *
     * A string is used as the id directly, an object is resolved through its `@id`, and an array
     * is resolved element by element (unknown ids are skipped). Any other value yields no pairs.
     *
     * @return the produced pairs, or `null` when a single referenced entity is missing.
     */
    private fun resolveEntities(
        input: Map<String, JsonObject>,
        value: Any?,
        transform: EntityTransform
    ): List<Pair<String, Any>>? {
        return when (value) {
            is String ->
                input[value]?.let { entityTransformSelection(input, it, transform) }
            is JsonObject ->
                input[value["@id"] as String]?.let { entityTransformSelection(input, it, transform) }
            is JsonArray<*> ->
                value.flatMap { child ->
                    val reference = (child as JsonObject)["@id"] as String
                    input[reference]?.let { entityTransformSelection(input, it, transform) }.orEmpty()
                }
            else -> emptyList()
        }
    }

    /**
     * Picks the extraction strategy for an entity.
     *
     * Precedence: date entity, then conditional targets, then creation relation, then a plain
     * copy of `transform.source` into `transform.target`.
     */
    private fun entityTransformSelection(
        input: Map<String, JsonObject>,
        entity: JsonObject,
        transform: EntityTransform
    ): List<Pair<String, Any>> {
        if (transform.isDate) {
            return extractDateEntity(entity, transform)
        }
        if (transform.conditionalTargets.isNotEmpty()) {
            return transform.conditionalTargets.flatMap { candidate ->
                val conditionHolds = entity.containsKey(candidate.conditionProperty) &&
                    entity[candidate.conditionProperty] == candidate.conditionPattern
                if (conditionHolds) {
                    listOfNotNull(extractPair(entity[transform.source], candidate.target))
                } else {
                    emptyList()
                }
            }
        }
        if (transform.isCreationRelation) {
            return extractCreationRelationAndAgents(input, entity)
        }
        return listOfNotNull(extractPair(entity[transform.source], transform.target))
    }

    /**
     * Converts a literal value into a field/value pair.
     *
     * A string is kept as is; an object becomes a one-entry map from its `@language` to its
     * `@value`; an array of such objects becomes one map with an entry per element.
     *
     * @return the pair, or `null` (after logging an error) for any other kind of value.
     */
    private fun extractPair(value: Any?, targetField: String): Pair<String, Any>? {
        return when (value) {
            is String -> targetField to value
            is JsonObject -> targetField to mapOf(value["@language"] to value["@value"])
            is JsonArray<*> -> {
                val byLanguage = value.map { item ->
                    val literal = item as JsonObject
                    literal["@language"] to literal["@value"]
                }.toMap()
                targetField to byLanguage
            }
            else -> {
                log.error("Could not parse literal value: $value.")
                null
            }
        }
    }

    /**
     * Extracts the fields of a date entity.
     *
     * Always emits `<target>.date` (the normalized value when present, the expressed date
     * otherwise). `<target>.qualifier` and `<target>.certainty` are added when the entity carries
     * them. `<target>.facet` is added only for normalized `rico:SingleDate` / `rico:DateRange`
     * entities whose facet list is not empty.
     */
    private fun extractDateEntity(entity: JsonObject, transform: Transform): List<Pair<String, Any>> {
        val normalized = entity.containsKey("rico:normalizedDateValue")
        val dateProperty = if (normalized) "rico:normalizedDateValue" else "rico:expressedDate"
        val date = entity[dateProperty] as String
        val qualifier = entity["rico:dateQualifier"] as String?
        val certainty = entity["rico:certainty"] as String?

        val fields = mutableListOf<Pair<String, Any>>()
        fields.add(Pair(transform.target + ".date", date))
        qualifier?.let { fields.add(Pair(transform.target + ".qualifier", it)) }
        certainty?.let { fields.add(Pair(transform.target + ".certainty", it)) }

        // The type is read unconditionally, also for dates that are not normalized.
        val entityType = entity["@type"] as String
        val facets: List<String> = when {
            !normalized -> emptyList()
            entityType == "rico:SingleDate" -> DateFacetBuilder.buildFromNormalizedSingleDate(date)
            entityType == "rico:DateRange" -> DateFacetBuilder.buildFromNormalizedDateRange(date)
            else -> emptyList()
        }
        if (facets.isNotEmpty()) {
            fields.add(Pair(transform.target + ".facet", facets))
        }
        return fields
    }

    /**
     * Extracts relation name and agent name of a creation relation.
     *
     * The field prefix is `<agent type without namespace prefix><relation type>Raw`; the relation
     * name goes to `<prefix>.relation` and the agent name to `<prefix>.name`.
     *
     * @return the extracted pairs, or an empty list when the targeted agent is not in [input].
     */
    private fun extractCreationRelationAndAgents(
        input: Map<String, JsonObject>,
        creationRelation: JsonObject
    ): List<Pair<String, Any>> {
        val relationType = creationRelation["rico:type"] as String
        val relationName = creationRelation["rico:name"] as String
        val targetReference = creationRelation["rico:creationRelationHasTarget"] as JsonObject
        val agent = input[targetReference["@id"] as String] ?: return emptyList()

        val agentType = agent["@type"] as String
        val agentName = agent["rico:name"] as Any
        val fieldPrefix = "${agentType.split(':')[1]}${relationType}Raw"
        return listOfNotNull(
            extractPair(relationName, "$fieldPrefix.relation"),
            extractPair(agentName, "$fieldPrefix.name")
        )
    }
}
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
/**
 * Transform configuration: the literal transforms and the entity transforms of a mapping.
 *
 * @property literals transforms that are matched by their `source`.
 * @property entities transforms that are matched by their `property`.
 */
data class SearchDocTransformConfig(
    val literals: List<LiteralTransform>,
    val entities: List<EntityTransform>
) {
    /**
     * Finds the transform configured for [source].
     *
     * Literal transforms take precedence over entity transforms.
     *
     * @return the first matching transform, or `null` when none is configured.
     */
    fun getTransform(source: String): Transform? =
        getLiteralTransform(source) ?: getEntityTransform(source)

    /** First literal transform whose `source` equals [source], if any. */
    private fun getLiteralTransform(source: String): LiteralTransform? =
        literals.firstOrNull { it.source == source }

    /** First entity transform whose `property` equals [source], if any. */
    private fun getEntityTransform(source: String): EntityTransform? =
        entities.firstOrNull { it.property == source }
}
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
import org.apache.kafka.streams.KafkaStreams
import org.apache.logging.log4j.LogManager
import org.memobase.settings.SettingsLoader
/**
 * Wires settings, topology and the Kafka Streams instance together.
 *
 * @param file name of the settings file handed to `SettingsLoader`; defaults to `app.yml`.
 */
class Service(file: String = "app.yml") {
    private val log = LogManager.getLogger("SearchDocService")

    /** Loaded settings; `mapping` is passed as the only entry of the loader's property list. */
    val settings = SettingsLoader(listOf("mapping"), file, useStreamsConfig = true)

    /** Topology built from [settings]. */
    val topology = KafkaTopology(settings).build()

    private val stream = KafkaStreams(topology, settings.kafkaStreamsSettings)

    /**
     * Starts the stream and blocks while it reports a running state, logging a message every ten
     * seconds. The stream is closed when the loop ends or an exception is thrown.
     */
    fun run() {
        stream.use { streams ->
            streams.start()
            while (streams.state().isRunning) {
                log.info("Service is running.")
                Thread.sleep(10_000L)
            }
        }
    }
}
/*
* search-doc-service
* Copyright (C) 2020 Memoriav
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase
/**
 * A configured mapping rule; every rule names the field it writes to.
 */
sealed class Transform {
    /** Name of the field the transformed value is written to. */
    abstract val target: String

    /**
     * Rule for a property that references another entity.
     *
     * @property property name of the referencing property.
     * @property source property read from the referenced entity.
     * @property target field the value is written to.
     * @property conditionalTargets alternative targets, each guarded by a condition.
     * @property isCreationRelation marks the rule as a creation relation rule.
     * @property isDate marks the rule as a date rule.
     */
    data class EntityTransform(
        val property: String,
        val source: String = "",
        override val target: String = "",
        val conditionalTargets: List<ConditionalTargets> = emptyList(),
        val isCreationRelation: Boolean = false,
        val isDate: Boolean = false
    ) : Transform()

    /**
     * Rule for a property that holds a literal value.
     *
     * @property source name of the property that is read.
     * @property target field the value is written to.
     * @property hasLanguageTags flag stored with the rule; defaults to `false`.
     */
    data class LiteralTransform(
        val source: String,
        override val target: String,
        val hasLanguageTags: Boolean = false
    ) : Transform()
}

/**
 * A target that applies only when a property of the entity has a given value.
 *
 * @property target field the value is written to.
 * @property conditionProperty property that is compared.
 * @property conditionPattern value the property is compared with.
 */
data class ConditionalTargets(
    val target: String,
    val conditionProperty: String,
    val conditionPattern: String
)

// Short names for the nested rule types.
typealias EntityTransform = Transform.EntityTransform
typealias LiteralTransform = Transform.LiteralTransform