diff --git a/import_api_app/models/KafkaTopics.py b/import_api_app/models/KafkaTopics.py index 21658b64f7c1268fe413b1f57f9fc1a43ad5e7a1..f357386d8bd315b93a00fdf0016700c2572612bc 100644 --- a/import_api_app/models/KafkaTopics.py +++ b/import_api_app/models/KafkaTopics.py @@ -23,7 +23,7 @@ def list_topics(): # It also deletes the topics if it already exists if the # delete_if_already_exists flag is set to true -def create_topics(topics, delete_if_already_exist): +def create_topics(topics): admin_client = KafkaAdminClient( bootstrap_servers=current_app.config['kafka-broker-url'], client_id='import-api' @@ -39,23 +39,22 @@ def create_topics(topics, delete_if_already_exist): current_app.logger.error(message) raise ImportApiError(message) - if delete_if_already_exist: - topic_to_delete = [] - for topic in topics: - if topic in current_topics: - topic_to_delete.append(topic) + topic_to_delete = [] + for topic in topics: + if topic in current_topics: + topic_to_delete.append(topic) - try: - admin_client.delete_topics(topic_to_delete) - except KafkaError: - message = "Problem when removing already existing topics :" + ", ".join(topic_to_delete) - current_app.logger.error(message) - raise ImportApiError(message) + try: + admin_client.delete_topics(topic_to_delete) + except KafkaError: + message = "Problem when removing already existing topics :" + ", ".join(topic_to_delete) + current_app.logger.error(message) + raise ImportApiError(message) - # wait that topics are deleted - # todo Does that make sense ? Does it take longer when the topic is full of messages ? - # todo : do a loop until the exception is notopics created is not raised any more - time.sleep(0.1) + # wait that topics are deleted + # todo Does that make sense ? Does it take longer when the topic is full of messages ? + # todo : do a loop until the exception is notopics created is not raised any more + time.sleep(0.1) new_topic_list = [] @@ -66,9 +65,9 @@ def create_topics(topics, delete_if_already_exist): try: admin_client.create_topics(new_topic_list) except KafkaError: - message = "Problem when creating new topics :" + ", ".join(topics) + message = "Problem when creating new topics :" + ", ".join(topics)+" e.message()" current_app.logger.error(message) - return {'error': message}, 500 + raise ImportApiError(message) return { 'success': "Topics created : " + ", ".join(topics) } diff --git a/import_api_app/resources/JobStart.py b/import_api_app/resources/JobStart.py index e829398826bf70cd214125ea937a17f6c6466e8a..c04156a12fcc45044ddabc1f6357caf889702c3e 100644 --- a/import_api_app/resources/JobStart.py +++ b/import_api_app/resources/JobStart.py @@ -26,17 +26,21 @@ class JobStart(Resource): # todo take care of creation of process_id-reporting job_topic = f"{process_id}-{job_name}" job_reporting_topic = f"{process_id}-{job_name}-reporting" - process_reporting_topic = f"{process_id}-reporting" topics = [ job_topic, job_reporting_topic, ] - # create job topics + # if we are in the first step, text-file-validation, we create the generic reporting topic + # (and remove its content if it already exists) + if job_name == 'text-file-validation': + process_reporting_topic = f"{process_id}-reporting" + topics.append(process_reporting_topic) + + # create job topics try: create_topics(topics, True) - create_topics([process_reporting_topic], False) except ImportApiError as e: return {'error': e.message}, 500