Commit 8da8ca6f authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Update README.

Improve reporting of service.
parent 75df432d
Pipeline #32133 passed with stages
in 3 minutes and 54 seconds
......@@ -8,9 +8,8 @@ The source for the consumer is one or several Kafka topics. Failed operations ar
### Index
Any message that is sent to the input topic will be indexed in the index.
The document is indexed with the Kafka message key. In the case that this
key is null or empty it is indexed without an id which will prompt
elasticsearch to generate a unique hash as key.
The document is indexed with the Kafka message key. If no key is set
the message is dropped and a fatal report is generated.
The message value is used as the document body. It has to be a valid
JSON document and need at least one property. If the body is empty the message
......
......@@ -86,7 +86,7 @@ class BulkProcessor(
Report(
r.id(),
ReportStatus.success,
"",
"Successfully indexed document.",
serviceName
),
r.headers
......@@ -172,15 +172,24 @@ class BulkProcessor(
fun process() {
val records = consumer.consume()
records.forEach {
val reports = if (it.key().contains("#update")) {
val reports = if (it.key() == null || it.key() == "") {
listOf(
Report(
Service.getRandomReportId(),
ReportStatus.fatal,
"Received message without a key.",
serviceName
)
)
} else if (it.key().contains("#update")) {
log.info("Processing query request for ${it.key()} with message ${it.value()}.")
executeUpdateRequest(it)
} else {
log.info("Adding consumer record ${it.key()} to bulk processor.")
listOf(add(it))
}
// Only send these reports when they fail! Otherwise they will be duplicated. Fatal here means the
// request to ES could not be sent. (hopefully...)
// Only send reports that are fatal here! Otherwise, they will be duplicated. Fatal here means the
// request to ES could not be sent. (probably...)
reports
.filter { report -> report.status == ReportStatus.fatal }
.forEach { report ->
......@@ -190,18 +199,11 @@ class BulkProcessor(
}
private fun add(consumerRecord: ConsumerRecord<String, String>): Report {
if (consumerRecord.key() == null) {
Report(
Service.getRandomReportId(),
ReportStatus.fatal,
"Received message without a key: ${consumerRecord.value()}.",
serviceName
)
}
return try {
val recordIndexRequest = ConsumerRecordIndexRequest(consumerRecord, index)
bulk.add(recordIndexRequest)
recordsInBulk += 1
// This report is never sent.
Report(
consumerRecord.key(),
ReportStatus.success,
......@@ -280,7 +282,7 @@ class BulkProcessor(
Report(
consumerRecord.key(),
ReportStatus.success,
"",
"Successfully updated index $index.",
serviceName
)
)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment