FetchMappingFile.py 3.62 KB
Newer Older
Matthias's avatar
Matthias committed
1
2
3
4
5
6
7
8
9
from flask_restful import Resource, current_app
from kafka import KafkaProducer

from helpers.Error import ImportApiError
import paramiko


class FetchMappingFile(Resource):
    """REST resource copying a record set's config files from SFTP to Kafka."""

    # (remote file name, suffix of the Kafka message key), in send order.
    _CONFIG_FILES = (
        ('mapping.yml', 'mapping'),
        ('transform.xslt', 'transform'),
        ('localTransform.yml', 'localTransform'),
    )
    # Kafka topic the file contents are published to.
    _TOPIC = 'import-process-config'

    def get(self, recordset_id):
        """
        Fetches the mapping file from the sftp-server
        ---
        tags:
          - fetch mapping file
        parameters:
          - in: path
            name: recordset_id
            required: true
            description: The name of the record set (matches folder name on sftp)
            example: AfZ-Becker-Audiovisuals
            type: string
        responses:
          200:
            description: Success, the mapping file has been retrieved
            schema:
              properties:
                status:
                  type: string
                  example: SUCCESS/FAILURE
                  enum: ['SUCCESS', 'FAILURE']
                contents:
                  type: string/yml
                  example: the contents of the mapping file...

        """
        # Implementation notes (kept out of the docstring above so the
        # generated API description does not change):
        # - every file in _CONFIG_FILES must exist on the SFTP server,
        #   otherwise ImportApiError is raised;
        # - any SFTP or Kafka failure is logged and raised as ImportApiError;
        # - on success the response is ({'status': 'SUCCESS'}, 200).

        # Connection settings come from the Flask app config.
        host = current_app.config["sftp_host"]
        port = current_app.config["sftp_port"]
        user = current_app.config["sftp_user"]
        pwd = current_app.config["sftp_password"]

        # 1. read file contents from sftp:
        try:
            config_files = self._read_config_files(host, port, user, pwd, recordset_id)
        except ImportApiError:
            # Already logged with a specific message; do not wrap it again.
            raise
        except Exception as ex:
            message = str(ex)
            current_app.logger.debug(message)
            raise ImportApiError(message)

        # 2. write file content into kafka topic
        try:
            self._publish_config_files(recordset_id, config_files)
        except Exception as ex:
            message = str(ex)
            current_app.logger.debug(message)
            raise ImportApiError(message)

        return {'status': 'SUCCESS'}, 200

    def _read_config_files(self, host, port, user, pwd, recordset_id):
        """Read all config files of a record set from the SFTP server.

        Returns a list of ``(key_suffix, contents)`` pairs in the order of
        ``_CONFIG_FILES``. Raises ImportApiError when a file cannot be
        opened; other connection errors propagate to the caller.
        """
        base_path = '/swissbib_index/mb_sftp/' + recordset_id + '/config/'
        ssh_client = paramiko.SSHClient()
        try:
            # NOTE(review): AutoAddPolicy accepts unknown host keys without
            # verification; kept as in the original, confirm this is intended.
            ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh_client.connect(hostname=host,
                               port=port,
                               username=user,
                               password=pwd)
            sftp_client = ssh_client.open_sftp()
            try:
                files = []
                for file_name, key_suffix in self._CONFIG_FILES:
                    remote_path = base_path + file_name
                    try:
                        remote_file = sftp_client.file(remote_path, 'r')
                    except IOError:
                        message = 'remote path "' + remote_path + '" does not exist'
                        current_app.logger.debug(message)
                        raise ImportApiError(message)
                    # Close the remote handle even if reading fails.
                    with remote_file:
                        files.append((key_suffix, remote_file.read()))
                return files
            finally:
                sftp_client.close()
        finally:
            # Runs on every path so a failure never leaks the connection.
            ssh_client.close()

    def _publish_config_files(self, recordset_id, config_files):
        """Send each ``(key_suffix, contents)`` pair to the Kafka topic.

        The message key is ``<recordset_id>#<key_suffix>``. Blocks until all
        messages are delivered and raises if any delivery failed.
        """
        producer = KafkaProducer(bootstrap_servers=current_app.config['kafka-broker-url'],
                                 key_serializer=str.encode)
        try:
            key_prefix = recordset_id + '#'
            pending = [
                producer.send(self._TOPIC, key=key_prefix + key_suffix, value=contents)
                for key_suffix, contents in config_files
            ]
            # send() only enqueues the message; wait for the actual delivery.
            producer.flush()
            for future in pending:
                # Re-raises the delivery error of a failed send, if any.
                future.get()
        finally:
            producer.close()