Verified Commit 15706fe8 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

delete helm chart after job started; reformatting

parent 4c3a8c59
Pipeline #37734 passed with stages
in 1 minute and 8 seconds
......@@ -27,7 +27,7 @@ class Helm:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=fail_on_err,
text=True
text=True,
)
def install(
......
......@@ -20,7 +20,6 @@ from import_api_app.helpers.error import ImportApiError
class DeleteService:
def __init__(self, logger, namespace, root_path):
self.namespace = namespace
self.root_path = root_path
......@@ -29,12 +28,12 @@ class DeleteService:
def delete_record(self, record_id, session_id, dryrun):
set_values = {
'sessionId': session_id,
'deleteObject': 'record',
'deleteId': record_id
"sessionId": session_id,
"deleteObject": "record",
"deleteId": record_id,
}
try:
return self.do_helm_install(set_values, dryrun)
return self.run_job(set_values, dryrun)
except Exception as ex:
message = str(ex)
self.logger.error(message)
......@@ -42,12 +41,12 @@ class DeleteService:
def delete_recordset(self, recordset_id, session_id, dryrun):
set_values = {
'sessionId': session_id,
'deleteObject': 'record-set',
'deleteId': recordset_id
"sessionId": session_id,
"deleteObject": "record-set",
"deleteId": recordset_id,
}
try:
return self.do_helm_install(set_values, dryrun)
return self.run_job(set_values, dryrun)
except Exception as ex:
message = str(ex)
self.logger.error(message)
......@@ -55,41 +54,75 @@ class DeleteService:
def delete_institution(self, institution_id, session_id, dryrun):
set_values = {
'sessionId': session_id,
'deleteObject': 'institution',
'deleteId': institution_id,
"sessionId": session_id,
"deleteObject": "institution",
"deleteId": institution_id,
}
try:
return self.do_helm_install(set_values, dryrun)
return self.run_job(set_values, dryrun)
except Exception as ex:
message = str(ex)
self.logger.error(message)
raise ImportApiError(message)
def run_job(self, set_values: dict, dryrun: bool) -> str:
    """Start the deletion job via helm and clean the chart up afterwards.

    Installs the deletion-marker chart (via ``do_helm_install``), then
    immediately uninstalls the release again — the spawned Kubernetes job
    keeps running; only the helm bookkeeping is removed.

    :param set_values: chart values; must contain the keys ``sessionId``,
        ``deleteObject`` and ``deleteId`` (as built by the ``delete_*`` callers)
    :param dryrun: forwarded to ``do_helm_install`` to add ``--dry-run``
    :return: JSON string with ``returncode``/``stdout``/``stderr`` of the
        *install* step (uninstall problems are only logged)
    """
    res_install = self.do_helm_install(set_values, dryrun)
    if res_install.returncode > 0:
        self.logger.error(
            "Something went wrong when installing helm chart: "
            "returncode: {}; stdout: {}; stderr: {}".format(
                res_install.returncode, res_install.stdout, res_install.stderr
            )
        )
    # NOTE: the values dict uses the camelCase key "sessionId"
    # (see delete_record / delete_recordset / delete_institution);
    # "session_id" would raise a KeyError here.
    release_name = set_values["sessionId"] + "-deleter"
    self.logger.debug("Cleaning up helm chart " + release_name)
    res_uninstall = self.helm.uninstall(release_name, self.namespace)
    if res_uninstall.returncode > 0:
        # Best effort only: a failed uninstall is logged but does not
        # change the returned result of the install step.
        self.logger.error(
            "Something went wrong when uninstalling helm chart: "
            "returncode: {}; stdout: {}; stderr: {}".format(
                res_uninstall.returncode, res_uninstall.stdout, res_uninstall.stderr
            )
        )
    ret_json = {
        "returncode": res_install.returncode,
        "stdout": res_install.stdout,
        "stderr": res_install.stderr,
    }
    return json.dumps(ret_json, ensure_ascii=True, indent=2)
def do_helm_install(self, set_values, dryrun):
self.logger.error(
'calling delete service: type=' +
set_values['deleteObject'] +
' / id=' + set_values['deleteId'] +
' / session=' + set_values['sessionId'])
"calling delete service: type="
+ set_values["deleteObject"]
+ " / id="
+ set_values["deleteId"]
+ " / session="
+ set_values["sessionId"]
)
# We need to handle the fact that we can't use an array as argument
# value when calling helm. For details see
# https://github.com/helm/helm/issues/1987#issuecomment-682352463.
job_args = ['--' + set_values['deleteObject'] + '-filter',
set_values['deleteId']]
job_args = [
"--" + set_values["deleteObject"] + "-filter",
set_values["deleteId"],
]
if dryrun:
job_args.extend(['--dry-run', set_values['sessionId']])
job_args.extend(["--dry-run", set_values["sessionId"]])
else:
job_args.append(set_values['sessionId'])
job_args = {'jobArgs': "'" + "' '".join(job_args) + "'"}
job_args.append(set_values["sessionId"])
job_args = {"jobArgs": "'" + "' '".join(job_args) + "'"}
path_to_charts = os.path.join(self.root_path, "charts", "deletion-marker")
cp = self.helm.install( # noqa: F841
chart=path_to_charts,
values_file=os.path.join(path_to_charts, "helm-values", "dd-marker-" + os.getenv("ENV", "") + ".yaml"),
name=set_values['sessionId'] + '-deleter',
return self.helm.install( # noqa: F841
chart=path_to_charts,
values_file=os.path.join(
path_to_charts,
"helm-values",
"dd-marker-" + os.getenv("ENV", "") + ".yaml",
),
name=set_values["sessionId"] + "-deleter",
namespace=self.namespace,
set_values=job_args,
fail_on_err=False
fail_on_err=False,
)
retJson = {'returncode': cp.returncode, 'stdout': cp.stdout, 'stderr': cp.stderr}
return json.dumps(retJson, ensure_ascii=True, indent=2)
......@@ -20,13 +20,25 @@ from typing import Dict, Union
import paramiko
from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError
from paramiko.ssh_exception import AuthenticationException, BadHostKeyException, SSHException
from paramiko.ssh_exception import (
AuthenticationException,
BadHostKeyException,
SSHException,
)
class MappingFileHandler:
def __init__(self, host: str, port: int, user: str, password: str, base_path: str,
kafka_broker_url: str, topic: str, logger):
def __init__(
self,
host: str,
port: int,
user: str,
password: str,
base_path: str,
kafka_broker_url: str,
topic: str,
logger,
):
self.host = host
self.port = port
self.user = user
......@@ -34,103 +46,104 @@ class MappingFileHandler:
self.base_path = base_path
self.logger = logger
self.config_folder = 'config'
self.mapping_file = 'mapping.yml'
self.xslt_file = 'transform.xslt'
self.local_transform_file = 'localTransforms.yml'
self.config_folder = "config"
self.mapping_file = "mapping.yml"
self.xslt_file = "transform.xslt"
self.local_transform_file = "localTransforms.yml"
self.ssh_client = paramiko.SSHClient()
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.producer = KafkaProducer(bootstrap_servers=kafka_broker_url,
key_serializer=str.encode)
self.producer = KafkaProducer(
bootstrap_servers=kafka_broker_url, key_serializer=str.encode
)
self.topic = topic
def process_mapping_files(self, record_set_id: str) -> Dict[str, Union[str, bool, list]]:
def process_mapping_files(
self, record_set_id: str
) -> Dict[str, Union[str, bool, list]]:
result = self._retrieve_mapping_files(record_set_id)
if result['status'] == 'FATAL':
result['mapping_file'] = False
result['xslt_file'] = False
result['local_transform_file'] = False
if result["status"] == "FATAL":
result["mapping_file"] = False
result["xslt_file"] = False
result["local_transform_file"] = False
return result
status = dict()
status['messages'] = list()
result_mapping = self._send_config_to_kafka(record_set_id, self.topic, 'mapping',
result['mapping_file'])
if result_mapping['status'] == 'SUCCESS':
status['mapping_file'] = True
status["messages"] = list()
result_mapping = self._send_config_to_kafka(
record_set_id, self.topic, "mapping", result["mapping_file"]
)
if result_mapping["status"] == "SUCCESS":
status["mapping_file"] = True
else:
status['mapping_file'] = False
status['status'] = 'FATAL'
status['messages'].append(result_mapping['message'])
if result['xslt_file'] != '':
result_xslt = self._send_config_to_kafka(record_set_id, self.topic, 'transform',
result['xslt_file'])
if result_xslt['status'] == 'SUCCESS':
status['xslt_file'] = True
status["mapping_file"] = False
status["status"] = "FATAL"
status["messages"].append(result_mapping["message"])
if result["xslt_file"] != "":
result_xslt = self._send_config_to_kafka(
record_set_id, self.topic, "transform", result["xslt_file"]
)
if result_xslt["status"] == "SUCCESS":
status["xslt_file"] = True
else:
status['xslt_file'] = False
status['status'] = 'WARNING'
status['messages'].append(result_xslt['message'])
status["xslt_file"] = False
status["status"] = "WARNING"
status["messages"].append(result_xslt["message"])
else:
status['xslt_file'] = False
status['status'] = 'WARNING'
if result['local_transform'] != '':
result_local_transform = self._send_config_to_kafka(record_set_id, self.topic,
'localTransform',
result['local_transform'])
if result_local_transform['status'] == 'SUCCESS':
status['local_transform_file'] = True
status["xslt_file"] = False
status["status"] = "WARNING"
if result["local_transform"] != "":
result_local_transform = self._send_config_to_kafka(
record_set_id, self.topic, "localTransform", result["local_transform"]
)
if result_local_transform["status"] == "SUCCESS":
status["local_transform_file"] = True
else:
status['local_transform_file'] = False
status['status'] = 'WARNING'
status['messages'].append(result_local_transform['message'])
status["local_transform_file"] = False
status["status"] = "WARNING"
status["messages"].append(result_local_transform["message"])
else:
status['local_transform_file'] = False
status['status'] = 'WARNING'
if 'status' not in status:
status['status'] = 'SUCCESS'
status["local_transform_file"] = False
status["status"] = "WARNING"
if "status" not in status:
status["status"] = "SUCCESS"
return status
def _retrieve_mapping_files(self, record_set_id: str) -> Dict[str, Union[str, bool, list]]:
def _retrieve_mapping_files(
self, record_set_id: str
) -> Dict[str, Union[str, bool, list]]:
try:
self.ssh_client.connect(hostname=self.host,
port=self.port,
username=self.user,
password=self.password)
self.ssh_client.connect(
hostname=self.host,
port=self.port,
username=self.user,
password=self.password,
)
except BadHostKeyException as ex:
message = f"Could not connect to the server because of a bad host key: {ex}"
self.logger.error(message)
return {
'status': 'FATAL',
'type': 'BadHostKeyException',
'messages': [message]
"status": "FATAL",
"type": "BadHostKeyException",
"messages": [message],
}
except AuthenticationException:
message = "Could not authenticate with the sftp server with the given credentials."
self.logger.error(message)
return {
'status': 'FATAL',
'type': 'AuthenticationException',
'messages': [message]
"status": "FATAL",
"type": "AuthenticationException",
"messages": [message],
}
except SSHException as ex:
message = f"SSH Exception: {ex}."
self.logger.error(message)
return {
'status': 'FATAL',
'type': 'SSHException',
'messages': [message]
}
return {"status": "FATAL", "type": "SSHException", "messages": [message]}
except socket.error as ex:
message = f"Socket Error: {ex}."
self.logger.error(message)
return {
'status': 'FATAL',
'type': 'SocketError',
'messages': [message]
}
return {"status": "FATAL", "type": "SocketError", "messages": [message]}
sftp = self.ssh_client.open_sftp()
path = join(self.base_path, record_set_id, self.config_folder)
......@@ -138,60 +151,52 @@ class MappingFileHandler:
# Check mapping file
mapping_file_path = join(path, self.mapping_file)
try:
with sftp.open(mapping_file_path, 'r') as mapping:
with sftp.open(mapping_file_path, "r") as mapping:
file_contents_mapping = mapping.read()
except IOError as err:
message = f'Could not open the mapping file at {mapping_file_path} (ERROR: {err}).'
message = f"Could not open the mapping file at {mapping_file_path} (ERROR: {err})."
sftp.close()
self.ssh_client.close()
self.logger.error(message)
return {
'status': 'FATAL',
'type': 'NoMappingFile',
'message': message
}
return {"status": "FATAL", "type": "NoMappingFile", "message": message}
# Check xslt file
xslt_file_path = join(path, self.xslt_file)
try:
with sftp.open(xslt_file_path, 'r') as xslt:
with sftp.open(xslt_file_path, "r") as xslt:
file_contents_xslt = xslt.read()
except IOError:
file_contents_xslt = ''
file_contents_xslt = ""
# Check local transform file
local_transform_file_path = join(path, self.local_transform_file)
try:
with sftp.open(local_transform_file_path, 'r') as local_transform:
with sftp.open(local_transform_file_path, "r") as local_transform:
file_contents_local_transform = local_transform.read()
except IOError:
file_contents_local_transform = ''
file_contents_local_transform = ""
sftp.close()
self.ssh_client.close()
return {
'status': 'SUCCESS',
'mapping_file': file_contents_mapping,
'xslt_file': file_contents_xslt,
'local_transform': file_contents_local_transform
"status": "SUCCESS",
"mapping_file": file_contents_mapping,
"xslt_file": file_contents_xslt,
"local_transform": file_contents_local_transform,
}
def _send_config_to_kafka(self,
record_set_id,
topic: str,
config_type: str,
content: str) -> Dict[str, Union[str, list]]:
key = f'{record_set_id}#{config_type}'
def _send_config_to_kafka(
self, record_set_id, topic: str, config_type: str, content: str
) -> Dict[str, Union[str, list]]:
key = f"{record_set_id}#{config_type}"
try:
# TODO: This could be improved to actually check if the returned future succeeds.
# However this was never a problem so far.
self.producer.send(topic, key=key, value=content)
return {
'status': 'SUCCESS'
}
return {"status": "SUCCESS"}
except KafkaTimeoutError as err:
return {
'status': 'FATAL',
'type': 'KafkaTimeOut',
'message': f'Kafka Error: {err}.'
"status": "FATAL",
"type": "KafkaTimeOut",
"message": f"Kafka Error: {err}.",
}
......@@ -21,16 +21,13 @@ from import_api_app.app import app
class ClearCache(MethodView):
def __init__(self):
self.logger = app.logger
self.headers = {
'X-API-Key': app.config['drupal-api-key']
}
user = app.config['drupal-user']
password = app.config['drupal-password']
self.headers = {"X-API-Key": app.config["drupal-api-key"]}
user = app.config["drupal-user"]
password = app.config["drupal-password"]
self.auth = HTTPBasicAuth(user, password)
self.url = app.config['drupal-api-url'] + app.config['clear-cache-url']
self.url = app.config["drupal-api-url"] + app.config["clear-cache-url"]
def get(self):
"""
......@@ -48,20 +45,24 @@ class ClearCache(MethodView):
try:
response = requests.get(self.url, headers=self.headers, auth=self.auth)
except Exception as ex:
msg = 'Exception while calling ' + self.url + ': ' + str(ex)
msg = "Exception while calling " + self.url + ": " + str(ex)
self.logger.error(msg)
return {
'response': msg,
}, 500
"response": msg,
}, 500
if response.ok:
self.logger.debug('successfully called ' + self.url)
self.logger.debug("successfully called " + self.url)
return {
'content': response.content.decode("utf-8"),
}, response.status_code
"content": response.content.decode("utf-8"),
}, response.status_code
else:
self.logger.error('Clearing Cache Failed')
self.logger.error('statuscode: ' + str(response.status_code) + ' content: ' +
response.content.decode('utf-8'))
self.logger.error("Clearing Cache Failed")
self.logger.error(
"statuscode: "
+ str(response.status_code)
+ " content: "
+ response.content.decode("utf-8")
)
return {
'content': response.content.decode("utf-8"),
}, response.status_code
"content": response.content.decode("utf-8"),
}, response.status_code
......@@ -21,9 +21,10 @@ from import_api_app.helpers.delete_service import DeleteService
class DeleteInstitution(MethodView):
def __init__(self):
self.delete_service = DeleteService(app.logger, app.config['NAMESPACE'], app.root_path)
self.delete_service = DeleteService(
app.logger, app.config["NAMESPACE"], app.root_path
)
def post(self, session_id, dryrun=False):
"""
......@@ -65,5 +66,7 @@ class DeleteInstitution(MethodView):
enum: ['SUCCESS', 'FAILURE']
"""
institution_id = request.json['institution_id']
return self.delete_service.delete_institution(institution_id, session_id, dryrun)
institution_id = request.json["institution_id"]
return self.delete_service.delete_institution(
institution_id, session_id, dryrun
)
......@@ -21,9 +21,10 @@ from import_api_app.helpers.delete_service import DeleteService
class DeleteRecord(MethodView):
def __init__(self):
self.delete_service = DeleteService(app.logger, app.config['NAMESPACE'], app.root_path)
self.delete_service = DeleteService(
app.logger, app.config["NAMESPACE"], app.root_path
)
def post(self, session_id, dryrun=False):
"""
......@@ -65,5 +66,5 @@ class DeleteRecord(MethodView):
enum: ['SUCCESS', 'FAILURE']
"""
record_id = request.json['record_id']
record_id = request.json["record_id"]
return self.delete_service.delete_record(record_id, session_id, dryrun)
......@@ -21,9 +21,10 @@ from import_api_app.helpers.delete_service import DeleteService
class DeleteRecordset(MethodView):
def __init__(self):
self.delete_service = DeleteService(app.logger, app.config['NAMESPACE'], app.root_path)
self.delete_service = DeleteService(
app.logger, app.config["NAMESPACE"], app.root_path
)
def post(self, session_id, dryrun=False):
"""
......@@ -65,5 +66,5 @@ class DeleteRecordset(MethodView):
enum: ['SUCCESS', 'FAILURE']
"""
recordset_id = request.json['recordset_id']
recordset_id = request.json["recordset_id"]
return self.delete_service.delete_recordset(recordset_id, session_id, dryrun)
......@@ -20,15 +20,14 @@ from import_api_app.helpers.mapping import MappingFileHandler
class FetchMappingFile(MethodView):
def __init__(self):
self.host = app.config["sftp_host"]
self.port = app.config["sftp_port"]
self.user = app.config["sftp_user"]
self.password = app.config["sftp_password"]
self.base_path = app.config['sftp_base_path']
self.topic = app.config['topic-configs']
self.kafka_broker_url = app.config['kafka-broker-url']
self.base_path = app.config["sftp_base_path"]
self.topic = app.config["topic-configs"]
self.kafka_broker_url = app.config["kafka-broker-url"]
self.mapping_file_handler = MappingFileHandler(
self.host,
......@@ -38,7 +37,7 @@ class FetchMappingFile(MethodView):
self.base_path,
self.kafka_broker_url,
self.topic,
app.logger
app.logger,
)
def get(self, recordset_id, session_id):
......@@ -75,7 +74,7 @@ class FetchMappingFile(MethodView):
"""
result = self.mapping_file_handler.process_mapping_files(recordset_id)