MediametadataToDB.py 15 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#  mediametadatatodb
#  Copyright (C) 2020  Memoriav
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Affero General Public License as published
#  by the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Affero General Public License for more details.
#
#  You should have received a copy of the GNU Affero General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.

Matthias's avatar
Matthias committed
17
import json
18
import logging
19
import numbers
Matthias's avatar
Matthias committed
20
import os
21
import time
22
23

import mysql.connector as mariadb
24
# noinspection PyPackageRequirements
Matthias's avatar
Matthias committed
25
from kafka import KafkaConsumer
26
# noinspection PyPackageRequirements
Matthias's avatar
Matthias committed
27
from kafka.errors import KafkaError
Matthias's avatar
Matthias committed
28
from kubernetes import config
29
30
from kubernetes.config.config_exception import ConfigException as K8sConfigException

31
32
from mediametadatatodb_app.resources.reporter import Reporter

33
34
35
36

def _connect_to_kafka(retries=0):
    """Create the KafkaConsumer for ``INPUT_TOPIC``, retrying on failure.

    The consumer deserializes message values as UTF-8 JSON, starts at the
    earliest offset for a new group, does not auto-commit offsets, and stops
    iterating after 30 s without a message (``consumer_timeout_ms``).

    On failure the attempt is repeated up to ``KAFKA_CONNECTION_RETRIES``
    times, sleeping 30 s, 60 s, 90 s, ... before each retry. When all retries
    are exhausted the process exits with status 1.

    :param retries: number of attempts already made (used by the recursion)
    :return: the connected ``KafkaConsumer``
    """
    try:
        return KafkaConsumer(
            os.environ['INPUT_TOPIC'],
            value_deserializer=lambda m: json.loads(m.decode('utf8')),
            bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            group_id=os.environ['KAFKA_GROUP_ID'],
            consumer_timeout_ms=30000)
    except KafkaError as ex:
        logging.error('KafkaError: ' + str(ex))
    except Exception as ex:
        logging.error('Exception: ' + str(ex))
    if retries < int(os.environ['KAFKA_CONNECTION_RETRIES']):
        time.sleep(30 * (retries + 1))
        # The retry's consumer must be handed back to the caller; previously the
        # result was discarded and the process exited even after a successful retry.
        return _connect_to_kafka(retries + 1)
    # SystemExit is what exit(1) raises, without depending on the `site` module.
    raise SystemExit(1)


def _connect_to_mariadb(retries=0):
    """Open a MariaDB connection (autocommit disabled) and a cursor on it.

    Connection parameters are read from the ``MARIADB_USER``,
    ``MARIADB_PASSWORD``, ``MARIADB_HOST``, ``MARIADB_PORT`` and
    ``MARIADB_DATABASE`` environment variables.

    On failure the attempt is repeated up to ``MARIADB_CONNECTION_RETRIES``
    times, sleeping 30 s, 60 s, 90 s, ... before each retry. When all retries
    are exhausted the process exits with status 1.

    :param retries: number of attempts already made (used by the recursion)
    :return: tuple ``(connection, cursor)``
    """
    try:
        mariadb_connection = mariadb.connect(user=os.environ['MARIADB_USER'],
                                             password=os.environ['MARIADB_PASSWORD'],
                                             host=os.environ['MARIADB_HOST'],
                                             port=int(os.environ['MARIADB_PORT']),
                                             database=os.environ['MARIADB_DATABASE'])
        # Writes are committed explicitly by the caller after each batch.
        mariadb_connection.autocommit = False
        mariadb_cursor = mariadb_connection.cursor()
        return mariadb_connection, mariadb_cursor
    except Exception as ex:
        logging.error('Exception: ' + str(ex))
    if retries < int(os.environ['MARIADB_CONNECTION_RETRIES']):
        time.sleep(30 * (retries + 1))
        # The retry's connection must be handed back to the caller; previously the
        # result was discarded and the process exited even after a successful retry.
        return _connect_to_mariadb(retries + 1)
    # SystemExit is what exit(1) raises, without depending on the `site` module.
    raise SystemExit(1)
Matthias's avatar
Matthias committed
78
79


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
80
81
def _try_fetch_from_json_object(record_json_data,
                                fetch_from_obj_fun,
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
82
                                access_status) -> dict:
83
84
    if 'locator' in record_json_data and '@id' in record_json_data:
        return fetch_from_obj_fun(record_json_data, access_status)
85
86
87
88
89
    elif '@id' in record_json_data:
        logging.info('Record ' + record_json_data['@id'] +
                     ' does not have a locator-property.')
    else:
        logging.warning('Record without @id-property detected!')
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
90
    return dict()
91
92


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
93
def _get_values_from_thumbnail_object(msg, _access_status) -> dict:
94
95
96
    return_values = {'mimetype': 'image/jp2', 'type': 'image', 'access': 'public', 'proto': 'file',
                     'sig': '{}-poster'.format(msg['@id'].split('/')[-2]),
                     'uri': 'file:///data/{}-poster.jp2'.format(msg['@id'].split('/')[-2])}
97
98
99
100
101
102
103
104
105
    if 'height' in msg:
        height = msg['height']
        return_values['height'] = height
    if 'width' in msg:
        width = msg['width']
        return_values['width'] = width
    return return_values


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
106
def _get_values_from_digital_object(msg, access_status) -> dict:
107
108
    if 'isDistributedOn' not in msg:
        logging.warning("No isDistributedOn property found in object")
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
109
        return dict()
110
    file_extension = ''
111
    return_values = {'access': access_status, 'sig': msg['@id'].split('/')[-1]}
112
113
    if 'height' in msg:
        height = msg['height']
114
        return_values['height'] = height
115
116
    if 'width' in msg:
        width = msg['width']
117
        return_values['width'] = width
118
119
    if 'duration' in msg:
        duration = msg['duration']
120
        return_values['duration'] = duration
121
    return_values['type'] = msg['isDistributedOn']
122
123
124
125
126
127
    if 'hasMimeType' in msg:
        if return_values['type'] == 'image':
            mimetype = 'image/jp2'
        else:
            mimetype = msg['hasMimeType']
        return_values['mimetype'] = mimetype
128
129
130
131
    if _is_remote_file(msg):
        return_values['uri'] = msg['locator']
        if access_status == 'public':
            return_values['proto'] = 'redirect'
132
        else:
133
134
135
136
137
138
139
140
141
142
            return_values['proto'] = 'proxy'
    else:
        return_values['proto'] = 'file'
        if return_values['type'] == 'image':
            file_extension = 'jp2'
        if return_values['type'] == 'audio':
            file_extension = 'mp4'
        if return_values['type'] == 'video':
            file_extension = 'mp4'
        return_values['uri'] = os.environ['URI_BASE'] + return_values['sig'] + '.' + file_extension
143
    return return_values
144
145


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
146
def _is_remote_file(msg) -> bool:
147
148
    return 'locator' in msg and not \
        msg['locator'].startswith('https://memobase.ch/')
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
149
150


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
151
def _is_directly_fetchable(digital_object_resource) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
152
153
154
155
156
    return digital_object_resource['isDistributedOn'] == 'audio' or \
           digital_object_resource['isDistributedOn'] == 'image' or \
           digital_object_resource['isDistributedOn'] == 'video'


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
157
def _get_access_status(graph, record_id) -> str:
158
159
160
    for resource in graph:
        if 'type' in resource and resource['type'] == 'access' and \
                'regulates' in resource and \
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
161
                resource['regulates'].startswith('https://memobase.ch/digital/') and \
162
                'name' in resource:
163
            if resource['name'] == 'public':
164
                logging.debug(f'{record_id}: Setting access for digital object to `public`')
165
166
                return 'public'
            elif resource['name'] == 'private':
167
                logging.debug(f'{record_id}: Setting access for digital object to `closed`')
168
                return 'closed'
169
170
171
            else:
                logging.info(f'Digital object of record {record_id} has access type ' +
                             f'`{resource["name"]}`. This makes the media resource unavailable.')
172
173
174
                return 'unavailable'
    logging.info(f'Digital object of record {record_id} has no related access information!'
                 + ' Media resource is therefore unavailable')
175
176
177
    return 'unavailable'


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
178
def _get_record_id(graph) -> str:
179
    for resource in graph:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
180
181
        if '@type' in resource and resource['@type'] == \
                'https://www.ica.org/standards/RiC/ontology#Record':
182
183
184
            return resource['@id'] if '@id' in resource else None


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
185
def _create_sql_stmt(table_name, record, fields) -> str:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
186
187
188
189
190
191
    db_fields = [dbField for dbField in fields
                 if dbField in record and record[dbField] is not None]
    db_values = ','.join([str(record[db_field])
                          if isinstance(record[db_field], numbers.Number)
                          else "'{}'".format(record[db_field])
                          for db_field in db_fields])
192
    db_fields = ','.join(db_fields)
193
    # noinspection SqlNoDataSourceInspection
194
195
    return 'INSERT IGNORE INTO {} ({}) VALUES ({})'.format(
        table_name, db_fields, db_values)
196
197


198
def _create_entities_entry(record, mariadb_cursor):
199
    fields = ['sig', 'uri', 'access', 'proto']
200
201
    sql_stmt = _create_sql_stmt('entities', record, fields)
    mariadb_cursor.execute(sql_stmt)
202
203


204
def _create_metadata_entry(record, mariadb_cursor):
205
    fields = ['sig', 'mimetype', 'height', 'width', 'duration', 'type']
206
207
    sql_stmt = _create_sql_stmt('metadata', record, fields)
    mariadb_cursor.execute(sql_stmt)
208
209


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
210
def _has_audio_snippet(record) -> bool:
211
    return record['type'] == 'audio' and \
212
213
           'uri' in record and \
           record['uri'].startswith('file://')
214
215


216
# TODO: Eventually remove
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
217
218
def _has_http_locator(digital_object) -> bool:
    return 'locator' in digital_object and digital_object['locator'].startswith('http')
219
220


221
def _create_audio_snippet_entry(record, mariadb_cursor):
    """Insert the entities/metadata rows for the intro snippet of a local audio file.

    The snippet row is a copy of *record* (which itself is not modified) with:

    * ``sig`` suffixed with ``-intro``,
    * ``duration`` capped at 30 seconds; if *record* has no (or a None)
      duration the snippet row has none either. Previously a missing duration
      raised KeyError and aborted the remaining writes of the batch.
    * ``mimetype`` set to ``audio/mpeg``,
    * ``uri`` with its last ``.extension`` replaced by ``-intro.mp3``.
    """
    snippet_record = record.copy()
    snippet_record['sig'] = snippet_record['sig'] + '-intro'
    if snippet_record.get('duration') is not None:
        # Snippets are at most 30 seconds long.
        snippet_record['duration'] = min(float(snippet_record['duration']), 30.0)
    snippet_record['mimetype'] = 'audio/mpeg'
    snippet_record['uri'] = \
        '.'.join(snippet_record['uri'].split('.')[0:-1]) + '-intro.mp3'
    _create_entities_entry(snippet_record, mariadb_cursor)
    _create_metadata_entry(snippet_record, mariadb_cursor)
234
235


236
def _write_values_in_db(mariadb_cursor, record_values_for_db):
237
    try:
238
239
240
        for record in record_values_for_db:
            _create_entities_entry(record, mariadb_cursor)
            _create_metadata_entry(record, mariadb_cursor)
241
            if _has_audio_snippet(record):
242
                _create_audio_snippet_entry(record, mariadb_cursor)
243
244
245
246
247
    except Exception as ex:
        status = 'Exception: ' + str(ex)
        logging.error(status)


248
class MediametadataToDB:
    """Service that consumes media records from Kafka and writes them to MariaDB."""

    # Number of consumed messages after which pending rows are written to the
    # database and the Kafka offsets are committed.
    _BATCH_SIZE = 100

    def run(self):
        """
        Import media metadata to mariaDB
        This service should not return anything but run forever.
        ---
        tags:
          - mediametadatatodb
        responses:
          500:
            description: some error occured
            schema:
              properties:
                error:
                  type: string
                  example: there was an exception
        """

        consumer = _connect_to_kafka()
        mariadb_connection, mariadb_cursor = _connect_to_mariadb()

        # process messages:
        record_values_for_db = []
        reporter = Reporter()
        counter = 0
        try:
            while True:
                # Iterating the consumer polls Kafka by itself. The former extra
                # `consumer.poll()` here advanced the consumer position past the
                # records it returned, which were then never processed.
                for record_object in consumer:
                    counter += 1
                    record_values_for_db.extend(
                        self._collect_values(record_object.value['@graph'], reporter))
                    if counter % self._BATCH_SIZE == 0:
                        self._write_batch(record_values_for_db, mariadb_connection,
                                          mariadb_cursor, reporter)
                        record_values_for_db = []
                        consumer.commit()
                    if counter % 1000 == 0:
                        logging.info('{} messages read till now'.format(counter))
                # arriving here means there are no new messages to poll from
                self._write_batch(record_values_for_db, mariadb_connection,
                                  mariadb_cursor, reporter)
                record_values_for_db = []
                consumer.commit()
        except Exception as ex:
            status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
            logging.error(status)
            for record_value in record_values_for_db:
                reporter.send_message(record_value['sig'], "FATAL", f"Indexing failed: {ex}")
        finally:
            # Release both connections even if closing the first one fails.
            try:
                consumer.close()
            finally:
                mariadb_connection.close()

    @staticmethod
    def _collect_values(record, reporter):
        """Return the DB rows derived from one message's ``@graph`` list.

        If the digital object's access is neither ``public`` nor ``closed``, the
        record is reported as IGNORE and yields no rows. Otherwise one row is
        produced per usable digitalObject (with a locator starting with
        ``http``) and per usable thumbnail. Empty results are never returned,
        because every returned row must carry a ``sig`` for reporting.
        """
        record_id = _get_record_id(record)
        logging.debug(f'Processing record {record_id}')
        access_status = _get_access_status(record, record_id)
        if access_status != 'public' and access_status != 'closed':
            logging.info(f'Ignoring record {record_id} since' +
                         f' access of digitalObject is {access_status}')
            reporter.send_message(record_id,
                                  "IGNORE",
                                  "Ignoring record since access of " +
                                  "digitalObject is unavailable")
            return []

        values = []
        enrichable = False
        for record_resource in record:
            resource_type = record_resource['type'] if 'type' in record_resource else None
            if resource_type == 'digitalObject' and _has_http_locator(record_resource):
                enrichable = True
                enriched_data = _try_fetch_from_json_object(record_resource,
                                                            _get_values_from_digital_object,
                                                            access_status)
                if enriched_data:
                    values.append(enriched_data)
                else:
                    reporter.send_message(record_id,
                                          "FATAL",
                                          "Could not process digitalObject")
            if resource_type == 'thumbnail':
                enrichable = True
                enriched_data = _try_fetch_from_json_object(record_resource,
                                                            _get_values_from_thumbnail_object,
                                                            access_status)
                # An empty result means the thumbnail lacked @id or locator, which
                # _try_fetch_from_json_object has already logged. Appending it
                # would later fail on the missing 'sig' and stop the service.
                if enriched_data:
                    values.append(enriched_data)
        if not enrichable:
            reporter.send_message(record_id,
                                  "IGNORE",
                                  "Resource has no digitalObject or thumbnail")
        return values

    @staticmethod
    def _write_batch(record_values_for_db, mariadb_connection, mariadb_cursor, reporter):
        """Write the pending rows, commit, and report the outcome per row.

        If ``_write_values_in_db`` returns a list of rows it failed to write,
        those rows are reported as FATAL. All other rows are reported as
        SUCCESS. A ``None`` return is treated as "no failures".
        """
        failed = _write_values_in_db(mariadb_cursor, record_values_for_db) or []
        mariadb_connection.commit()
        failed_ids = {id(record_value) for record_value in failed}
        for record_value in record_values_for_db:
            if id(record_value) in failed_ids:
                reporter.send_message(record_value['sig'], "FATAL", "Indexing failed")
            else:
                logging.info(f'Record {record_value["sig"]} successfully indexed')
                reporter.send_message(record_value['sig'], "SUCCESS",
                                      "Indexing successful")

    def __init__(self):
        """Load the Kubernetes client configuration.

        Tries the in-cluster configuration first. If that fails, falls back to
        the local kube config (``.kube`` directory) for development. If neither
        is available, an error is logged and construction continues.
        """
        # TODO : maybe take that to a configuration (development vs pod running in
        # k8s cluster)
        try:
            # to be used when inside a kubernetes cluster
            config.load_incluster_config()
            logging.info("Loading incluster config")
        except K8sConfigException:
            try:
                # use .kube directory
                # for local development
                config.load_kube_config()
                logging.info("Loading kube config (for local development)")
            except K8sConfigException:
                logging.error("No kubernetes cluster defined")