Commit 06cf86ca authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Improve logging

Change query update params to be able to contain ANY type of object.
parent a8caf4e1
Pipeline #22656 passed with stages
in 3 minutes and 56 seconds
......@@ -24,7 +24,6 @@ import com.beust.klaxon.Klaxon
import com.beust.klaxon.KlaxonException
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.logging.log4j.Logger
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.bulk.BackoffPolicy
import org.elasticsearch.action.bulk.BulkProcessor
......@@ -38,7 +37,6 @@ import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.index.reindex.BulkByScrollResponse
import org.elasticsearch.index.reindex.UpdateByQueryRequest
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.script.Script
import org.elasticsearch.script.ScriptType.INLINE
......@@ -176,7 +174,7 @@ class BulkProcessor(
val records = consumer.consume()
records.forEach {
val report = if (it.key().contains("#update")) {
log.info("Add query request to bulk processor.")
log.info("Processing query request.")
addUpdateByQueryRequest(it)
} else {
log.info("Adding consumer record ${it.key()} to bulk processor.")
......@@ -222,6 +220,14 @@ class BulkProcessor(
)
producer.sendReport(report, null)
}
producer.sendReport(
Report(
response.toString(),
ReportStatus.success,
"",
serviceName
), null
)
} else {
val report = Report(
"",
......
......@@ -4,6 +4,5 @@ data class UpdateQuery(
val term: String,
val value: String,
val source: String,
val params: Map<String, String>
val params: Map<String, Any>
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment