Commit 1a7cb13f authored by Jonas Waeber

Merge branch 'master' of gitlab.switch.ch:memoriav/memobase-2020/services/import-process/import-api

parents 0c057c28 b8b9cc93
Pipeline #37956 passed with stages in 1 minute and 25 seconds
@@ -4,24 +4,24 @@ accesslog = "/dev/stdout"
 access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
 disable_redirect_access_to_syslog = True
 errorlog = "/dev/stderr"
-loglevel = 'info'
+loglevel = "info"
 workers = 1
 logconfig_dict = dict(
     version=1,
     disable_existing_loggers=True,
     loggers={
         "gunicorn.error": {
             "level": "INFO",
             "handlers": ["error_console"],
             "propagate": False,
-            "qualname": "gunicorn.error"
-        }
-    },
-    formatters={
-        "generic": {
-            "format": "[%(levelname)s] [%(name)s] %(message)s",
-            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
-            "class": "logging.Formatter"
-        }
-    }
+            "qualname": "gunicorn.error",
+        }
+    },
+    formatters={
+        "generic": {
+            "format": "[%(levelname)s] [%(name)s] %(message)s",
+            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
+            "class": "logging.Formatter",
+        }
+    },
 )
@@ -4,24 +4,24 @@ accesslog = "/dev/stdout"
 access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
 disable_redirect_access_to_syslog = True
 errorlog = "/dev/stderr"
-loglevel = 'debug'
+loglevel = "debug"
 workers = 1
 logconfig_dict = dict(
     version=1,
     disable_existing_loggers=True,
     loggers={
         "gunicorn.error": {
             "level": "DEBUG",
             "handlers": ["error_console"],
             "propagate": False,
-            "qualname": "gunicorn.error"
-        }
-    },
-    formatters={
-        "generic": {
-            "format": "[%(levelname)s] [%(name)s] %(message)s",
-            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
-            "class": "logging.Formatter"
-        }
-    }
+            "qualname": "gunicorn.error",
+        }
+    },
+    formatters={
+        "generic": {
+            "format": "[%(levelname)s] [%(name)s] %(message)s",
+            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
+            "class": "logging.Formatter",
+        }
+    },
 )
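Both hunks apply the same black-style formatting with consistent double quotes; only the log level differs between the two config files (info vs. debug). The logconfig_dict that gunicorn consumes is a plain logging.config.dictConfig schema. A minimal standalone sketch of that schema follows; gunicorn normally supplies the "error_console" handler itself, so the handler definition below is an assumption added only to make the dict self-contained:

# Standalone sketch: gunicorn usually defines "error_console" on its own,
# so we add an equivalent stderr handler (an assumption) for dictConfig
# to validate outside of gunicorn.
import logging
import logging.config

logconfig_dict = dict(
    version=1,
    disable_existing_loggers=True,
    handlers={
        "error_console": {
            "class": "logging.StreamHandler",
            "formatter": "generic",
            "stream": "ext://sys.stderr",
        }
    },
    loggers={
        "gunicorn.error": {
            "level": "INFO",
            "handlers": ["error_console"],
            "propagate": False,
            "qualname": "gunicorn.error",
        }
    },
    formatters={
        "generic": {
            "format": "[%(levelname)s] [%(name)s] %(message)s",
            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
            "class": "logging.Formatter",
        }
    },
)

logging.config.dictConfig(logconfig_dict)
logging.getLogger("gunicorn.error").info("logger wired to stderr")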
@@ -27,7 +27,7 @@ class Helm:
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
             check=fail_on_err,
-            text=True
+            text=True,
         )

     def install(
...
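The trailing comma lands in the Helm wrapper's subprocess.run call, where check=fail_on_err decides whether a non-zero exit raises CalledProcessError and text=True returns stdout/stderr as strings rather than bytes. A minimal sketch of such a wrapper; the free-standing function and its signature are assumptions, since this hunk only shows the subprocess.run keyword arguments:

# Sketch of a helm wrapper in the style of the class above; only the
# subprocess.run keyword arguments are taken from the diff.
import subprocess

def run_helm(args: list, fail_on_err: bool = True) -> subprocess.CompletedProcess:
    # check=fail_on_err raises on a non-zero exit only when the caller
    # asked for it; text=True decodes stdout/stderr to str.
    return subprocess.run(
        ["helm"] + args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=fail_on_err,
        text=True,
    )

# Example: inspect the result ourselves instead of raising.
result = run_helm(["version", "--short"], fail_on_err=False)
print(result.returncode, result.stdout.strip())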
@@ -20,7 +20,6 @@ from import_api_app.helpers.error import ImportApiError
 class DeleteService:
-
     def __init__(self, logger, namespace, root_path):
         self.namespace = namespace
         self.root_path = root_path
@@ -29,12 +28,12 @@ class DeleteService:
     def delete_record(self, record_id, session_id, dryrun):
         set_values = {
-            'sessionId': session_id,
-            'deleteObject': 'record',
-            'deleteId': record_id
+            "sessionId": session_id,
+            "deleteObject": "record",
+            "deleteId": record_id,
         }
         try:
-            return self.do_helm_install(set_values, dryrun)
+            return self.run_job(set_values, dryrun == "true")
         except Exception as ex:
             message = str(ex)
             self.logger.error(message)
@@ -42,12 +41,12 @@ class DeleteService:
     def delete_recordset(self, recordset_id, session_id, dryrun):
         set_values = {
-            'sessionId': session_id,
-            'deleteObject': 'record-set',
-            'deleteId': recordset_id
+            "sessionId": session_id,
+            "deleteObject": "record-set",
+            "deleteId": recordset_id,
         }
         try:
-            return self.do_helm_install(set_values, dryrun)
+            return self.run_job(set_values, dryrun == "true")
         except Exception as ex:
             message = str(ex)
             self.logger.error(message)
@@ -55,37 +54,72 @@ class DeleteService:
     def delete_institution(self, institution_id, session_id, dryrun):
         set_values = {
-            'sessionId': session_id,
-            'deleteObject': 'institution',
-            'deleteId': institution_id,
+            "sessionId": session_id,
+            "deleteObject": "institution",
+            "deleteId": institution_id,
         }
         try:
-            return self.do_helm_install(set_values, dryrun)
+            return self.run_job(set_values, dryrun == "true")
         except Exception as ex:
             message = str(ex)
             self.logger.error(message)
             raise ImportApiError(message)

-    def do_helm_install(self, set_values, dryrun):
+    def run_job(self, set_values: dict, dryrun: bool) -> str:
+        chart_name = "dd-marker-{}-{}".format(
+            os.getenv("ENV", ""), set_values["sessionId"]
+        )
+        res_install = self.do_helm_install(set_values, chart_name, dryrun)
+        if res_install.returncode > 0:
+            self.logger.error(
+                "Something went wrong when installing helm chart {}: returncode: {}; stdout: {}; stderr: {}".format(
+                    chart_name,
+                    res_install.returncode,
+                    res_install.stdout,
+                    res_install.stderr,
+                )
+            )
+        retJson = {
+            "returncode": res_install.returncode,
+            "stdout": res_install.stdout,
+            "stderr": res_install.stderr,
+        }
+        return json.dumps(retJson, ensure_ascii=True, indent=2)
+
+    def do_helm_install(self, set_values: dict, chart_name: str, dryrun: bool):
         self.logger.error(
-            'calling delete service: type=' +
-            set_values['deleteObject'] +
-            ' / id=' + set_values['deleteId'] +
-            ' / session=' + set_values['sessionId'])
+            "calling delete service: type="
+            + set_values["deleteObject"]
+            + " / id="
+            + set_values["deleteId"]
+            + " / session="
+            + set_values["sessionId"]
+        )
+        # Due to limitations on arrays in helm argument values, we need to build a string
+        # (which will afterwards be converted back to an array by the k8s template)
         job_args = [
-            '--' + set_values['deleteObject'] + '-filter',
-            set_values['deleteId'],
-            set_values['sessionId']
+            "--" + set_values["deleteObject"] + "-filter",
+            set_values["deleteId"],
         ]
         if dryrun:
-            job_args.append('--dry-run')
-        job_args = {'jobArgs': job_args}
-        cp = self.helm.install(  # noqa: F841
-            chart=os.path.join(self.root_path, "charts", 'dd-marker-prod'),
-            name=set_values['sessionId'] + '-deleter',
+            job_args.extend(["--dry-run", set_values["sessionId"]])
+        else:
+            job_args.append(set_values["sessionId"])
+        # The sessionId is additionally used to build a distinct job id
+        job_args = {
+            "jobArgs": "'" + "' '".join(job_args) + "'",
+            "sessionId": set_values["sessionId"],
+        }
+        path_to_charts = os.path.join(self.root_path, "charts", "deletion-marker")
+        return self.helm.install(  # noqa: F841
+            chart=path_to_charts,
+            values_file=os.path.join(
+                path_to_charts,
+                "helm-values",
+                "dd-marker-" + os.getenv("ENV", "") + ".yaml",
+            ),
+            name=chart_name,
             namespace=self.namespace,
             set_values=job_args,
-            fail_on_err=False
+            fail_on_err=False,
         )
-        retJson = {'returncode': cp.returncode, 'stdout': cp.stdout, 'stderr': cp.stderr}
-        return json.dumps(retJson, ensure_ascii=True, indent=2)
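The key behavioral change is do_helm_install's jobArgs handling: because helm --set values cannot carry arrays directly, the argument list is flattened into a single quoted string that the chart's k8s template later splits back into an array. A small round-trip sketch of that flattening; shlex.split stands in for the template-side splitting, which really happens in the chart and is an assumption here:

# Sketch of the jobArgs round trip described in the comment above.
# The joining side mirrors do_helm_install; shlex.split stands in for
# the chart template's splitting (not shown in this commit).
import shlex

job_args = ["--record-filter", "some-record-id", "some-session-id"]

# Flatten to a single helm-safe value: 'a' 'b' 'c'
flat = "'" + "' '".join(job_args) + "'"
print(flat)  # '--record-filter' 'some-record-id' 'some-session-id'

# The template side can recover the original array:
assert shlex.split(flat) == job_args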
@@ -20,13 +20,25 @@ from typing import Dict, Union
 import paramiko
 from kafka import KafkaProducer
 from kafka.errors import KafkaTimeoutError
-from paramiko.ssh_exception import AuthenticationException, BadHostKeyException, SSHException
+from paramiko.ssh_exception import (
+    AuthenticationException,
+    BadHostKeyException,
+    SSHException,
+)


 class MappingFileHandler:
-
-    def __init__(self, host: str, port: int, user: str, password: str, base_path: str,
-                 kafka_broker_url: str, topic: str, logger):
+    def __init__(
+        self,
+        host: str,
+        port: int,
+        user: str,
+        password: str,
+        base_path: str,
+        kafka_broker_url: str,
+        topic: str,
+        logger,
+    ):
         self.host = host
         self.port = port
         self.user = user
@@ -34,103 +46,104 @@ class MappingFileHandler:
         self.base_path = base_path
         self.logger = logger

-        self.config_folder = 'config'
-        self.mapping_file = 'mapping.yml'
-        self.xslt_file = 'transform.xslt'
-        self.local_transform_file = 'localTransforms.yml'
+        self.config_folder = "config"
+        self.mapping_file = "mapping.yml"
+        self.xslt_file = "transform.xslt"
+        self.local_transform_file = "localTransforms.yml"

         self.ssh_client = paramiko.SSHClient()
         self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-        self.producer = KafkaProducer(bootstrap_servers=kafka_broker_url,
-                                      key_serializer=str.encode)
+        self.producer = KafkaProducer(
+            bootstrap_servers=kafka_broker_url, key_serializer=str.encode
+        )
         self.topic = topic

-    def process_mapping_files(self, record_set_id: str) -> Dict[str, Union[str, bool, list]]:
+    def process_mapping_files(
+        self, record_set_id: str
+    ) -> Dict[str, Union[str, bool, list]]:
         result = self._retrieve_mapping_files(record_set_id)
-        if result['status'] == 'FATAL':
-            result['mapping_file'] = False
-            result['xslt_file'] = False
-            result['local_transform_file'] = False
+        if result["status"] == "FATAL":
+            result["mapping_file"] = False
+            result["xslt_file"] = False
+            result["local_transform_file"] = False
             return result

         status = dict()
-        status['messages'] = list()
-        result_mapping = self._send_config_to_kafka(record_set_id, self.topic, 'mapping',
-                                                    result['mapping_file'])
-        if result_mapping['status'] == 'SUCCESS':
-            status['mapping_file'] = True
+        status["messages"] = list()
+        result_mapping = self._send_config_to_kafka(
+            record_set_id, self.topic, "mapping", result["mapping_file"]
+        )
+        if result_mapping["status"] == "SUCCESS":
+            status["mapping_file"] = True
         else:
-            status['mapping_file'] = False
-            status['status'] = 'FATAL'
-            status['messages'].append(result_mapping['message'])
+            status["mapping_file"] = False
+            status["status"] = "FATAL"
+            status["messages"].append(result_mapping["message"])

-        if result['xslt_file'] != '':
-            result_xslt = self._send_config_to_kafka(record_set_id, self.topic, 'transform',
-                                                     result['xslt_file'])
-            if result_xslt['status'] == 'SUCCESS':
-                status['xslt_file'] = True
+        if result["xslt_file"] != "":
+            result_xslt = self._send_config_to_kafka(
+                record_set_id, self.topic, "transform", result["xslt_file"]
+            )
+            if result_xslt["status"] == "SUCCESS":
+                status["xslt_file"] = True
             else:
-                status['xslt_file'] = False
-                status['status'] = 'WARNING'
-                status['messages'].append(result_xslt['message'])
+                status["xslt_file"] = False
+                status["status"] = "WARNING"
+                status["messages"].append(result_xslt["message"])
         else:
-            status['xslt_file'] = False
-            status['status'] = 'WARNING'
+            status["xslt_file"] = False
+            status["status"] = "WARNING"

-        if result['local_transform'] != '':
-            result_local_transform = self._send_config_to_kafka(record_set_id, self.topic,
-                                                                'localTransform',
-                                                                result['local_transform'])
-            if result_local_transform['status'] == 'SUCCESS':
-                status['local_transform_file'] = True
+        if result["local_transform"] != "":
+            result_local_transform = self._send_config_to_kafka(
+                record_set_id, self.topic, "localTransform", result["local_transform"]
+            )
+            if result_local_transform["status"] == "SUCCESS":
+                status["local_transform_file"] = True
             else:
-                status['local_transform_file'] = False
-                status['status'] = 'WARNING'
-                status['messages'].append(result_local_transform['message'])
+                status["local_transform_file"] = False
+                status["status"] = "WARNING"
+                status["messages"].append(result_local_transform["message"])
         else:
-            status['local_transform_file'] = False
-            status['status'] = 'WARNING'
+            status["local_transform_file"] = False
+            status["status"] = "WARNING"

-        if 'status' not in status:
-            status['status'] = 'SUCCESS'
+        if "status" not in status:
+            status["status"] = "SUCCESS"
         return status

-    def _retrieve_mapping_files(self, record_set_id: str) -> Dict[str, Union[str, bool, list]]:
+    def _retrieve_mapping_files(
+        self, record_set_id: str
+    ) -> Dict[str, Union[str, bool, list]]:
         try:
-            self.ssh_client.connect(hostname=self.host,
-                                    port=self.port,
-                                    username=self.user,
-                                    password=self.password)
+            self.ssh_client.connect(
+                hostname=self.host,
+                port=self.port,
+                username=self.user,
+                password=self.password,
+            )
         except BadHostKeyException as ex:
             message = f"Could not connect to the server because of a bad host key: {ex}"
             self.logger.error(message)
             return {
-                'status': 'FATAL',
-                'type': 'BadHostKeyException',
-                'messages': [message]
+                "status": "FATAL",
+                "type": "BadHostKeyException",
+                "messages": [message],
             }
         except AuthenticationException:
             message = "Could not authenticate with the sftp server with the given credentials."
             self.logger.error(message)
             return {
-                'status': 'FATAL',
-                'type': 'AuthenticationException',
-                'messages': [message]
+                "status": "FATAL",
+                "type": "AuthenticationException",
+                "messages": [message],
             }
         except SSHException as ex:
             message = f"SSH Exception: {ex}."
             self.logger.error(message)
-            return {
-                'status': 'FATAL',
-                'type': 'SSHException',
-                'messages': [message]
-            }
+            return {"status": "FATAL", "type": "SSHException", "messages": [message]}
         except socket.error as ex:
             message = f"Socket Error: {ex}."
             self.logger.error(message)
-            return {
-                'status': 'FATAL',
-                'type': 'SocketError',
-                'messages': [message]
-            }
+            return {"status": "FATAL", "type": "SocketError", "messages": [message]}

         sftp = self.ssh_client.open_sftp()
         path = join(self.base_path, record_set_id, self.config_folder)
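This hunk reformats the SFTP connection handling: connect() failures are mapped to typed FATAL results, then the config files are fetched over an SFTP channel. A compact sketch of the same connect-then-read pattern; host, credentials, and path are placeholders, and the error handling is trimmed to a finally block:

# Compact sketch of the connect-then-read pattern used above;
# hostname, credentials, and remote path are placeholders.
import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
    client.connect(hostname="sftp.example.org", port=22,
                   username="user", password="secret")
    sftp = client.open_sftp()
    with sftp.open("/data/rs-1/config/mapping.yml", "r") as fh:
        contents = fh.read()  # bytes; decode if text is needed
finally:
    client.close()  # also closes the SFTP channel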
@@ -138,60 +151,52 @@ class MappingFileHandler:
         # Check mapping file
         mapping_file_path = join(path, self.mapping_file)
         try:
-            with sftp.open(mapping_file_path, 'r') as mapping:
+            with sftp.open(mapping_file_path, "r") as mapping:
                 file_contents_mapping = mapping.read()
         except IOError as err:
-            message = f'Could not open the mapping file at {mapping_file_path} (ERROR: {err}).'
+            message = f"Could not open the mapping file at {mapping_file_path} (ERROR: {err})."
             sftp.close()
             self.ssh_client.close()
             self.logger.error(message)
-            return {
-                'status': 'FATAL',
-                'type': 'NoMappingFile',
-                'message': message
-            }
+            return {"status": "FATAL", "type": "NoMappingFile", "message": message}

         # Check xslt file
         xslt_file_path = join(path, self.xslt_file)
         try:
-            with sftp.open(xslt_file_path, 'r') as xslt:
+            with sftp.open(xslt_file_path, "r") as xslt:
                 file_contents_xslt = xslt.read()
         except IOError:
-            file_contents_xslt = ''
+            file_contents_xslt = ""

         # Check local transform file
         local_transform_file_path = join(path, self.local_transform_file)
         try:
-            with sftp.open(local_transform_file_path, 'r') as local_transform:
+            with sftp.open(local_transform_file_path, "r") as local_transform:
                 file_contents_local_transform = local_transform.read()
         except IOError:
-            file_contents_local_transform = ''
+            file_contents_local_transform = ""

         sftp.close()
         self.ssh_client.close()
         return {
-            'status': 'SUCCESS',
-            'mapping_file': file_contents_mapping,
-            'xslt_file': file_contents_xslt,
-            'local_transform': file_contents_local_transform
+            "status": "SUCCESS",
+            "mapping_file": file_contents_mapping,
+            "xslt_file": file_contents_xslt,
+            "local_transform": file_contents_local_transform,
         }

-    def _send_config_to_kafka(self,
-                              record_set_id,
-                              topic: str,
-                              config_type: str,
-                              content: str) -> Dict[str, Union[str, list]]:
-        key = f'{record_set_id}#{config_type}'
+    def _send_config_to_kafka(
+        self, record_set_id, topic: str, config_type: str, content: str
+    ) -> Dict[str, Union[str, list]]:
+        key = f"{record_set_id}#{config_type}"
         try:
             # TODO: This could be improved to actually check if the returned future succeeds.
             #  However this was never a problem so far.
             self.producer.send(topic, key=key, value=content)
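The TODO above notes that the future returned by producer.send() is never checked. In kafka-python, send() returns a future whose get(timeout=...) blocks until the broker acknowledges delivery and raises on failure. A sketch of that improvement; broker address, topic, key, and value are placeholders:

# Sketch of the improvement the TODO hints at: block on the future
# returned by send() so delivery failures surface as exceptions.
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers="localhost:9092", key_serializer=str.encode
)
future = producer.send("import-process-config", key="rs-1#mapping", value=b"...")
try:
    metadata = future.get(timeout=10)  # raises KafkaError if delivery failed
    print(metadata.topic, metadata.partition, metadata.offset)
except KafkaError as err:
    print(f"delivery failed: {err}")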