Due to a scheduled upgrade to version 14.10, GitLab will be unavailabe on Monday 30.05., from 19:00 until 20:00.

Commit dacb1b55 authored by Matthias's avatar Matthias
Browse files

Merge remote-tracking branch 'origin/master'

parents 8f755306 9d50a4a5
Pipeline #26203 passed with stages
in 2 minutes and 30 seconds
import uuid
from flask_restful import Resource, current_app from flask_restful import Resource, current_app
from kafka import KafkaProducer from kafka import KafkaProducer
import requests import requests
import json import json
import traceback import traceback
from kafka.errors import KafkaTimeoutError
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
class UpdateRecordSet(Resource): class UpdateRecordSet(Resource):
def __init__(self):
self.producer = KafkaProducer(bootstrap_servers=current_app.config['kafka-broker-url'],
value_serializer=lambda m: json.dumps(m, ensure_ascii=False)
.encode('utf-8'))
# Todo write/correct comment for swagger # Todo write/correct comment for swagger
def get(self, record_set_id): def get(self, record_set_id):
""" """
...@@ -212,25 +221,44 @@ class UpdateRecordSet(Resource): ...@@ -212,25 +221,44 @@ class UpdateRecordSet(Resource):
'result_topic_value': result_topic_value, 'result_topic_value': result_topic_value,
}, 500 }, 500
# 2. write info into kafka topic return self.send_message(result_topic_value, record_set_id)
def send_message(self, result_topic_value, record_set_drupal_id):
headers = [
('recordSetId', bytes(result_topic_value.get('field_memobase_id'), encoding='utf-8')),
('sessionId', bytes(str(uuid.uuid4()), encoding='utf-8')),
('institutionId', bytes('none', encoding='utf-8')),
('isPublished', bytes(str(result_topic_value['status']), encoding='utf-8'))
]
try: try:
producer = KafkaProducer(bootstrap_servers=current_app.config['kafka-broker-url'],
value_serializer=lambda m: json.dumps(m, ensure_ascii=False)
.encode('utf-8'))
key = bytes(result_topic_value.get('field_memobase_id'), encoding='utf-8') key = bytes(result_topic_value.get('field_memobase_id'), encoding='utf-8')
producer.send(current_app.config['topic-drupal-export'], result_topic_value, key=key) current_app.logger.debug(
except Exception as ex: f'Send message: key={key}, headers={headers}, '
msg = 'Exception for ' + record_set_id + ': ' + str(ex) + '\n' + \ f'message: {json.dumps(result_topic_value, ensure_ascii=False)}')
traceback.format_exc() self.producer.send(current_app.config['topic-drupal-export'], result_topic_value,
key=key, headers=headers)
except KafkaTimeoutError as ex:
msg = f'KafkaTimeoutError ({record_set_drupal_id}): {ex}.'
current_app.logger.error(msg) current_app.logger.error(msg)
return { return {
'status': 'FAILURE', 'status': 'FAILURE',
'topic_key': result_topic_value.get('field_memobase_id'), 'topic_key': result_topic_value.get('field_memobase_id'),
'result_topic_value': result_topic_value, 'result_topic_value': result_topic_value,
'exception': msg 'exception': msg
}, 500 }, 503
except Exception as ex:
msg = f'Could not import {result_topic_value.get("field_memobase_id")} ' \
f'(Drupal UUID: {record_set_drupal_id}) (Unknown Exception): ' + str(ex)
current_app.logger.error(f"{msg}\n{traceback.format_exc()}")
return {
'status': 'FAILURE',
'topic_key': result_topic_value.get('field_memobase_id'),
'result_topic_value': result_topic_value,
'exception': msg
}, 503
current_app.logger.debug('success for ' + record_set_id) current_app.logger.debug('success for ' + record_set_drupal_id)
return { return {
'status': 'SUCCESS', 'status': 'SUCCESS',
'topic_key': result_topic_value.get('field_memobase_id'), 'topic_key': result_topic_value.get('field_memobase_id'),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment