Commit a73d29b0 authored by Lionel Walter's avatar Lionel Walter
Browse files

Refactor Report Generation

parent fcc0c4cd
Pipeline #10743 passed with stages
in 12 minutes and 6 seconds
from flask_restful import current_app
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
from import_api_app.models.Error import ImportApiError
def get_report(topic_name):
try: # read from kafka topic and generate report from it:
report = ''
consumer = KafkaConsumer(
topic_name,
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=current_app.config['kafka-broker-url'],
auto_offset_reset='earliest',
enable_auto_commit=False
)
except KafkaError:
message = 'It was not possible to connect to the Kafka broker'
current_app.logger.error(message)
raise ImportApiError(message)
try: # read job-reporting messages from kafka
consumer.subscribe(topic_name)
job_messages = consumer.poll(1000, 1000)
processed = succeeded = 0
if job_messages:
for partition in job_messages:
for jobMessage in job_messages[partition]:
msg_value = jobMessage.value
if msg_value['status'] == 'FAILURE':
report = report + msg_value['id'] + ": " + msg_value['message'] + "\n"
else:
succeeded = succeeded + 1
processed = processed + 1
report = report[:-2]
else:
report = 'No messages found'
except KafkaError:
message = 'It was not possible to consume the Kafka messages'
current_app.logger.error(message)
raise ImportApiError(message)
if report == '':
report = 'success'
return {
'status': 'SUCCESS',
'report': report
}
else:
return {
'status': 'FAILURE',
'report': report
}
from flask_restful import Resource, current_app
from kafka import KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
import json
from flask_restful import Resource
from import_api_app.models.Error import ImportApiError
from import_api_app.models.Report import get_report
class JobReport(Resource):
class JobReport(Resource):
# Todo write/correct comment for swagger
def get(self, topic_name):
"""
......@@ -43,47 +42,7 @@ class JobReport(Resource):
example: Unexpected Kafka error
"""
try: # read from kafka topic and generate report from it:
report = ''
consumer = KafkaConsumer(
topic_name,
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=current_app.config['kafka-broker-url'],
auto_offset_reset='earliest',
enable_auto_commit=False
)
except KafkaError:
current_app.logger.error('It was not possible to connect to the Kafka broker')
return {'error': 'Unexpected Kafka error'}, 500
try: # read job-reporting messages from kafka
topic_partition = TopicPartition(topic=topic_name, partition=0)
job_messages = consumer.poll(1000, 1000)
processed = succeeded = 0
if job_messages:
for jobMessage in job_messages[topic_partition]:
msg_value = jobMessage.value
if msg_value['status'] == 'FAILURE':
report = report + msg_value['id'] + ": " + msg_value['message'] + "\n"
else:
succeeded = succeeded + 1
processed = processed + 1
report = report[:-2]
else:
report = 'no messages found'
except KafkaError:
current_app.logger.error('It was not possible to consume the Kafka messages')
return {'error': 'Unexpected Kafka error'}, 500
if report == '':
report = 'success'
return {
'status': 'SUCCESS',
'report': report
}, 200
else:
return {
'status': 'FAILURE',
'report': report
}, 200
try:
return get_report(topic_name), 200
except ImportApiError as e:
return {'error': e.message}, 500
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment