Commit 1cef20e7 authored by Jonas Waeber

Switch JSON parser: Klaxon does not parse correctly when the target type is Any but there is content to parse.

parent bf30b185
Pipeline #24591 passed with stages in 4 minutes and 39 seconds
@@ -41,7 +41,10 @@ dependencies {
     implementation 'org.memobase:memobase-service-utilities:2.0.14'
     // JSON Parser
-    implementation 'com.beust:klaxon:5.2'
+    implementation 'com.fasterxml.jackson.core:jackson-databind:2.11.+'
+    implementation 'com.fasterxml.jackson.core:jackson-core:2.11.+'
+    implementation 'com.fasterxml.jackson.core:jackson-annotations:2.11.+'
+    implementation "com.fasterxml.jackson.module:jackson-module-kotlin:2.11.+"
     // Kafka Imports
     implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaV
...
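The four Jackson artifacts above work together: jackson-databind performs the object binding, jackson-core and jackson-annotations are its direct dependencies, and jackson-module-kotlin lets databind construct Kotlin data classes through their primary constructor. A minimal sketch of the resulting parsing setup follows; the UpdateQuery class is not part of this diff, so its fields are inferred from the test and the update-query.json fixture further down and should be read as a hypothetical reconstruction.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule

// Hypothetical reconstruction: UpdateQuery itself is not shown in this commit.
// Its fields are inferred from TestUpdateQueries and update-query.json below.
data class UpdateQuery(
    val term: String,
    val value: String,
    val source: String,
    val params: Map<String, Any>
)

fun main() {
    // registerKotlinModule() lets Jackson instantiate Kotlin data classes
    // directly through their primary constructor.
    val mapper = ObjectMapper().registerKotlinModule()

    val json = """
        {
          "term": "recordSet.facet",
          "value": "ati-001",
          "source": "ctx._source['accessInstitution'] = params.containers",
          "params": { "containers": [] }
        }
    """.trimIndent()

    // params is declared as Map<String, Any>, the kind of loosely typed field
    // the commit message refers to; Jackson binds it to plain maps and lists.
    val query = mapper.readValue(json, UpdateQuery::class.java)
    println(query.params["containers"])
}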
@@ -20,11 +20,12 @@ package org.memobase
 import ch.memobase.reporting.Report
 import ch.memobase.reporting.ReportStatus
-import com.beust.klaxon.Klaxon
-import com.beust.klaxon.KlaxonException
+import com.fasterxml.jackson.core.JsonProcessingException
+import com.fasterxml.jackson.databind.JsonMappingException
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.registerKotlinModule
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.logging.log4j.Logger
-import org.elasticsearch.action.ActionListener
 import org.elasticsearch.action.bulk.BackoffPolicy
 import org.elasticsearch.action.bulk.BulkProcessor
 import org.elasticsearch.action.bulk.BulkRequest
@@ -35,7 +36,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit
 import org.elasticsearch.common.unit.ByteSizeValue
 import org.elasticsearch.common.unit.TimeValue
 import org.elasticsearch.index.query.QueryBuilders
-import org.elasticsearch.index.reindex.BulkByScrollResponse
 import org.elasticsearch.index.reindex.UpdateByQueryRequest
 import org.elasticsearch.script.Script
 import org.elasticsearch.script.ScriptType.INLINE
@@ -49,7 +49,7 @@ class BulkProcessor(
     private val log: Logger
 ) {
     private var recordsInBulk = 0
-    private val klaxon = Klaxon()
+    private val mapper = ObjectMapper().registerKotlinModule()
     private val updateByQueryListener = UpdateByQueryListener(serviceName, producer)
     private val listener = object : BulkProcessor.Listener {
@@ -176,7 +176,7 @@ class BulkProcessor(
         records.forEach {
             val report = if (it.key().contains("#update")) {
                 log.info("Processing query request for ${it.key()} with message ${it.value()}.")
-                addUpdateByQueryRequest(it)
+                executeUpdateRequest(it)
             } else {
                 log.info("Adding consumer record ${it.key()} to bulk processor.")
                 add(it)
@@ -217,9 +217,9 @@ class BulkProcessor(
         }
     }

-    private fun addUpdateByQueryRequest(consumerRecord: ConsumerRecord<String, String>): Report {
+    private fun executeUpdateRequest(consumerRecord: ConsumerRecord<String, String>): Report {
         val updateQuery = try {
-            klaxon.parse<UpdateQuery>(consumerRecord.value()).let {
+            mapper.readValue<UpdateQuery>(consumerRecord.value(), UpdateQuery::class.java).let {
                 it
                     ?: return Report(
                         consumerRecord.key(),
@@ -228,7 +228,16 @@ class BulkProcessor(
                         serviceName
                     )
             }
-        } catch (ex: KlaxonException) {
+        } catch (ex: JsonProcessingException) {
+            log.error("JSON Parse Error: ${ex.localizedMessage}")
+            return Report(
+                consumerRecord.key(),
+                ReportStatus.fatal,
+                "JSON Parse Error: ${ex.localizedMessage}",
+                serviceName
+            )
+        } catch (ex: JsonMappingException) {
+            log.error("JSON Parse Error: ${ex.localizedMessage}")
             return Report(
                 consumerRecord.key(),
                 ReportStatus.fatal,
@@ -252,6 +261,7 @@ class BulkProcessor(
                 serviceName
             )
         } catch (ex: Exception) {
+            log.error("Unknown exception: ${ex.localizedMessage}")
             return Report(
                 consumerRecord.key(),
                 ReportStatus.fatal,
...
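In executeUpdateRequest, parse failures are now converted into fatal Reports rather than thrown. The condensed sketch below mirrors that structure with hypothetical ParseResult and UpdateQuery stand-ins for the real Report and org.memobase.UpdateQuery types; since JsonMappingException extends JsonProcessingException, a single catch on the latter already covers both malformed input and binding failures.

import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule

// Hypothetical stand-ins for the real UpdateQuery and Report types.
data class UpdateQuery(
    val term: String,
    val value: String,
    val source: String,
    val params: Map<String, Any>
)

sealed class ParseResult {
    data class Ok(val query: UpdateQuery) : ParseResult()
    data class Fatal(val message: String) : ParseResult()
}

private val mapper = ObjectMapper().registerKotlinModule()

// Parse the Kafka record value or turn the failure into a report-style result,
// mirroring the try/catch structure of executeUpdateRequest.
fun parseUpdateQuery(rawValue: String): ParseResult =
    try {
        // readValue yields null for a literal JSON "null" body, which the
        // production code also turns into a Report.
        val parsed: UpdateQuery? = mapper.readValue(rawValue, UpdateQuery::class.java)
        if (parsed != null) ParseResult.Ok(parsed) else ParseResult.Fatal("No update query in message.")
    } catch (ex: JsonProcessingException) {
        // Covers both low-level parse errors and JsonMappingException,
        // which extends JsonProcessingException.
        ParseResult.Fatal("JSON Parse Error: ${ex.localizedMessage}")
    } catch (ex: Exception) {
        ParseResult.Fatal("Unknown exception: ${ex.localizedMessage}")
    }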
/*
* elasticsearch consumer service
* Copyright (C) 2020-2021 Jonas Waeber
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.memobase

import com.beust.klaxon.Klaxon

data class UpdateMessage(
val term: String,
val value: String,
val doc: String
) {
companion object {
private val klaxon = Klaxon()
fun fromJson(data: String): UpdateMessage? {
return klaxon.parse<UpdateMessage>(data)
}
}
}
\ No newline at end of file
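The UpdateMessage helper above still uses Klaxon for its fromJson method. Purely as an illustration of the same migration, a Jackson-based equivalent might look like the sketch below (not part of the commit as shown); Jackson's readValue throws on bad input rather than returning null, so the nullable contract is preserved by catching the exception.

import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule

// Sketch of a Jackson-backed UpdateMessage with the same nullable fromJson
// contract as the Klaxon version shown above.
data class UpdateMessage(
    val term: String,
    val value: String,
    val doc: String
) {
    companion object {
        private val mapper = ObjectMapper().registerKotlinModule()

        fun fromJson(data: String): UpdateMessage? =
            try {
                mapper.readValue(data, UpdateMessage::class.java)
            } catch (ex: JsonProcessingException) {
                // Klaxon's parse() returned null on failure; keep that behaviour.
                null
            }
    }
}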
package ch.memobase

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.index.reindex.UpdateByQueryRequest
import org.elasticsearch.script.Script
import org.elasticsearch.script.ScriptType
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.memobase.UpdateQuery
import java.io.File

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestUpdateQueries {
@Test
fun `test update query request creation`() {
val mapper = ObjectMapper().registerKotlinModule()
val updateQuery =
mapper.readValue<UpdateQuery>(File("src/test/resources/data/update-query.json"), UpdateQuery::class.java)!!
val request = UpdateByQueryRequest("index")
request.isAbortOnVersionConflict = true
request.setQuery(QueryBuilders.termQuery(updateQuery.term, updateQuery.value))
request.script = Script(
ScriptType.INLINE, "painless", updateQuery.source, updateQuery.params
)
}
}
\ No newline at end of file
{
"params": {
"containers": [
{
"facet": [],
"filter": "ati",
"name": {
"de": [
"Archivio di Stato del Cantone Ticino"
],
"fr": [
"Archivio di Stato del Cantone Ticino"
],
"it": [
"Archivio di Stato del Cantone Ticino"
],
"un": []
}
}
]
},
"source": "ctx._source['accessInstitution'] = params.containers",
"term": "recordSet.facet",
"value": "ati-001"
}
\ No newline at end of file