Commit 32efc562 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Add headers to institution update message.

parent 47d42f0a
Pipeline #26212 passed with stages
in 2 minutes and 2 seconds
import uuid
from flask_restful import Resource, current_app from flask_restful import Resource, current_app
from kafka import KafkaProducer from kafka import KafkaProducer
import requests import requests
...@@ -8,6 +10,11 @@ from requests.auth import HTTPBasicAuth ...@@ -8,6 +10,11 @@ from requests.auth import HTTPBasicAuth
class UpdateInstitution(Resource): class UpdateInstitution(Resource):
def __init__(self):
self.producer = KafkaProducer(bootstrap_servers=current_app.config['kafka-broker-url'],
value_serializer=lambda m: json.dumps(m, ensure_ascii=False)
.encode('utf-8'))
def get(self, institution_drupal_uuid): def get(self, institution_drupal_uuid):
""" """
Update the institution with the given drupal UUID in the backend. Update the institution with the given drupal UUID in the backend.
...@@ -36,7 +43,7 @@ class UpdateInstitution(Resource): ...@@ -36,7 +43,7 @@ class UpdateInstitution(Resource):
example: the value written into the topic example: the value written into the topic
""" """
topic_value = '' result = ''
headers = {'X-API-Key': current_app.config['drupal-api-key']} headers = {'X-API-Key': current_app.config['drupal-api-key']}
user = current_app.config['drupal-user'] user = current_app.config['drupal-user']
password = current_app.config['drupal-password'] password = current_app.config['drupal-password']
...@@ -97,7 +104,7 @@ class UpdateInstitution(Resource): ...@@ -97,7 +104,7 @@ class UpdateInstitution(Resource):
return { return {
'status': 'FAILURE', 'status': 'FAILURE',
'message': str(ex), 'message': str(ex),
'topic_value': topic_value 'topic_value': result
}, 500 }, 500
except Exception as ex: except Exception as ex:
msg = f'Unknown Exception ({institution_drupal_uuid}): {ex}\n{traceback.format_exc()}' msg = f'Unknown Exception ({institution_drupal_uuid}): {ex}\n{traceback.format_exc()}'
...@@ -105,9 +112,9 @@ class UpdateInstitution(Resource): ...@@ -105,9 +112,9 @@ class UpdateInstitution(Resource):
return { return {
'status': 'FAILURE', 'status': 'FAILURE',
'message': str(ex), 'message': str(ex),
'topic_value': topic_value 'topic_value': result
}, 500 }, 500
topic_value = { result = {
'type': institution_data_de['type'], 'type': institution_data_de['type'],
'status': institution_data_de['attributes']['status'], 'status': institution_data_de['attributes']['status'],
'title_de': institution_data_de['attributes']['title'], 'title_de': institution_data_de['attributes']['title'],
...@@ -132,29 +139,31 @@ class UpdateInstitution(Resource): ...@@ -132,29 +139,31 @@ class UpdateInstitution(Resource):
institution_data_de['attributes']['computed_teaser_image_url'], institution_data_de['attributes']['computed_teaser_image_url'],
'computed_teaser_color': institution_data_de['attributes']['computed_teaser_color'], 'computed_teaser_color': institution_data_de['attributes']['computed_teaser_color'],
} }
producer_topic = current_app.config['topic-drupal-export']
# 2. write info into kafka topic headers = [
('recordSetId', bytes('none', encoding='utf-8')),
('sessionId', bytes(str(uuid.uuid4()), encoding='utf-8')),
('institutionId', bytes(result.get('field_memobase_id'), encoding='utf-8')),
('isPublished', bytes(str(result['status']), encoding='utf-8'))
]
try: try:
producer = KafkaProducer(bootstrap_servers=current_app.config['kafka-broker-url'], key = bytes(result.get('field_memobase_id'), encoding='utf-8')
value_serializer=lambda m: json.dumps(m, ensure_ascii=False) self.producer.send(producer_topic, result, key, headers=headers)
.encode('utf-8'))
key = bytes(topic_value.get('field_memobase_id'), encoding='utf-8')
producer.send(current_app.config['topic-drupal-export'], topic_value, key)
except Exception as ex: except Exception as ex:
msg = 'Exception for ' + institution_drupal_uuid + ': ' + str(ex) + '\n' + \ msg = f'Unknown Exception ({institution_drupal_uuid}): {ex}. '\
traceback.format_exc() f'Check logs for more details.'
current_app.logger.error(msg) current_app.logger.error(f'{msg}\n{traceback.format_exc()}')
return { return {
'status': 'FAILURE', 'status': 'FAILURE',
'topic_key': topic_value.get('field_memobase_id'), 'topic_key': result.get('field_memobase_id'),
'topic_value': topic_value 'topic_value': result
}, 500 }, 500
current_app.logger.debug('success for ' + institution_drupal_uuid) current_app.logger.debug('success for ' + institution_drupal_uuid)
return { return {
'status': 'SUCCESS', 'status': 'SUCCESS',
'topic_key': topic_value.get('field_memobase_id'), 'topic_key': result.get('field_memobase_id'),
'topic_value': topic_value 'topic_value': result
}, 200 }, 200
@staticmethod @staticmethod
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment