Commit 52c422be authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Make request synchronous

parent 1cef20e7
Pipeline #24594 passed with stages
in 4 minutes and 40 seconds
......@@ -174,14 +174,14 @@ class BulkProcessor(
fun process() {
val records = consumer.consume()
records.forEach {
val report = if (it.key().contains("#update")) {
val reports = if (it.key().contains("#update")) {
log.info("Processing query request for ${it.key()} with message ${it.value()}.")
executeUpdateRequest(it)
} else {
log.info("Adding consumer record ${it.key()} to bulk processor.")
add(it)
listOf(add(it))
}
if (report.status != ReportStatus.success) {
reports.forEach { report ->
producer.sendReport(report, it.headers())
}
}
......@@ -217,57 +217,69 @@ class BulkProcessor(
}
}
private fun executeUpdateRequest(consumerRecord: ConsumerRecord<String, String>): Report {
private fun executeUpdateRequest(consumerRecord: ConsumerRecord<String, String>): List<Report> {
val updateQuery = try {
mapper.readValue<UpdateQuery>(consumerRecord.value(), UpdateQuery::class.java).let {
it
?: return Report(
?: return listOf(Report(
consumerRecord.key(),
ReportStatus.fatal,
"Input message is empty.",
serviceName
)
))
}
} catch (ex: JsonProcessingException) {
log.error("JSON Parse Error: ${ex.localizedMessage}")
return Report(
return listOf(Report(
consumerRecord.key(),
ReportStatus.fatal,
"JSON Parse Error: ${ex.localizedMessage}",
serviceName
)
))
} catch (ex: JsonMappingException) {
log.error("JSON Parse Error: ${ex.localizedMessage}")
return Report(
return listOf(Report(
consumerRecord.key(),
ReportStatus.fatal,
"JSON Parse Error: ${ex.localizedMessage}",
serviceName
)
))
}
try {
return try {
val request = UpdateByQueryRequest(index)
request.isAbortOnVersionConflict = true
request.setQuery(QueryBuilders.termQuery(updateQuery.term, updateQuery.value))
request.script = Script(
INLINE, "painless", updateQuery.source, updateQuery.params
)
client.updateByQueryAsync(request, RequestOptions.DEFAULT, updateByQueryListener)
return Report(
consumerRecord.key(),
ReportStatus.success,
"",
serviceName
)
val response = client.updateByQuery(request, RequestOptions.DEFAULT)
if (response.total > response.updated) {
response.bulkFailures.map {
Report(
consumerRecord.key(),
ReportStatus.fatal,
"Indexing Failure: ${it.message}",
serviceName
)
}
} else {
listOf(Report(
consumerRecord.key(),
ReportStatus.success,
"",
serviceName
))
}
} catch (ex: Exception) {
log.error("Unknown exception: ${ex.localizedMessage}")
return Report(
listOf(Report(
consumerRecord.key(),
ReportStatus.fatal,
"Unknown Exception: ${ex.localizedMessage}",
serviceName
)
))
}
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment