Commit 335a3d74 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Add process config values to headers.

Handle KafkaTimeoutErrors.
parent 337b76d7
import uuid
from flask_restful import Resource, current_app
from kafka import KafkaProducer
import requests
import json
import traceback
from kafka.errors import KafkaTimeoutError
from requests.auth import HTTPBasicAuth
class UpdateRecordSet(Resource):
producer = KafkaProducer(bootstrap_servers=current_app.config['kafka-broker-url'],
value_serializer=lambda m: json.dumps(m, ensure_ascii=False)
.encode('utf-8'))
# Todo write/correct comment for swagger
def get(self, record_set_id):
"""
......@@ -211,24 +219,38 @@ class UpdateRecordSet(Resource):
'topic_key': result_topic_value.get('field_memobase_id'),
'result_topic_value': result_topic_value,
}, 500
self.send_message(result_topic_value, record_set_id)
# 2. write info into kafka topic
def send_message(self, result_topic_value, record_set_id):
headers = [
('recordSetId', bytes(record_set_id)),
('sessionId', uuid.uuid4()),
('institutionId', bytes('none')),
('isPublished', bytes(result_topic_value['status']))
]
try:
producer = KafkaProducer(bootstrap_servers=current_app.config['kafka-broker-url'],
value_serializer=lambda m: json.dumps(m, ensure_ascii=False)
.encode('utf-8'))
key = bytes(result_topic_value.get('field_memobase_id'), encoding='utf-8')
producer.send(current_app.config['topic-drupal-export'], result_topic_value, key=key)
self.producer.send(current_app.config['topic-drupal-export'], result_topic_value,
key=key, headers=headers)
except KafkaTimeoutError as ex:
msg = f'KafkaTimeoutError ({record_set_id}): {ex}.'
current_app.logger.error(msg)
return {
'status': 'FAILURE',
'topic_key': result_topic_value.get('field_memobase_id'),
'result_topic_value': result_topic_value,
'exception': msg
}, 503
except Exception as ex:
msg = 'Exception for ' + record_set_id + ': ' + str(ex) + '\n' + \
traceback.format_exc()
msg = 'Could not import ' + record_set_id + ' (Unknown Exception): ' + str(ex)
current_app.logger.error(msg)
return {
'status': 'FAILURE',
'topic_key': result_topic_value.get('field_memobase_id'),
'result_topic_value': result_topic_value,
'exception': msg
}, 500
}, 503
current_app.logger.debug('success for ' + record_set_id)
return {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment