Commit ce9ca5e6 authored by Lionel Walter

Improve JobStart so that it makes all necessary steps

parent d7c25564
Pipeline #10744 passed with stages in 9 minutes and 10 seconds
@@ -21,8 +21,9 @@ def list_topics():
     return topics
-# It also deletes the topics if it already exists
-def create_topics(topics):
+# It also deletes the topics if they already exist and the
+# delete_if_already_exist flag is set to true
+def create_topics(topics, delete_if_already_exist):
     admin_client = KafkaAdminClient(
         bootstrap_servers=current_app.config['kafka-broker-url'],
         client_id='import-api'
@@ -38,28 +39,29 @@ def create_topics(topics):
         current_app.logger.error(message)
         raise ImportApiError(message)
-    topic_to_delete = []
-    for topic in topics:
-        if topic in current_topics:
-            topic_to_delete.append(topic)
+    if delete_if_already_exist:
+        topic_to_delete = []
+        for topic in topics:
+            if topic in current_topics:
+                topic_to_delete.append(topic)
+        try:
+            admin_client.delete_topics(topic_to_delete)
+        except KafkaError:
+            message = "Problem when removing already existing topics: " + ", ".join(topic_to_delete)
+            current_app.logger.error(message)
+            raise ImportApiError(message)
-    try:
-        admin_client.delete_topics(topic_to_delete)
-    except KafkaError:
-        message = "Problem when removing already existing topics :" + ", ".join(topic_to_delete)
-        current_app.logger.error(message)
-        raise ImportApiError(message)
-    # wait that topics are deleted
-    # todo Does that make sense ? Does it take longer when the topic is full of messages ?
-    # todo : do a loop until the exception is notopics created is not raised any more
-    time.sleep(0.1)
+        # wait until the topics are deleted
+        # todo Does that make sense? Does it take longer when the topic is full of messages?
+        # todo: do a loop until the "topics not created" exception is not raised any more
+        time.sleep(0.1)
     new_topic_list = []
     for topic in topics:
         # todo use multiple partitions
-        new_topic_list.append(NewTopic(name=topic, num_partitions=1, replication_factor=1))
+        new_topic_list.append(NewTopic(name=topic, num_partitions=3, replication_factor=1))
     try:
         admin_client.create_topics(new_topic_list)
...
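The two todo comments above question whether a fixed 0.1-second sleep is enough to guarantee that the topics are really gone before they are recreated. A possible follow-up, sketched here and not part of this commit, is to poll the broker until the deleted topics no longer show up in list_topics(), with a timeout; the helper name and timing values are illustrative.

import time

from kafka import KafkaAdminClient


def wait_for_topic_deletion(admin_client: KafkaAdminClient, topics, timeout=10.0, interval=0.2):
    """Poll the broker until none of `topics` is listed any more, or give up after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    remaining = set(topics)
    while remaining and time.monotonic() < deadline:
        # keep only the topics that the broker still reports as existing
        remaining &= set(admin_client.list_topics())
        if remaining:
            time.sleep(interval)
    if remaining:
        raise TimeoutError("topics still present after deletion: " + ", ".join(sorted(remaining)))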
@@ -30,7 +30,7 @@ def get_report(topic_name):
         for jobMessage in job_messages[partition]:
             msg_value = jobMessage.value
             if msg_value['status'] == 'FAILURE':
-                report = report + msg_value['id'] + ": " + msg_value['message'] + "\n"
+                report = report + msg_value['id'] + ": " + msg_value['message'] + "\r\n"
             else:
                 succeeded = succeeded + 1
             processed = processed + 1
...
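The only change in get_report() is the line separator: failure lines are now joined with CRLF, presumably so that line breaks render correctly wherever the report is displayed. As a side note, a slightly tidier way to build such a report is to collect the lines first and join them once; this is a sketch with an illustrative helper name, not code from the repository.

def build_failure_report(messages):
    """Return one CRLF-separated line per FAILURE message; `messages` is an iterable of dicts
    with 'id', 'status' and 'message' keys, as in the job reporting topic."""
    lines = [
        msg['id'] + ": " + msg['message']
        for msg in messages
        if msg['status'] == 'FAILURE'
    ]
    return "\r\n".join(lines)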
-from flask_restful import Resource, current_app, reqparse
+from flask_restful import Resource, reqparse
 from flasgger import swag_from
-from import_api_app.utility import generate_helm_name
-import import_api_app.helm as helm
-import os
-from subprocess import CalledProcessError
 from import_api_app.models.Error import ImportApiError
+from import_api_app.resources.KafkaTopics import create_topics
+from import_api_app.resources.HelmStart import start
+from import_api_app.resources.JobReport import get_report
+from import_api_app.resources.WriteJobResultsToDrupal import write_results

 class JobStart(Resource):
@@ -21,34 +23,38 @@ class JobStart(Resource):
         job_parameters['recordSetId'] = record_set_id
         job_parameters['processId'] = process_id
-        helm_client = helm.Helm()
-        helm_name = generate_helm_name(process_id, job_name)
+        # todo take care of creation of process_id-reporting
+        job_topic = f"{process_id}-{job_name}"
+        job_reporting_topic = f"{process_id}-{job_name}-reporting"
+        process_reporting_topic = f"{process_id}-reporting"
+        topics = [
+            job_topic,
+            job_reporting_topic,
+        ]
+        # create job topics
+        try:
+            create_topics(topics, True)
+            create_topics(process_reporting_topic, False)
+        except ImportApiError as e:
+            return {'error': e.message}, 500
+        try:
+            helm_result = start(process_id, job_name, job_parameters)
+        except ImportApiError as e:
+            return {'error': e.message}, 500
         # to debug parameters sent by drupal
         current_app.logger.debug(job_parameters)
+        try:
+            report = get_report(job_reporting_topic)
+        except ImportApiError as e:
+            return {'error': e.message}, 500
+        # write report
+        # todo check if drupalJobLogResultUuid is defined
         try:
-            output = helm_client.install(
-                chart=os.path.join(current_app.root_path, "charts", job_name),
-                name=helm_name,
-                namespace=current_app.config['NAMESPACE'],
-                set_values=job_parameters,
-                fail_on_err=False
-            )
-            message = (output.stdout + output.stderr)
-            message = message.replace('\n', '. ')
-            if output.returncode == 0:
-                return {
-                    'status': 'SUCCESS',
-                    'message': message,
-                    'job_id': helm_name
-                }, 201
-            else:
-                return {
-                    'status': 'FAILURE',
-                    'message': message,
-                    'job_id': helm_name
-                }, 500
-        except CalledProcessError:
-            current_app.logger.error("It was not possible to run the helm install command")
-            return {'error': 'Unexpected Helm error'}, 500
+            write_results(job_parameters['drupalJobLogResultUuid'], report["report"])
+        except ImportApiError as e:
+            return {'error': e.message}, 500
+        return helm_result
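The new post() body chains four steps (create the Kafka topics, start the Helm job, collect the job report, write the results back to Drupal), each wrapped in the same try/except ImportApiError block that returns a 500. Two details worth double-checking: create_topics() iterates over its first argument, so the shared reporting topic probably needs to be passed as a one-element list rather than a bare string, and the repeated error handling could be factored into a small helper. The sketch below shows one way to do that; run_step is a hypothetical name, not something in the code base.

from import_api_app.models.Error import ImportApiError


def run_step(step, *args):
    """Run one pipeline step and normalise its error handling: returns (result, None) on
    success, or (None, error_response) when the step raises ImportApiError."""
    try:
        return step(*args), None
    except ImportApiError as e:
        return None, ({'error': e.message}, 500)


# usage inside post(), reusing the helpers this file already imports:
# _, err = run_step(create_topics, [process_reporting_topic], False)
# if err:
#     return err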
@@ -13,7 +13,7 @@ parameters:
       properties:
         appDirectory:
           type: string
-          example: BECKER
+          example: BAZ-B_MEI
           description: The directory in the SFTP server
         drupalProcessUuid:
           type: string
...
@@ -50,7 +50,7 @@ class KafkaTopics(Resource):
         topics = args['topics']
         try:
-            return create_topics(topics)
+            return create_topics(topics, True)
         except ImportApiError as e:
             return {'error': e.message}, 500
...
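With the new signature the caller decides whether pre-existing topics are dropped first: the REST resource keeps the old delete-and-recreate behaviour, while JobStart only skips the deletion for the shared per-process reporting topic. For illustration (the topic names below are made up):

# drop any existing topics first, then recreate them (the behaviour the KafkaTopics resource keeps)
create_topics(["4711-dedup", "4711-dedup-reporting"], True)

# skip the deletion step (what JobStart requests for the process-level reporting topic)
create_topics(["4711-reporting"], False)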