Commit 7c9d9f20 authored by Lionel Walter's avatar Lionel Walter

Create the generic process reporting topic when starting the first job

parent e44c3966
Pipeline #10759 passed with stages
in 9 minutes and 20 seconds
......@@ -23,7 +23,7 @@ def list_topics():
# It also deletes the topics if it already exists if the
# delete_if_already_exists flag is set to true
def create_topics(topics, delete_if_already_exist):
def create_topics(topics):
admin_client = KafkaAdminClient(
bootstrap_servers=current_app.config['kafka-broker-url'],
client_id='import-api'
......@@ -39,23 +39,22 @@ def create_topics(topics, delete_if_already_exist):
current_app.logger.error(message)
raise ImportApiError(message)
if delete_if_already_exist:
topic_to_delete = []
for topic in topics:
if topic in current_topics:
topic_to_delete.append(topic)
topic_to_delete = []
for topic in topics:
if topic in current_topics:
topic_to_delete.append(topic)
try:
admin_client.delete_topics(topic_to_delete)
except KafkaError:
message = "Problem when removing already existing topics :" + ", ".join(topic_to_delete)
current_app.logger.error(message)
raise ImportApiError(message)
try:
admin_client.delete_topics(topic_to_delete)
except KafkaError:
message = "Problem when removing already existing topics :" + ", ".join(topic_to_delete)
current_app.logger.error(message)
raise ImportApiError(message)
# wait that topics are deleted
# todo Does that make sense ? Does it take longer when the topic is full of messages ?
# todo : do a loop until the exception is notopics created is not raised any more
time.sleep(0.1)
# wait that topics are deleted
# todo Does that make sense ? Does it take longer when the topic is full of messages ?
# todo : do a loop until the exception is notopics created is not raised any more
time.sleep(0.1)
new_topic_list = []
......@@ -66,9 +65,9 @@ def create_topics(topics, delete_if_already_exist):
try:
admin_client.create_topics(new_topic_list)
except KafkaError:
message = "Problem when creating new topics :" + ", ".join(topics)
message = "Problem when creating new topics :" + ", ".join(topics)+" e.message()"
current_app.logger.error(message)
return {'error': message}, 500
raise ImportApiError(message)
return {
'success': "Topics created : " + ", ".join(topics)
}
......
......@@ -26,17 +26,21 @@ class JobStart(Resource):
# todo take care of creation of process_id-reporting
job_topic = f"{process_id}-{job_name}"
job_reporting_topic = f"{process_id}-{job_name}-reporting"
process_reporting_topic = f"{process_id}-reporting"
topics = [
job_topic,
job_reporting_topic,
]
# create job topics
# if we are in the first step, text-file-validation, we create the generic reporting topic
# (and remove its content if it already exists)
if job_name == 'text-file-validation':
process_reporting_topic = f"{process_id}-reporting"
topics.append(process_reporting_topic)
# create job topics
try:
create_topics(topics, True)
create_topics([process_reporting_topic], False)
except ImportApiError as e:
return {'error': e.message}, 500
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment