FetchMappingFile.py 4.21 KB
Newer Older
Matthias's avatar
Matthias committed
1
2
3
4
5
6
7
8
9
from flask_restful import Resource, current_app
from kafka import KafkaProducer

from helpers.Error import ImportApiError
import paramiko


class FetchMappingFile(Resource):

Jonas Waeber's avatar
Jonas Waeber committed
10
    def get(self, recordset_id, session_id):
Matthias's avatar
Matthias committed
11
12
13
14
15
16
17
18
19
20
21
22
        """
        Fetches the mapping file form the sftp-server
        ---
        tags:
          - fetch mapping file
        parameters:
          - in: path
            name: recordset_id
            required: true
            description: The name of the record set (matches folder name on sftp)
            example: AfZ-Becker-Audiovisuals
            type: string
Jonas Waeber's avatar
Jonas Waeber committed
23
          - in: path
Jonas Waeber's avatar
Jonas Waeber committed
24
            name: session_id
Jonas Waeber's avatar
Jonas Waeber committed
25
26
27
28
            required: true
            description: A session id used to distinguish between different runs.
            example: uuid
            type: string
Matthias's avatar
Matthias committed
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
        responses:
          200:
            description: Success, the mapping file has been retrieved
            schema:
              properties:
                status:
                  type: string
                  example: SUCCESS/FAILURE
                  enum: ['SUCCESS', 'FAILURE']
                contents:
                  type: string/yml
                  example: the contents of the mapping file...

        """

44
45
46
        return self.fetchMappingFile(recordset_id, session_id)

    def fetchMappingFile(self, recordset_id, session_id):
Matthias's avatar
Matthias committed
47
48
49
50
51
        # 1. read file contents from sftp:
        host = current_app.config["sftp_host"]
        port = current_app.config["sftp_port"]
        user = current_app.config["sftp_user"]
        pwd = current_app.config["sftp_password"]
52
        fileContentsMapping = ''
Matthias's avatar
Matthias committed
53
        fileContentsTransform = ''
54
        fileContentsLocalTransform = ''
Matthias's avatar
Matthias committed
55
56
57
58
59
60
61
62
63

        try:
            ssh_client = paramiko.SSHClient()
            ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh_client.connect(hostname=host,
                               port=port,
                               username=user,
                               password=pwd)
            sftp_client = ssh_client.open_sftp()
64

Jonas Waeber's avatar
Jonas Waeber committed
65
            for fileName in ['mapping.yml', 'transform.xslt', 'localTransforms.yml']:
66
67
68
69
70
71
72
73
                configDirectory = 'config'
                if current_app.config['env'] == 'stage':
                    configDirectory = 'config-stage'
                elif current_app.config['env'] == 'test':
                    configDirectory = 'config-test'

                remote_path = '/swissbib_index/mb_sftp/' + recordset_id +\
                              '/' + configDirectory + '/' + fileName
74
75
76
77
78
79
                try:
                    sftp_client.stat(remote_path)
                    remote_file = sftp_client.file(remote_path, 'r')
                    if fileName == 'mapping.yml':
                        fileContentsMapping = remote_file.read()
                    if fileName == 'transform.xslt':
Matthias's avatar
Matthias committed
80
                        fileContentsTransform = remote_file.read()
Jonas Waeber's avatar
Jonas Waeber committed
81
                    if fileName == 'localTransforms.yml':
82
83
84
85
                        fileContentsLocalTransform = remote_file.read()
                except IOError:
                    message = 'remote path "' + remote_path + '" does not exist'
                    current_app.logger.debug(message)
Matthias's avatar
Matthias committed
86
87
88
89
90
91
92
93
            sftp_client.close()
            ssh_client.close()
        except Exception as ex:
            message = str(ex)
            current_app.logger.debug(message)
            raise ImportApiError(message)

        # 2. write file content into kafka topic
Matthias's avatar
Matthias committed
94
        topic = current_app.config['topic-configs']
Matthias's avatar
Matthias committed
95
96
97
        try:
            producer = KafkaProducer(bootstrap_servers=current_app.config['kafka-broker-url'],
                                     key_serializer=str.encode)
98
99
            key = recordset_id + '#'
            producer.send(topic, key=key + 'mapping', value=fileContentsMapping)
Matthias's avatar
Matthias committed
100
101
102
103
            if "" != fileContentsTransform:
                producer.send(topic, key=key + 'transform', value=fileContentsTransform)
            if "" != fileContentsLocalTransform:
                producer.send(topic, key=key + 'localTransform', value=fileContentsLocalTransform)
Matthias's avatar
Matthias committed
104
105
106
107
108
        except Exception as ex:
            message = str(ex)
            current_app.logger.debug(message)
            raise ImportApiError(message)
        return {'status': 'SUCCESS'}, 200