Commit a7c0a2f4 authored by Lionel Walter's avatar Lionel Walter
Browse files

Consume instead of poll to get reliable numbers in the report

parent 8448575e
Pipeline #10781 passed with stages
in 9 minutes and 39 seconds
......@@ -8,13 +8,14 @@ from import_api_app.models.Error import ImportApiError
def get_report(topic_name):
try: # read from kafka topic and generate report from it:
report = ''
consumer = KafkaConsumer(
topic_name,
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=current_app.config['kafka-broker-url'],
auto_offset_reset='earliest',
enable_auto_commit=False
enable_auto_commit=False,
# we stop reading messages after 2 seconds
consumer_timeout_ms=2000
# todo maybe add group id ?
# group_id='import-api-report'+topic_name
)
......@@ -24,20 +25,17 @@ def get_report(topic_name):
raise ImportApiError(message)
try: # read job-reporting messages from kafka
consumer.subscribe(topic_name)
job_messages = consumer.poll(timeout_ms=10000, max_records=1000)
processed = succeeded = 0
if job_messages:
for partition in job_messages:
for jobMessage in job_messages[partition]:
msg_value = jobMessage.value
if msg_value['status'] == 'FAILURE':
report = report + msg_value['id'] + ": " + msg_value['message'] + "\r\n"
else:
succeeded = succeeded + 1
processed = processed + 1
report = report[:-2]
else:
report = ''
for jobMessage in consumer:
msg_value = jobMessage.value
if msg_value['status'] == 'FAILURE':
report = report + msg_value['id'] + ": " + msg_value['message'] + "\r\n"
else:
succeeded = succeeded + 1
processed = processed + 1
report = report[:-2]
if processed == 0:
report = 'No messages found'
except KafkaError:
message = 'It was not possible to consume the Kafka messages'
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment