mapping.py 7.34 KB
Newer Older
Jonas Waeber's avatar
Jonas Waeber committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Import API Service
# Copyright (C) 2020-2021 Memobase Project
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
Jonas Waeber's avatar
Jonas Waeber committed
16
17
18
19
20
21
22
import socket
from os.path import join
from typing import Dict, Union

import paramiko
from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError
23
24
25
26
27
from paramiko.ssh_exception import (
    AuthenticationException,
    BadHostKeyException,
    SSHException,
)
Jonas Waeber's avatar
Jonas Waeber committed
28
29
30


class MappingFileHandler:
    """Fetch a record set's configuration files over SFTP and publish them to Kafka.

    For a given record set the handler reads up to three files from
    ``<base_path>/<record_set_id>/config`` on the SFTP server:

    * ``mapping.yml`` -- required; a missing file is a FATAL failure.
    * ``transform.xslt`` -- optional; a missing file yields a WARNING.
    * ``localTransforms.yml`` -- optional; a missing file yields a WARNING.

    Each file that was found is sent to the configured Kafka topic with the
    key ``<record_set_id>#<config_type>``.
    """

    # (report key, key in the retrieval result, Kafka config type) of the
    # optional configuration files, in the order they are published.
    _OPTIONAL_CONFIGS = (
        ("xslt_file", "xslt_file", "transform"),
        ("local_transform_file", "local_transform", "localTransform"),
    )

    def __init__(
        self,
        host: str,
        port: int,
        user: str,
        password: str,
        base_path: str,
        kafka_broker_url: str,
        topic: str,
        logger,
    ):
        """Store the connection settings and create the SSH client and Kafka producer.

        :param host: host name of the SFTP server.
        :param port: port of the SFTP server.
        :param user: user name used to authenticate with the SFTP server.
        :param password: password used to authenticate with the SFTP server.
        :param base_path: remote directory that contains one folder per record set.
        :param kafka_broker_url: bootstrap server(s) handed to ``KafkaProducer``.
        :param topic: Kafka topic the configuration files are sent to.
        :param logger: object providing an ``error(message)`` method.
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.base_path = base_path
        self.logger = logger

        self.config_folder = "config"
        self.mapping_file = "mapping.yml"
        self.xslt_file = "transform.xslt"
        self.local_transform_file = "localTransforms.yml"

        self.ssh_client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; kept as in the original -- confirm this is intended.
        self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        self.producer = KafkaProducer(
            bootstrap_servers=kafka_broker_url, key_serializer=str.encode
        )
        self.topic = topic

    def process_mapping_files(
        self, record_set_id: str
    ) -> Dict[str, Union[str, bool, list]]:
        """Retrieve the configuration files of a record set and send them to Kafka.

        :param record_set_id: identifier of the record set; used as the name of
            the remote folder and as prefix of the Kafka message key.
        :return: a report dictionary. If retrieval failed, the failure report of
            ``_retrieve_mapping_files`` is returned with ``mapping_file``,
            ``xslt_file`` and ``local_transform_file`` set to ``False``.
            Otherwise the report holds one boolean per file (``True`` if it was
            sent successfully), the collected error ``messages`` and an overall
            ``status``: ``FATAL`` if the mapping file could not be sent,
            ``WARNING`` if an optional file is missing or could not be sent,
            ``SUCCESS`` otherwise.
        """
        result = self._retrieve_mapping_files(record_set_id)

        if result["status"] == "FATAL":
            result["mapping_file"] = False
            result["xslt_file"] = False
            result["local_transform_file"] = False
            return result

        report = {"messages": []}
        # Track the severities separately so that a later WARNING can never
        # overwrite a FATAL raised by the required mapping file.
        fatal = False
        warning = False

        mapping_result = self._send_config_to_kafka(
            record_set_id, self.topic, "mapping", result["mapping_file"]
        )
        report["mapping_file"] = mapping_result["status"] == "SUCCESS"
        if not report["mapping_file"]:
            fatal = True
            report["messages"].append(mapping_result["message"])

        for report_key, content_key, config_type in self._OPTIONAL_CONFIGS:
            content = result[content_key]
            # An empty string is the marker for "file not present on the server".
            if content != "":
                send_result = self._send_config_to_kafka(
                    record_set_id, self.topic, config_type, content
                )
                report[report_key] = send_result["status"] == "SUCCESS"
                if not report[report_key]:
                    warning = True
                    report["messages"].append(send_result["message"])
            else:
                report[report_key] = False
                warning = True

        if fatal:
            report["status"] = "FATAL"
        elif warning:
            report["status"] = "WARNING"
        else:
            report["status"] = "SUCCESS"
        return report

    def _retrieve_mapping_files(
        self, record_set_id: str
    ) -> Dict[str, Union[str, bool, list]]:
        """Read the configuration files of a record set from the SFTP server.

        The SSH connection (and the SFTP session, once opened) is always closed
        before this method returns or raises.

        :param record_set_id: name of the record set folder below ``base_path``.
        :return: on success a dictionary with ``status`` ``SUCCESS`` and the
            contents of the files under ``mapping_file``, ``xslt_file`` and
            ``local_transform`` (``""`` for an optional file that could not be
            opened). On failure a dictionary with ``status`` ``FATAL``, a
            ``type`` naming the failure and the error text in ``messages``.
        """
        try:
            try:
                self.ssh_client.connect(
                    hostname=self.host,
                    port=self.port,
                    username=self.user,
                    password=self.password,
                )
            # The two specific exceptions must be listed before SSHException,
            # which is their base class.
            except BadHostKeyException as ex:
                return self._connection_failure(
                    "BadHostKeyException",
                    f"Could not connect to the server because of a bad host key: {ex}",
                )
            except AuthenticationException:
                return self._connection_failure(
                    "AuthenticationException",
                    "Could not authenticate with the sftp server with the given credentials.",
                )
            except SSHException as ex:
                return self._connection_failure("SSHException", f"SSH Exception: {ex}.")
            except socket.error as ex:
                return self._connection_failure("SocketError", f"Socket Error: {ex}.")

            # Errors raised while opening the SFTP session are not handled here
            # and propagate to the caller (after the connection was closed).
            sftp = self.ssh_client.open_sftp()
            try:
                return self._read_config_files(sftp, record_set_id)
            finally:
                sftp.close()
        finally:
            self.ssh_client.close()

    def _connection_failure(
        self, failure_type: str, message: str
    ) -> Dict[str, Union[str, bool, list]]:
        """Log ``message`` as an error and wrap it in a FATAL failure report."""
        self.logger.error(message)
        return {"status": "FATAL", "type": failure_type, "messages": [message]}

    def _read_config_files(
        self, sftp, record_set_id: str
    ) -> Dict[str, Union[str, bool, list]]:
        """Read the required mapping file and the optional files via ``sftp``."""
        path = join(self.base_path, record_set_id, self.config_folder)

        # The mapping file is required: without it nothing can be processed.
        mapping_file_path = join(path, self.mapping_file)
        try:
            with sftp.open(mapping_file_path, "r") as mapping:
                file_contents_mapping = mapping.read()
        except IOError as err:
            message = f"Could not open the mapping file at {mapping_file_path} (ERROR: {err})."
            self.logger.error(message)
            return {
                "status": "FATAL",
                "type": "NoMappingFile",
                # Same key and shape as every other failure report.
                "messages": [message],
                # Kept for callers that still read the original singular key.
                "message": message,
            }

        return {
            "status": "SUCCESS",
            "mapping_file": file_contents_mapping,
            "xslt_file": self._read_optional_file(sftp, join(path, self.xslt_file)),
            "local_transform": self._read_optional_file(
                sftp, join(path, self.local_transform_file)
            ),
        }

    @staticmethod
    def _read_optional_file(sftp, remote_path: str):
        """Return the contents of ``remote_path`` or ``""`` if it cannot be opened."""
        try:
            with sftp.open(remote_path, "r") as remote_file:
                return remote_file.read()
        except IOError:
            return ""

    def _send_config_to_kafka(
        self, record_set_id, topic: str, config_type: str, content: str
    ) -> Dict[str, Union[str, list]]:
        """Send one configuration file to Kafka.

        :param record_set_id: identifier of the record set the file belongs to.
        :param topic: Kafka topic to send the message to.
        :param config_type: kind of configuration; second part of the message key.
        :param content: file contents used as the message value.
        :return: ``{"status": "SUCCESS"}`` once the message was handed to the
            producer, or a FATAL report with ``type`` and ``message`` if the
            producer raised ``KafkaTimeoutError``.
        """
        key = f"{record_set_id}#{config_type}"
        try:
            # TODO: This could be improved to actually check if the returned future succeeds.
            # However this was never a problem so far.
            self.producer.send(topic, key=key, value=content)
        except KafkaTimeoutError as err:
            return {
                "status": "FATAL",
                "type": "KafkaTimeOut",
                "message": f"Kafka Error: {err}.",
            }
        return {"status": "SUCCESS"}