Commit e9dcf1ce authored by Lionel Walter's avatar Lionel Walter
Browse files

Merge branch 'MEMO-454-Load-Mapping-File'

parents f2b5bc59 e9263f57
Pipeline #10907 passed with stages
in 12 minutes and 57 seconds
......@@ -4,27 +4,24 @@ from flask_restful import Api
from flasgger import Swagger
from kubernetes.config import ConfigException
from import_api_app.resources.HelmStart import HelmStart
from import_api_app.resources.HelmStop import HelmStop
from import_api_app.resources.JobList import JobList
from import_api_app.resources.JobStart import JobStart
from import_api_app.resources.JobStop import JobStop
from import_api_app.resources.JobReport import JobReport
from import_api_app.resources.Job import Job
from import_api_app.resources.KafkaTopics import KafkaTopics
# from import_api_app.resources.ReadJobOptionsFromDrupal import ReadJobOptionsFromDrupal
import import_api_app.configuration
from import_api_app.resources.WriteJobResultsToDrupal import WriteJobResultToDrupal
from import_api_app.resources.ReadJobOptionsFromDrupal import ReadJobOptionsFromDrupal
# from import_api_app.resources.HelmStart import HelmStart
# from import_api_app.resources.HelmStop import HelmStop
import os
import logging
def create_app(test_config=None):
app = Flask(__name__)
app.config.from_object(import_api_app.configuration)
if os.environ['FLASK_ENV'] == 'development':
app.logger.setLevel(logging.DEBUG)
# app.config.from_envvar('YOURAPPLICATION_SETTINGS')
# Based on this example :
......@@ -70,11 +67,11 @@ def create_app(test_config=None):
)
api.add_resource(JobStop, '/v1/job/<process_id>/<job_name>/stop')
# api.add_resource(
# HelmStart,
# '/v1/helm/<institution_id>/<record_set_id>/<process_id>/<job_name>/start'
# )
# api.add_resource(HelmStop, '/v1/helm/<process_id>/<job_name>/stop')
api.add_resource(
HelmStart,
'/v1/helm/<institution_id>/<record_set_id>/<process_id>/<job_name>/start'
)
api.add_resource(HelmStop, '/v1/helm/<process_id>/<job_name>/stop')
api.add_resource(Job, '/v1/job/<process_id>/<job_name>')
api.add_resource(WriteJobResultToDrupal, '/v1/drupal/<job_log_drupal_uuid>/<report>')
......
from import_api_app.models.Error import ImportApiError
from flask_restful import current_app
from datetime import datetime
import requests
import json
def write_results(job_log_drupal_uuid, report):
current_app.logger.debug("uuid to write: " + job_log_drupal_uuid)
current_app.logger.debug("report to write: " + report)
headers = {
'Content-Type': 'application/vnd.api+json',
'Accept': 'application/vnd.api+json',
'X-API-Key': current_app.config['drupal-api-key']
}
data = {
"data": {
"id": job_log_drupal_uuid,
"type": "paragraph--job_log_result",
"attributes": {
"field_summary": report,
"field_end_date": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S+00:00")
}
}
}
url = current_app.config['DRUPAL_API_URL'] + \
'/paragraph/job_log_result/' + job_log_drupal_uuid
try:
response = requests.patch(
url,
headers=headers,
data=json.dumps(data)
)
except requests.exceptions.RequestException:
message = "It was not possible to write to Drupal API \
via the following url " + url
current_app.logger.error(message)
raise ImportApiError(message)
if response.status_code == 200:
current_app.logger.error('Updated: '+url)
return {'message': 'Updated: ' + url}
elif response.status_code == 403:
message = "Not authorized to write to: " + url
current_app.logger.error(message)
raise ImportApiError(message)
elif response.status_code == 404:
message = 'Not Found: ' + url
current_app.logger.error(message)
raise ImportApiError(message)
else:
message = "Unknown response status code for drupal api for url " + url
current_app.logger.error(message)
raise ImportApiError(message)
# make the job as not active any more and write the end data
# todo refactor this in a more generic function
def update_status(job_drupal_uuid, job_name):
# in drupal job names have underscore instead of minus
job_name = job_name.replace("-", "_")
current_app.logger.debug("uuid to write: " + job_drupal_uuid)
headers = {
'Content-Type': 'application/vnd.api+json',
'Accept': 'application/vnd.api+json',
'X-API-Key': current_app.config['drupal-api-key']
}
data = {
"data": {
"id": job_drupal_uuid,
"type": "paragraph--job_"+job_name,
"attributes": {
"field_is_active": False
}
}
}
url = current_app.config['DRUPAL_API_URL'] + \
'/paragraph/job_'+job_name+'/' + job_drupal_uuid
try:
response = requests.patch(
url,
headers=headers,
data=json.dumps(data)
)
except requests.exceptions.RequestException:
message = "It was not possible to write to Drupal API \
via the following url " + url
current_app.logger.error(message)
raise ImportApiError(message)
if response.status_code == 200:
current_app.logger.error('Updated: '+url)
return {'message': 'Updated: ' + url}
elif response.status_code == 403:
message = "Not authorized to write to: " + url
current_app.logger.error(message)
raise ImportApiError(message)
elif response.status_code == 404:
message = 'Not Found: ' + url
current_app.logger.error(message)
raise ImportApiError(message)
else:
message = "Unknown response status code for drupal api for url " + url
current_app.logger.error(message)
raise ImportApiError(message)
class ImportApiError(Exception):
def __init__(self, message):
self.message = message
from flask_restful import current_app
from import_api_app.utility import generate_helm_name
import import_api_app.helm as helm
import os
from subprocess import CalledProcessError
from import_api_app.models.Error import ImportApiError
def start(process_id, job_name, job_parameters):
helm_client = helm.Helm()
helm_name = generate_helm_name(process_id, job_name)
try:
output = helm_client.install(
chart=os.path.join(current_app.root_path, "charts", job_name),
name=helm_name,
namespace=current_app.config['NAMESPACE'],
set_values=job_parameters,
fail_on_err=False
)
message = (output.stdout + output.stderr)
message = message.replace('\n', '. ')
if output.returncode == 0:
return {
'status': 'SUCCESS',
'message': 'Job successfully started',
'job_id': helm_name
}, 201
else:
return {
'status': 'FAILURE',
'message': message,
'job_id': helm_name
}, 500
except CalledProcessError:
message = "It was not possible to run the helm install command"
current_app.logger.error(message)
raise ImportApiError(message)
def stop(process_id, job_name):
helm_client = helm.Helm()
helm_name = generate_helm_name(process_id, job_name)
try:
output = helm_client.uninstall(
name=helm_name,
namespace=current_app.config['NAMESPACE'],
fail_on_err=False
)
message = output.stdout + output.stderr
message = message.replace('\n', '. ')
if output.returncode == 0:
return {
'status': 'SUCCESS',
'message': message,
'job_id': helm_name
}, 200
else:
return {
'status': 'FAILURE',
'message': message,
'job_id': helm_name
}, 500
except CalledProcessError:
message = "It was not possible to run the helm uninstall command"
current_app.logger.error(message)
raise ImportApiError(message)
from kafka import KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
from flask_restful import current_app
from kafka.errors import KafkaError
from import_api_app.models.Error import ImportApiError
import time
def list_topics():
consumer = KafkaConsumer(
bootstrap_servers=current_app.config['kafka-broker-url'],
)
try:
topics = consumer.topics()
except KafkaError:
message = "Problem connecting to Kafka cluster"
current_app.logger.error(message)
raise ImportApiError(message)
return topics
# It also deletes the topics if it already exists if the
# delete_if_already_exists flag is set to true
def create_topics(topics):
admin_client = KafkaAdminClient(
bootstrap_servers=current_app.config['kafka-broker-url'],
client_id='import-api'
)
consumer = KafkaConsumer(
bootstrap_servers=current_app.config['kafka-broker-url'],
)
try:
current_topics = consumer.topics()
except KafkaError:
message = "Problem connecting to Kafka cluster"
current_app.logger.error(message)
raise ImportApiError(message)
topic_to_delete = []
for topic in topics:
if topic in current_topics:
topic_to_delete.append(topic)
try:
admin_client.delete_topics(topic_to_delete)
except KafkaError:
message = "Problem when removing already existing topics :" + ", ".join(topic_to_delete)
current_app.logger.error(message)
raise ImportApiError(message)
# wait that topics are deleted
# todo Does that make sense ? Does it take longer when the topic is full of messages ?
# todo : do a loop until the exception is notopics created is not raised any more
time.sleep(0.1)
new_topic_list = []
for topic in topics:
# todo use multiple partitions
new_topic_list.append(NewTopic(name=topic, num_partitions=3, replication_factor=1))
try:
admin_client.create_topics(new_topic_list)
except KafkaError:
message = "Problem when creating new topics :" + ", ".join(topics)+" e.message()"
current_app.logger.error(message)
raise ImportApiError(message)
return {
'success': "Topics created : " + ", ".join(topics)
}
def delete_topics(topics):
admin_client = KafkaAdminClient(
bootstrap_servers=current_app.config['kafka-broker-url'],
client_id='import-api'
)
try:
admin_client.delete_topics(topics)
except KafkaError:
message = "Problem when deleting topics (probably they don't exist) :" + ", ".join(topics)
current_app.logger.error(message)
raise ImportApiError(message)
return {
'success': "Topics deleted : " + ", ".join(topics)
}
from flask_restful import current_app
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
from import_api_app.models.Error import ImportApiError
def get_report(topic_name):
try: # read from kafka topic and generate report from it:
consumer = KafkaConsumer(
topic_name,
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=current_app.config['kafka-broker-url'],
auto_offset_reset='earliest',
enable_auto_commit=False,
# we stop reading messages after 2 seconds
consumer_timeout_ms=2000
# todo maybe add group id ?
# group_id='import-api-report'+topic_name
)
except KafkaError:
message = 'It was not possible to connect to the Kafka broker'
current_app.logger.error(message)
raise ImportApiError(message)
try: # read job-reporting messages from kafka
processed = succeeded = 0
report = ''
for jobMessage in consumer:
msg_value = jobMessage.value
if msg_value['status'] == 'FAILURE':
report = report + msg_value['id'] + ": " + msg_value['message'] + "\r\n"
else:
succeeded = succeeded + 1
processed = processed + 1
report = report[:-2]
if processed == 0:
report = 'No messages found'
except KafkaError:
message = 'It was not possible to consume the Kafka messages'
current_app.logger.error(message)
raise ImportApiError(message)
# todo: fix this problem, seems to be a problem when there is only
# one message in the topic somehow
except UnboundLocalError:
if 'text-file-validation' in topic_name:
report = 'File correctly loaded (this needs to be checked more carefully)'
current_app.logger.debug("Report: " + report)
return {
'status': 'SUCCESS',
'report': report
}
else:
message = 'Problem generating the report'
current_app.logger.error(message)
raise ImportApiError(message)
if report == '':
report = str(processed)+' records successfully processed'
current_app.logger.debug("Report: "+report)
return {
'status': 'SUCCESS',
'report': report
}
else:
if 'text-file-validation' in topic_name:
report = 'Impossible to load the file(s) from the SFTP server'
current_app.logger.debug("Report: " + report)
return {
'status': 'SUCCESS',
'report': report
}
else:
report_header = str(processed) + ' records processed\r\n'
report_header += str(succeeded) + ' successful\r\n'
current_app.logger.debug("Report: " + report_header+report)
return {
'status': 'FAILURE',
'report': report_header+report
}
......@@ -10,8 +10,8 @@ MAPPER_REGISTRY=cr.gitlab.switch.ch/memoriav/memobase-2020/services/import-proce
TFV_CHART_VERSION="0.3.5-chart"
TDT_CHART_VERSION="0.4.3-chart"
MAPPER_CHART_VERSION="0.9.1-chart"
TDT_CHART_VERSION="0.4.4-chart"
MAPPER_CHART_VERSION="0.9.3-chart"
helm chart pull ${TFV_REGISTRY}:${TFV_CHART_VERSION}
helm chart export ${TFV_REGISTRY}:${TFV_CHART_VERSION} -d charts/
......
from flask_restful import Resource, current_app, reqparse
from flask_restful import Resource, reqparse
from flasgger import swag_from
from import_api_app.utility import generate_helm_name
import import_api_app.helm as helm
import os
from subprocess import CalledProcessError
from import_api_app.models.Error import ImportApiError
from import_api_app.models.Helm import start
class HelmStart(Resource):
# Todo validate requests
# @swag.validate('job-parameters')
@swag_from('HelmStart.yml')
def post(self, institution_id, record_set_id, process_id, job_name):
......@@ -21,31 +17,7 @@ class HelmStart(Resource):
job_parameters['recordSetId'] = record_set_id
job_parameters['processId'] = process_id
helm_client = helm.Helm()
helm_name = generate_helm_name(process_id, job_name)
try:
output = helm_client.install(
chart=os.path.join(current_app.root_path, "charts", job_name),
name=helm_name,
namespace=current_app.config['NAMESPACE'],
set_values=job_parameters,
fail_on_err=False
)
message = (output.stdout + output.stderr)
message = message.replace('\n', '. ')
if output.returncode == 0:
return {
'status': 'SUCCESS',
'message': message,
'job_id': helm_name
}, 201
else:
return {
'status': 'FAILURE',
'message': message,
'job_id': helm_name
}, 500
except CalledProcessError:
current_app.logger.error("It was not possible to run the helm install command")
return {'error': 'Unexpected Helm error'}, 500
return start(process_id, job_name, job_parameters)
except ImportApiError as e:
return {'error': e.message}, 500
from flask_restful import Resource, current_app
import import_api_app.helm as helm
from flask_restful import Resource
from flasgger import swag_from
from import_api_app.utility import generate_helm_name
from subprocess import CalledProcessError
from import_api_app.models.Error import ImportApiError
from import_api_app.models.Helm import stop
class HelmStop(Resource):
......@@ -10,30 +10,7 @@ class HelmStop(Resource):
# @swag.validate('job-parameters')
@swag_from('HelmStop.yml')
def delete(self, process_id, job_name):
helm_client = helm.Helm()
helm_name = generate_helm_name(process_id, job_name)
try:
output = helm_client.uninstall(
name=helm_name,
namespace=current_app.config['NAMESPACE'],
fail_on_err=False
)
message = output.stdout + output.stderr
message = message.replace('\n', '. ')
if output.returncode == 0:
return {
'status': 'SUCCESS',
'message': message,
'job_id': helm_name
}, 200
else:
return {
'status': 'FAILURE',
'message': message,
'job_id': helm_name
}, 500
except CalledProcessError:
current_app.logger.error("It was not possible to run the helm install command")
return stop(process_id, job_name)
except ImportApiError:
return {'error': 'Unexpected Helm error'}, 500
......@@ -27,6 +27,7 @@ class JobList(Resource):
# label_selector="jobType=import-job,institutionId=123"
# )
# todo add try except here
# only jobs which have import-job in the jobType job label
jobs = v1.list_namespaced_job(
namespace=current_app.config['NAMESPACE'],
......
from flask_restful import Resource, current_app
from kafka import KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
import json
from flask_restful import Resource
from import_api_app.models.Error import ImportApiError
from import_api_app.models.Report import get_report
class JobReport(Resource):
class JobReport(Resource):
# Todo write/correct comment for swagger
def get(self, topic_name):
"""
......@@ -43,47 +42,7 @@ class JobReport(Resource):
example: Unexpected Kafka error
"""
try: # read from kafka topic and generate report from it:
report = ''
consumer = KafkaConsumer(
topic_name,
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=current_app.config['kafka-broker-url'],
auto_offset_reset='earliest',
enable_auto_commit=False
)
except KafkaError:
current_app.logger.error('It was not possible to connect to the Kafka broker')
return {'error': 'Unexpected Kafka error'}, 500
try: # read job-reporting messages from kafka
topic_partition = TopicPartition(topic=topic_name, partition=0)
job_messages = consumer.poll(1000, 1000)
processed = succeeded = 0
if job_messages:
for jobMessage in job_messages[topic_partition]:
msg_value = jobMessage.value
if msg_value['status'] == 'FAILURE':
report = report + msg_value['id'] + ": " + msg_value['message'] + "\n"
else:
succeeded = succeeded + 1
processed = processed + 1
report = report[:-2]
else:
report = 'no messages found'
except KafkaError:
current_app.logger.error('It was not possible to consume the Kafka messages')
return {'error': 'Unexpected Kafka error'}, 500
if report == '':
report = 'success'
return {
'status': 'SUCCESS',
'report': report
}, 200
else:
return {
'status': 'FAILURE',
'report': report
}, 200
try:
return get_report(topic_name), 200
except ImportApiError as e:
return {'error': e.message}, 500
from flask_restful import Resource, current_app,