Commit a5831b39 authored by Matthias's avatar Matthias

Merge branch 'master' of...

Merge branch 'master' of gitlab.switch.ch:memoriav/memobase-2020/services/elastic-services/elastic-bulk-action-service
parents 7677606e f5828b24
Pipeline #19619 passed with stages
in 4 minutes and 12 seconds
deploymentName: pp-documents-indexer
elasticIndex: documents-v15
elasticIndex: documents-v16
topicName: search-doc-output-documents
\ No newline at end of file
......@@ -33,20 +33,25 @@ import org.elasticsearch.common.unit.ByteSizeUnit
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.rest.RestStatus
import java.util.Properties
import java.util.*
class BulkProcessor(private val consumer: Consumer,
properties: Properties,
private val log: Logger) {
private var recordsInBulk = 0
private val index = properties.getProperty("elastic.index")
private val client = RestHighLevelClient(RestClient.builder(
HttpHost(properties.getProperty("elastic.host"), properties.getProperty("elastic.port").toInt())))
private val listener = object : BulkProcessor.Listener {
var recordsCounter = 0
override fun beforeBulk(executionId: Long, request: BulkRequest) {
recordsCounter = recordsInBulk
recordsInBulk = 0
log.trace("Preparing bulk upload {}", executionId)
}
override fun afterBulk(executionId: Long, request: BulkRequest, response: BulkResponse) {
if (response.hasFailures()) {
val responses = response.items
......@@ -76,10 +81,19 @@ class BulkProcessor(private val consumer: Consumer,
}
}
} else {
log.info("Bulk upload {} successful. Indexed {} messages. Commit offsets to Kafka. Any failures are resent for processing.", executionId,
request.numberOfActions())
if (request.numberOfActions() == recordsCounter) {
log.info("Bulk upload {} successful. Indexed {} messages. Commit offsets to Kafka. Any failures are resent for processing.",
executionId,
request.numberOfActions())
} else {
log.warn("Bulk upload {} partially successful. Indexed {} out of {} messages. Commit offsets to Kafka. Any failures are resent for processing.",
executionId,
request.numberOfActions(),
recordsCounter)
}
}
recordsCounter = 0
}
override fun afterBulk(executionId: Long, request: BulkRequest, failure: Throwable) {
......@@ -98,7 +112,7 @@ class BulkProcessor(private val consumer: Consumer,
for (t in failure.suppressed) {
log.error("Suppressed Error: {}", t.message)
}
recordsCounter = 0
}
}
......@@ -124,7 +138,7 @@ class BulkProcessor(private val consumer: Consumer,
val records = consumer.consume()
records.forEach {
add(it)
log.info("Adding consumer record to bulk processor.")
log.info("Adding consumer record ${it.key()} to bulk processor.")
}
}
......@@ -132,6 +146,7 @@ class BulkProcessor(private val consumer: Consumer,
try {
val recordIndexRequest = ConsumerRecordIndexRequest(consumerRecord, index)
bulk.add(recordIndexRequest)
recordsInBulk += 1
} catch (ex: NullPointerException) {
log.error(ex.message, consumerRecord.key())
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment