Commit 90b86bad authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Make producer connection a bit more robust.

parent fe40730b
Pipeline #37962 passed with stages
in 1 minute and 23 seconds
......@@ -19,19 +19,26 @@ import uuid
import requests
from flask.views import MethodView
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable
from requests.auth import HTTPBasicAuth
from import_api_app.app import app
def connect_producer():
return KafkaProducer(
bootstrap_servers=app.config["kafka-broker-url"],
value_serializer=lambda m: json.dumps(m, ensure_ascii=False).encode("utf-8")
)
class UpdateInstitution(MethodView):
def __init__(self):
self.producer = KafkaProducer(
bootstrap_servers=app.config["kafka-broker-url"],
value_serializer=lambda m: json.dumps(m, ensure_ascii=False).encode(
"utf-8"
),
)
try:
self.producer = connect_producer()
except NoBrokersAvailable as err:
self.producer = None
app.logger.error(str(err))
def get(self, institution_drupal_uuid):
"""
......@@ -183,6 +190,8 @@ class UpdateInstitution(MethodView):
("isPublished", bytes(str(result["status"]), encoding="utf-8")),
]
key = bytes(result.get("field_memobase_id"), encoding="utf-8")
while self.producer is None:
self.producer = connect_producer()
self.producer.send(producer_topic, result, key, headers=headers)
except Exception as ex:
msg = (
......@@ -191,17 +200,17 @@ class UpdateInstitution(MethodView):
)
app.logger.error(msg)
return {
"status": "FAILURE",
"topic_key": result.get("field_memobase_id"),
"topic_value": result,
}, 500
"status": "FAILURE",
"topic_key": result.get("field_memobase_id"),
"topic_value": result,
}, 500
app.logger.debug("success for " + institution_drupal_uuid)
return {
"status": "SUCCESS",
"topic_key": result.get("field_memobase_id"),
"topic_value": result,
}, 200
"status": "SUCCESS",
"topic_key": result.get("field_memobase_id"),
"topic_value": result,
}, 200
@staticmethod
def request_institution_data(url, headers, auth):
......@@ -213,13 +222,13 @@ class UpdateInstitution(MethodView):
return json_data["data"], 200
else:
return {
"message": "No key 'data' found in json response: " + json_data
}, 500
"message": "No key 'data' found in json response: " + json_data
}, 500
except json.decoder.JSONDecodeError:
return {
"message": "Could not parse response message as JSON: "
+ response.text
}, 404
"message": "Could not parse response message as JSON: "
+ response.text
}, 404
else:
return {"url": url, "message": response.text}, response.status_code
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment