Commit d549b02d authored by Matthias's avatar Matthias
Browse files

check job queue for errors

parent a88bfe4c
Pipeline #11387 passed with stages
in 10 minutes and 57 seconds
......@@ -7,9 +7,12 @@ from import_api_app.models.Error import ImportApiError
def get_report(topic_name):
report = ''
consumer = None
try: # read from kafka topic and generate report from it:
# 1. read from general job topic and check for errors
consumer = KafkaConsumer(
topic_name,
topic_name[0:topic_name.index('-')+1] + 'reporting',
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=current_app.config['kafka-broker-url'],
auto_offset_reset='earliest',
......@@ -19,43 +22,69 @@ def get_report(topic_name):
# todo maybe add group id ?
# group_id='import-api-report'+topic_name
)
except KafkaError:
message = 'It was not possible to connect to the Kafka broker'
current_app.logger.error(message)
raise ImportApiError(message)
try: # read job-reporting messages from kafka
processed = succeeded = 0
report = ''
for jobMessage in consumer:
msg_value = jobMessage.value
if msg_value['status'] == 'FAILURE':
report = report + msg_value['id'] + ": " + msg_value['message'] + "\r\n"
else:
succeeded = succeeded + 1
processed = processed + 1
report = report[:-2]
if processed == 0:
report = 'No messages found'
if 'error' in msg_value:
report += 'error: ' + msg_value['error'] + "\r\n"
elif 'status' in msg_value and msg_value['status'] == 'FAILURE':
report += 'error in job ' + msg_value['job_id'] + ': ' +\
msg_value['message']
except KafkaError:
message = 'It was not possible to consume the Kafka messages'
message = 'It was not possible to connect to the Kafka broker'
current_app.logger.error(message)
raise ImportApiError(message)
# todo: fix this problem, seems to be a problem when there is only
# one message in the topic somehow
except UnboundLocalError:
if 'text-file-validation' in topic_name:
report = 'File correctly loaded (this needs to be checked more carefully)'
current_app.logger.debug("Report: " + report)
return {
'status': 'SUCCESS',
'report': report
}
else:
message = 'Problem generating the report'
except Exception as ex:
current_app.logger.error(str(ex))
if report == '':
# 2. read form job topic and process data:
try:
consumer = KafkaConsumer(
topic_name,
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=current_app.config['kafka-broker-url'],
auto_offset_reset='earliest',
enable_auto_commit=False,
# we stop reading messages after 2 seconds
consumer_timeout_ms=2000
# todo maybe add group id ?
# group_id='import-api-report'+topic_name
)
except KafkaError:
message = 'It was not possible to connect to the Kafka broker'
current_app.logger.error(message)
raise ImportApiError(message)
try: # read job-reporting messages from kafka
processed = succeeded = 0
for jobMessage in consumer:
msg_value = jobMessage.value
if msg_value['status'] == 'FAILURE':
report = report + msg_value['id'] + ": " + msg_value['message'] + "\r\n"
else:
succeeded = succeeded + 1
processed = processed + 1
report = report[:-2]
if processed == 0:
report = 'No messages found'
except KafkaError:
message = 'It was not possible to consume the Kafka messages'
current_app.logger.error(message)
raise ImportApiError(message)
# todo: fix this problem, seems to be a problem when there is only
# one message in the topic somehow
except UnboundLocalError:
if 'text-file-validation' in topic_name:
report = 'File correctly loaded (this needs to be checked more carefully)'
current_app.logger.debug("Report: " + report)
return {
'status': 'SUCCESS',
'report': report
}
else:
message = 'Problem generating the report'
current_app.logger.error(message)
raise ImportApiError(message)
if report == '':
report = str(processed)+' records successfully processed'
current_app.logger.debug("Report: "+report)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment