
Commit c59b1b63 authored by Jonas Waeber

Refactor mapping file handler.

This should help with future testing and make it easier to reuse the functionality in two places. Cleaned up return messages. Will do more.
parent 2755c134
Pipeline #33629 passed with stages in 2 minutes and 27 seconds
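
The refactored MappingFileHandler (first file below) is now shared by the FetchMappingFile and ImportProcessStart resources. A minimal usage sketch follows; the connection settings, topic name, and record set id are placeholders and not part of this commit, and it assumes process_mapping_files reports the 'status' key its callers check:

import logging

from import_api_app.helpers.mapping import MappingFileHandler

# Placeholder settings for illustration only; real values come from the Flask config.
logger = logging.getLogger("import-api")
handler = MappingFileHandler(
    host="sftp.example.org",          # hypothetical SFTP host
    port=22,
    user="importer",
    password="secret",
    base_path="/swissbib_index/mb_sftp",
    kafka_broker_url="localhost:9092",
    topic="import-process-configs",   # assumed name of the config topic
    logger=logger,
)

# Fetch mapping.yml, transform.xslt and localTransforms.yml for one record set
# and forward them to the config topic.
result = handler.process_mapping_files("record-set-42")
if result["status"] == "SUCCESS":
    logger.info("Mapping files forwarded to Kafka: %s", result)
else:
    logger.error("Mapping file retrieval failed: %s", result.get("messages", []))
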
import socket
from os.path import join
from typing import Dict, Union
import paramiko
from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError
from paramiko.ssh_exception import AuthenticationException, BadHostKeyException, SSHException
class MappingFileHandler:
def __init__(self, host: str, port: int, user: str, password: str, base_path: str,
kafka_broker_url: str, topic: str, logger):
self.host = host
self.port = port
self.user = user
self.password = password
self.base_path = base_path
self.logger = logger
self.config_folder = 'config'
self.mapping_file = 'mapping.yml'
self.xslt_file = 'transform.xslt'
self.local_transform_file = 'localTransforms.yml'
self.ssh_client = paramiko.SSHClient()
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.producer = KafkaProducer(bootstrap_servers=kafka_broker_url,
key_serializer=str.encode)
self.topic = topic
def process_mapping_files(self, record_set_id: str) -> Dict[str, Union[str, bool, list]]:
result = self._retrieve_mapping_files(record_set_id)
if result['status'] == 'FATAL':
result['mapping_file'] = False
result['xslt_file'] = False
result['local_transform_file'] = False
return result
        status = dict()
        # Callers check this key; fatal errors have already returned above.
        status['status'] = 'SUCCESS'
        status['messages'] = list()
result_mapping = self._send_config_to_kafka(record_set_id, self.topic, 'mapping',
result['mapping_file'])
if result_mapping['status'] == 'SUCCESS':
status['mapping_file'] = True
else:
status['mapping_file'] = False
status['messages'].append(result_mapping['message'])
if result['xslt_file'] != '':
result_xslt = self._send_config_to_kafka(record_set_id, self.topic, 'transform',
result['xslt_file'])
if result_xslt['status'] == 'SUCCESS':
status['xslt_file'] = True
else:
status['xslt_file'] = False
status['messages'].append(result_xslt['message'])
else:
status['xslt_file'] = False
if result['local_transform'] != '':
result_local_transform = self._send_config_to_kafka(record_set_id, self.topic,
'localTransform',
result['local_transform'])
if result_local_transform['status'] == 'SUCCESS':
status['local_transform_file'] = True
else:
status['local_transform_file'] = False
status['messages'].append(result_local_transform['message'])
else:
status['local_transform_file'] = False
return status
def _retrieve_mapping_files(self, record_set_id: str) -> Dict[str, Union[str, bool, list]]:
try:
            # SSHClient.connect() returns None, so it cannot be used directly as a
            # context manager; connect first, then let the client close itself on exit.
            self.ssh_client.connect(hostname=self.host,
                                    port=self.port,
                                    username=self.user,
                                    password=self.password)
            with self.ssh_client as client:
with client.open_sftp() as sftp:
path = join(self.base_path, record_set_id, self.config_folder)
# Check mapping file
try:
mapping_file_path = join(path, self.mapping_file)
with sftp.open(mapping_file_path, 'r') as mapping:
file_contents_mapping = mapping.read()
except IOError as err:
                        return {
                            'status': 'FATAL',
                            'type': 'NoMappingFile',
                            'messages': [f'Could not open the mapping file at '
                                         f'{mapping_file_path} (ERROR: {err}).']
                        }
# Check xslt file
try:
xslt_file_path = join(path, self.xslt_file)
sftp.stat(xslt_file_path)
with sftp.open(xslt_file_path, 'r') as xslt:
file_contents_xslt = xslt.read()
except IOError:
file_contents_xslt = ''
# Check local transform file
try:
local_transform_file_path = join(path, self.local_transform_file)
sftp.stat(local_transform_file_path)
with sftp.open(local_transform_file_path, 'r') as local_transform:
file_contents_local_transform = local_transform.read()
except IOError:
file_contents_local_transform = ''
except BadHostKeyException as ex:
message = f"Could not connect to the server because of a bad host key: {ex}"
            self.logger.error(message)
return {
'status': 'FATAL',
'type': 'BadHostKeyException',
'messages': [message]
}
except AuthenticationException:
message = "Could not authenticate with the sftp server with the given credentials."
            self.logger.error(message)
return {
'status': 'FATAL',
'type': 'AuthenticationException',
'messages': [message]
}
except SSHException as ex:
message = f"SSH Exception: {ex}."
            self.logger.error(message)
return {
'status': 'FATAL',
'type': 'SSHException',
'messages': [message]
}
except socket.error as ex:
message = f"Socket Error: {ex}."
self.logger.error(message)
return {
'status': 'FATAL',
'type': 'SocketError',
'messages': [message]
}
return {
'status': 'SUCCESS',
'mapping_file': file_contents_mapping,
'xslt_file': file_contents_xslt,
'local_transform': file_contents_local_transform
}
def _send_config_to_kafka(self,
record_set_id,
topic: str,
config_type: str,
content: str) -> Dict[str, Union[str, list]]:
key = f'{record_set_id}#{config_type}'
try:
            # TODO: This could be improved to actually check whether the returned future
            #  succeeds (see the sketch after this file). However, this has never been a
            #  problem so far.
self.producer.send(topic, key=key, value=content)
return {
'status': 'SUCCESS'
}
except KafkaTimeoutError as err:
return {
'status': 'FATAL',
'type': 'KafkaTimeOut',
'message': f'Kafka Error: {err}.'
}
from flask_restful import Resource, current_app
from kafka import KafkaProducer
from import_api_app.helpers.error import ImportApiError
import paramiko
from import_api_app.helpers.mapping import MappingFileHandler
class FetchMappingFile(Resource):
def __init__(self):
self.host = current_app.config["sftp_host"]
self.port = current_app.config["sftp_port"]
self.user = current_app.config["sftp_user"]
self.password = current_app.config["sftp_password"]
self.base_path = current_app.config['sftp_base_path']
self.topic = current_app.config['config_topic']
self.kafka_broker_url = current_app.config['kafka-broker-url']
self.mapping_file_handler = MappingFileHandler(
self.host,
self.port,
self.user,
self.password,
self.base_path,
self.kafka_broker_url,
self.topic,
current_app.logger
)
def get(self, recordset_id, session_id):
"""
        Fetches the mapping file from the sftp-server
@@ -40,69 +58,8 @@ class FetchMappingFile(Resource):
example: the contents of the mapping file...
"""
return self.fetchMappingFile(recordset_id, session_id)
def fetchMappingFile(self, recordset_id, session_id):
# 1. read file contents from sftp:
host = current_app.config["sftp_host"]
port = current_app.config["sftp_port"]
user = current_app.config["sftp_user"]
pwd = current_app.config["sftp_password"]
fileContentsMapping = ''
fileContentsTransform = ''
fileContentsLocalTransform = ''
try:
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(hostname=host,
port=port,
username=user,
password=pwd)
sftp_client = ssh_client.open_sftp()
for fileName in ['mapping.yml', 'transform.xslt', 'localTransforms.yml']:
envDir = '/'
if current_app.config['env'] == 'stage':
envDir = '/stage/'
elif current_app.config['env'] == 'test':
envDir = '/test/'
remote_path = '/swissbib_index/mb_sftp' + envDir + recordset_id + \
'/config/' + fileName
try:
sftp_client.stat(remote_path)
remote_file = sftp_client.file(remote_path, 'r')
if fileName == 'mapping.yml':
fileContentsMapping = remote_file.read()
if fileName == 'transform.xslt':
fileContentsTransform = remote_file.read()
if fileName == 'localTransforms.yml':
fileContentsLocalTransform = remote_file.read()
except IOError:
message = 'remote path "' + remote_path + '" does not exist'
current_app.logger.debug(message)
sftp_client.close()
ssh_client.close()
except Exception as ex:
message = str(ex)
current_app.logger.debug(message)
raise ImportApiError(message)
# 2. write file content into kafka topic
topic = current_app.config['topic-configs']
try:
producer = KafkaProducer(bootstrap_servers=current_app.config['kafka-broker-url'],
key_serializer=str.encode)
key = recordset_id + '#'
producer.send(topic, key=key + 'mapping', value=fileContentsMapping)
if "" != fileContentsTransform:
producer.send(topic, key=key + 'transform', value=fileContentsTransform)
if "" != fileContentsLocalTransform:
producer.send(topic, key=key + 'localTransform', value=fileContentsLocalTransform)
except Exception as ex:
message = str(ex)
current_app.logger.debug(message)
raise ImportApiError(message)
return {'status': 'SUCCESS'}, 200
result = self.mapping_file_handler.process_mapping_files(recordset_id)
if result['status'] == 'SUCCESS':
return result, 200
else:
return result, 500
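
With the handler in place, the endpoint simply forwards the dictionary built by process_mapping_files. A response for a record set that has a mapping file and an XSLT but no local transforms might look like the following; the values are illustrative, and it assumes process_mapping_files sets the 'status' key its callers check:

# Hypothetical 200 response body returned by the resource
{
    'status': 'SUCCESS',
    'messages': [],
    'mapping_file': True,
    'xslt_file': True,
    'local_transform_file': False
}
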
@@ -5,12 +5,31 @@ from flasgger import swag_from
from flask_restful import Resource, reqparse, current_app
from import_api_app.helm import Helm
from import_api_app.resources.fetch_mapping_file import FetchMappingFile
from import_api_app.helpers.mapping import MappingFileHandler
class ImportProcessStart(Resource):
# Todo validate requests
# @swag.validate('job-parameters')
def __init__(self):
self.host = current_app.config["sftp_host"]
self.port = current_app.config["sftp_port"]
self.user = current_app.config["sftp_user"]
self.password = current_app.config["sftp_password"]
self.base_path = current_app.config['sftp_base_path']
self.topic = current_app.config['config_topic']
self.kafka_broker_url = current_app.config['kafka-broker-url']
self.mapping_file_handler = MappingFileHandler(
self.host,
self.port,
self.user,
self.password,
self.base_path,
self.kafka_broker_url,
self.topic,
current_app.logger
)
@swag_from('ImportProcessStart.yml')
def post(self, institution_id, record_set_id):
# get parameters of request-body
@@ -34,12 +53,12 @@ class ImportProcessStart(Resource):
if 'xmlIdentifierFieldName' not in job_parameters:
job_parameters['xmlIdentifierFieldName'] = 'id'
if 'isPublished' not in job_parameters:
job_parameters['isPublished'] = 'defaultvalue...'
job_parameters['isPublished'] = 'false'
# start fetch-mapping-file
FetchMappingFile.fetchMappingFile(
self, recordset_id=record_set_id, session_id=job_parameters['sessionId']
)
# send mapping files to result:
mapping_file_result = self.mapping_file_handler.process_mapping_files(record_set_id)
if mapping_file_result['status'] == 'FATAL':
return mapping_file_result, 500
# start text-file-validation
short_session_id = hashlib.sha1(job_parameters['sessionId'].encode("UTF-8")).hexdigest()[
@@ -82,4 +101,4 @@ class ImportProcessStart(Resource):
if response.stderr == '':
return {'status': 'SUCCESS', 'message': response.stdout}, 200
else:
return {'status': 'FAILED', 'message': response.stderr}, 500
return {'status': 'FATAL', 'message': response.stderr}, 500