Report.py 2.52 KB
Newer Older
Lionel Walter's avatar
Lionel Walter committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from flask_restful import current_app
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json

from import_api_app.models.Error import ImportApiError


def get_report(topic_name):
    try:  # read from kafka topic and generate report from it:
        consumer = KafkaConsumer(
            topic_name,
            value_deserializer=lambda m: json.loads(m.decode('utf8')),
            bootstrap_servers=current_app.config['kafka-broker-url'],
            auto_offset_reset='earliest',
16
17
18
            enable_auto_commit=False,
            # we stop reading messages after 2 seconds
            consumer_timeout_ms=2000
19
20
            # todo maybe add group id ?
            # group_id='import-api-report'+topic_name
Lionel Walter's avatar
Lionel Walter committed
21
22
23
24
25
26
27
28
        )
    except KafkaError:
        message = 'It was not possible to connect to the Kafka broker'
        current_app.logger.error(message)
        raise ImportApiError(message)

    try:  # read job-reporting messages from kafka
        processed = succeeded = 0
29
30
31
32
33
34
35
36
37
38
        report = ''
        for jobMessage in consumer:
            msg_value = jobMessage.value
            if msg_value['status'] == 'FAILURE':
                report = report + msg_value['id'] + ": " + msg_value['message'] + "\r\n"
            else:
                succeeded = succeeded + 1
            processed = processed + 1
        report = report[:-2]
        if processed == 0:
Lionel Walter's avatar
Lionel Walter committed
39
40
41
42
43
            report = 'No messages found'
    except KafkaError:
        message = 'It was not possible to consume the Kafka messages'
        current_app.logger.error(message)
        raise ImportApiError(message)
44
45
46
47
48
49
50
51
52
53
54
55
56
    # todo: fix this problem, seems to be a problem when there is only
    #  one message in the topic somehow
    except UnboundLocalError:
        if 'text-file-validation' in topic_name:
            report = 'text-file-validation-report-bug-with-checker'
            return {
                'status': 'SUCCESS',
                'report': report
            }
        else:
            message = 'Problem generating the report'
            current_app.logger.error(message)
            raise ImportApiError(message)
Lionel Walter's avatar
Lionel Walter committed
57
58

    if report == '':
59
        report = str(processed)+' records successfully processed'
Lionel Walter's avatar
Lionel Walter committed
60
61
62
63
64
        return {
                   'status': 'SUCCESS',
                   'report': report
               }
    else:
65
66
        report_header = str(processed) + ' records processed\r\n'
        report_header += str(succeeded) + ' successful\r\n'
Lionel Walter's avatar
Lionel Walter committed
67
68
        return {
                   'status': 'FAILURE',
69
                   'report': report_header+report
Lionel Walter's avatar
Lionel Walter committed
70
               }