#  Import API Service
#  Copyright (C) 2020-2021 Memobase Project
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Affero General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Affero General Public License for more details.
#
#  You should have received a copy of the GNU Affero General Public License
#  along with this program.  If not, see https://www.gnu.org/licenses/.
import socket
from os.path import join
from typing import Dict, Union

import paramiko
from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError
from paramiko.ssh_exception import (
    AuthenticationException,
    BadHostKeyException,
    SSHException,
)


class MappingFileHandler:
    """Fetch a record set's configuration files over SFTP and publish them to Kafka.

    The files are read from ``<base_path>/<record_set_id>/config`` on the SFTP
    server. ``mapping.yml`` is mandatory; ``transform.xslt`` and
    ``localTransforms.yml`` are optional.
    """

    # Ordering used so that an overall status is only ever escalated.
    _SEVERITY = {"SUCCESS": 0, "WARNING": 1, "FATAL": 2}

    def __init__(
        self,
        host: str,
        port: int,
        user: str,
        password: str,
        base_path: str,
        kafka_broker_url: str,
        topic: str,
        logger,
    ):
        """Store the connection settings and create the SSH client and Kafka producer.

        :param host: host name of the SFTP server.
        :param port: port of the SFTP server.
        :param user: user name used to authenticate with the SFTP server.
        :param password: password used to authenticate with the SFTP server.
        :param base_path: remote directory that contains one folder per record set.
        :param kafka_broker_url: passed to ``KafkaProducer`` as ``bootstrap_servers``.
        :param topic: Kafka topic the configuration files are sent to.
        :param logger: object with an ``error(message)`` method.
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.base_path = base_path
        self.logger = logger

        self.config_folder = "config"
        self.mapping_file = "mapping.yml"
        self.xslt_file = "transform.xslt"
        self.local_transform_file = "localTransforms.yml"

        self.ssh_client = paramiko.SSHClient()
        # Unknown host keys are accepted automatically.
        self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        self.producer = KafkaProducer(
            bootstrap_servers=kafka_broker_url, key_serializer=str.encode
        )
        self.topic = topic

    @classmethod
    def _escalate(cls, status: dict, level: str) -> None:
        """Raise ``status["status"]`` to ``level`` unless it is already as severe."""
        if cls._SEVERITY[level] > cls._SEVERITY[status["status"]]:
            status["status"] = level

    def process_mapping_files(
        self, record_set_id: str
    ) -> Dict[str, Union[str, bool, list]]:
        """Retrieve the configuration files of a record set and send them to Kafka.

        :param record_set_id: name of the record set folder below ``base_path``.
        :return: a dict with the keys ``status`` (``"SUCCESS"``, ``"WARNING"`` or
            ``"FATAL"``), ``messages`` (list of error messages) and the booleans
            ``mapping_file``, ``xslt_file`` and ``local_transform_file`` telling
            which files were sent. If the retrieval itself fails, the FATAL
            result of the retrieval (which also carries ``type``) is returned
            with the three booleans set to ``False``.
        """
        retrieved = self._retrieve_mapping_files(record_set_id)
        if retrieved["status"] == "FATAL":
            retrieved["mapping_file"] = False
            retrieved["xslt_file"] = False
            retrieved["local_transform_file"] = False
            return retrieved

        status = {"status": "SUCCESS", "messages": []}

        # The mapping file is mandatory: failing to send it is fatal.
        mapping_result = self._send_config_to_kafka(
            record_set_id, self.topic, "mapping", retrieved["mapping_file"]
        )
        status["mapping_file"] = mapping_result["status"] == "SUCCESS"
        if not status["mapping_file"]:
            self._escalate(status, "FATAL")
            status["messages"].append(mapping_result["message"])

        # (result flag, Kafka config type, file content) of the optional files.
        optional_configs = (
            ("xslt_file", "transform", retrieved["xslt_file"]),
            ("local_transform_file", "localTransform", retrieved["local_transform"]),
        )
        for flag, config_type, content in optional_configs:
            sent = False
            # "" is the marker _retrieve_mapping_files uses for a missing file.
            # NOTE(review): if the SFTP read returns bytes, an existing but
            # empty file still compares unequal to "" and is forwarded - confirm
            # that this is intended.
            if content != "":
                outcome = self._send_config_to_kafka(
                    record_set_id, self.topic, config_type, content
                )
                sent = outcome["status"] == "SUCCESS"
                if not sent:
                    status["messages"].append(outcome["message"])
            status[flag] = sent
            if not sent:
                # A missing or unsent optional file is only a warning and must
                # never hide a FATAL status set for the mapping file.
                self._escalate(status, "WARNING")

        return status

    def _fatal(self, error_type: str, message: str) -> Dict[str, Union[str, list]]:
        """Log ``message`` as an error and wrap it in a FATAL result dict."""
        self.logger.error(message)
        return {"status": "FATAL", "type": error_type, "messages": [message]}

    @staticmethod
    def _read_optional_file(sftp, file_path: str):
        """Return the content of ``file_path``, or ``""`` if it cannot be opened or read."""
        try:
            with sftp.open(file_path, "r") as remote_file:
                return remote_file.read()
        except IOError:
            return ""

    def _retrieve_mapping_files(
        self, record_set_id: str
    ) -> Dict[str, Union[str, bool, list]]:
        """Read the configuration files of a record set from the SFTP server.

        :param record_set_id: name of the record set folder below ``base_path``.
        :return: on success a dict with ``status`` ``"SUCCESS"`` and the file
            contents under ``mapping_file``, ``xslt_file`` and
            ``local_transform`` (``""`` for an optional file that could not be
            read). On failure a dict with ``status`` ``"FATAL"``, ``type`` and
            ``messages``.
        """
        try:
            self.ssh_client.connect(
                hostname=self.host,
                port=self.port,
                username=self.user,
                password=self.password,
            )
        # The two specific handlers must precede the generic SSHException one.
        except BadHostKeyException as ex:
            return self._fatal(
                "BadHostKeyException",
                f"Could not connect to the server because of a bad host key: {ex}",
            )
        except AuthenticationException:
            return self._fatal(
                "AuthenticationException",
                "Could not authenticate with the sftp server with the given credentials.",
            )
        except SSHException as ex:
            return self._fatal("SSHException", f"SSH Exception: {ex}.")
        except socket.error as ex:
            return self._fatal("SocketError", f"Socket Error: {ex}.")

        try:
            sftp = self.ssh_client.open_sftp()
        except SSHException as ex:
            # Do not leave the connection open when no SFTP session is available.
            self.ssh_client.close()
            return self._fatal("SSHException", f"SSH Exception: {ex}.")

        try:
            path = join(self.base_path, record_set_id, self.config_folder)

            # The mapping file is mandatory.
            mapping_file_path = join(path, self.mapping_file)
            try:
                with sftp.open(mapping_file_path, "r") as mapping:
                    file_contents_mapping = mapping.read()
            except IOError as err:
                return self._fatal(
                    "NoMappingFile",
                    f"Could not open the mapping file at {mapping_file_path} (ERROR: {err}).",
                )

            # The XSLT and local transform files are optional.
            file_contents_xslt = self._read_optional_file(
                sftp, join(path, self.xslt_file)
            )
            file_contents_local_transform = self._read_optional_file(
                sftp, join(path, self.local_transform_file)
            )
        finally:
            # Release the session on every path, including unexpected errors.
            sftp.close()
            self.ssh_client.close()

        return {
            "status": "SUCCESS",
            "mapping_file": file_contents_mapping,
            "xslt_file": file_contents_xslt,
            "local_transform": file_contents_local_transform,
        }

    def _send_config_to_kafka(
        self, record_set_id, topic: str, config_type: str, content: str
    ) -> Dict[str, Union[str, list]]:
        """Send one configuration file to Kafka.

        The message key is ``<record_set_id>#<config_type>``.

        :param record_set_id: id of the record set the file belongs to.
        :param topic: Kafka topic to send to.
        :param config_type: kind of configuration, used as the key suffix.
        :param content: file content, used as the message value.
        :return: ``{"status": "SUCCESS"}`` if the message was handed to the
            producer, otherwise a dict with ``status`` ``"FATAL"``, ``type``
            and ``message``.
        """
        key = f"{record_set_id}#{config_type}"
        try:
            # TODO: This could be improved to actually check if the returned future succeeds.
            #  However this was never a problem so far.
            self.producer.send(topic, key=key, value=content)
        except KafkaTimeoutError as err:
            return {
                "status": "FATAL",
                "type": "KafkaTimeOut",
                "message": f"Kafka Error: {err}.",
            }
        return {"status": "SUCCESS"}