Due to a scheduled upgrade to version 14.10, GitLab will be unavailable on Monday 30.05., from 19:00 until 20:00.

MediametadataToDB.py 16 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#  mediametadatatodb
#  Copyright (C) 2020  Memoriav
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Affero General Public License as published
#  by the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Affero General Public License for more details.
#
#  You should have received a copy of the GNU Affero General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.

Matthias's avatar
Matthias committed
17
import json
18
import logging
Matthias's avatar
Matthias committed
19
import os
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
20
import re
21
import time
22

23
# noinspection PyPackageRequirements
Matthias's avatar
Matthias committed
24
from kafka import KafkaConsumer
25
# noinspection PyPackageRequirements
Matthias's avatar
Matthias committed
26
from kafka.errors import KafkaError
Matthias's avatar
Matthias committed
27
from kubernetes import config
28
29
from kubernetes.config.config_exception import ConfigException as K8sConfigException

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
30
from mediametadatatodb_app.resources.processor import RecordProcessor
31

32
33

def _connect_to_kafka(retries=0):
    """
    Connect to Kafka. Abort after configured retries.

    The consumer is configured from the environment variables `INPUT_TOPIC`,
    `KAFKA_BOOTSTRAP_SERVERS` and `KAFKA_GROUP_ID`. Message values are decoded
    as UTF-8 JSON and offsets are not committed automatically.

    :param retries: number of connection attempts which have already failed
    :return: the connected `KafkaConsumer`
    :raises SystemExit: with exit code 1 as soon as the number of failed
        attempts reaches `KAFKA_CONNECTION_RETRIES`
    """
    attempt = retries
    while True:
        try:
            return KafkaConsumer(
                os.environ['INPUT_TOPIC'],
                value_deserializer=lambda m: json.loads(m.decode('utf8')),
                bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
                auto_offset_reset='earliest',
                enable_auto_commit=False,
                group_id=os.environ['KAFKA_GROUP_ID'],
                consumer_timeout_ms=30000)
        except Exception as ex:
            # Kafka errors and all other errors are retried alike; only the
            # prefix of the log message differs
            prefix = 'KafkaError: ' if isinstance(ex, KafkaError) else 'Exception: '
            logging.error(prefix + str(ex))
            if attempt >= int(os.environ['KAFKA_CONNECTION_RETRIES']):
                raise SystemExit(1)
            # Wait a bit longer after every failed attempt (30s, 60s, 90s, ...)
            time.sleep(30 * (attempt + 1))
            attempt += 1


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
63
64
def _extract_fields(record_json_data,
                    fetch_from_obj_fun,
65
                    access_status) -> (dict, str):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
66
67
68
    """
    Extract fields from JSON object by applying `fetch_from_obj_fun` function
    """
69
    if 'locator' in record_json_data and '@id' in record_json_data:
70
        return fetch_from_obj_fun(record_json_data, access_status), None
71
72
    elif '@id' in record_json_data:
        logging.info('Record ' + record_json_data['@id'] +
73
74
                     ' does not have a locator property.')
        return dict(), 'No locator property found'
75
76
    else:
        logging.warning('Record without @id-property detected!')
77
        return dict(), 'No @id property found'
78
79


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
80
81
82
83
def _extract_thumbnail_values(msg, _access_status) -> dict:
    """
    Extract information on thumbnail from JSON object
    """
84
85
    return_values = {'type': 'image', 'access': 'public', 'proto': 'file',
                     'sig': '{}-poster'.format(msg['@id'].split('/')[-2])}
86
    if 'height' in msg:
87
        height = _normalize_dimension(msg['height'])
88
89
        return_values['height'] = height
    if 'width' in msg:
90
        width = _normalize_dimension(msg['width'])
91
        return_values['width'] = width
92
93
94
95
96
97
98
99
100
101
102
103
104
105
    if 'hasMimeType' in msg:
        return_values['mimetype'] = msg['hasMimeType']
        if return_values['mimetype'] == 'image/jpeg':
            file_extension = 'jpg'
        elif return_values['mimetype'] == 'image/png':
            file_extension = 'png'
        elif return_values['mimetype'] == 'image/jp2':
            file_extension = 'jp2'
        else:
            file_extension = ''
            logging.warning('No valid mimetype found!')
    else:
        file_extension = ''
        logging.warning('No valid mimetype found!')
106
    return_values['uri'] = 'file:///data/{}-poster.{}'. \
107
        format(msg['@id'].split('/')[-2], file_extension)
108
109
110
    return return_values


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
111
def _extract_dig_obj_vals(msg, access_status) -> dict:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
112
113
114
    """
    Extract information on digital object from JSON object
    """
115
116
    if 'isDistributedOn' not in msg:
        logging.warning("No isDistributedOn property found in object")
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
117
        return dict()
118
    file_extension = ''
119
    return_values = {'access': access_status, 'sig': msg['@id'].split('/')[-1]}
120
    if 'height' in msg:
121
        height = _normalize_dimension(msg['height'])
122
        return_values['height'] = height
123
    if 'width' in msg:
124
        width = _normalize_dimension(msg['width'])
125
        return_values['width'] = width
126
    if 'duration' in msg:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
127
        duration = _normalize_duration(msg['duration'])
128
        return_values['duration'] = duration
129
    return_values['type'] = msg['isDistributedOn']
130
    if 'hasMimeType' in msg:
131
        mimetype = msg['hasMimeType']
132
        return_values['mimetype'] = mimetype
133
134
    if _is_remote_file(msg):
        return_values['uri'] = msg['locator']
135
136
137
        if access_status == 'public' and not return_values['type'] == 'image':
            # Remote images are always accessed via proxy because their respective
            # route goes over the internal image server
138
            return_values['proto'] = 'redirect'
139
        else:
140
            return_values['proto'] = 'proxydirect'
141
142
143
    else:
        return_values['proto'] = 'file'
        if return_values['type'] == 'image':
144
145
146
147
148
149
150
151
152
153
154
155
156
            if return_values.get('mimetype'):
                if return_values['mimetype'] == 'image/jpeg':
                    file_extension = 'jpg'
                elif return_values['mimetype'] == 'image/png':
                    file_extension = 'png'
                elif return_values['mimetype'] == 'image/jp2':
                    file_extension = 'jp2'
                else:
                    file_extension = ''
                    logging.warning('No valid mimetype found!')
            else:
                file_extension = ''
                logging.warning('No valid mimetype found!')
157
158
159
160
161
        if return_values['type'] == 'audio':
            file_extension = 'mp4'
        if return_values['type'] == 'video':
            file_extension = 'mp4'
        return_values['uri'] = os.environ['URI_BASE'] + return_values['sig'] + '.' + file_extension
162
    return return_values
163
164


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def _create_audio_snippet_entry(record) -> dict:
    """
    Create an audio snippet entry based on the digital object

    :param record: values of the digital object; `sig` and `uri` are required.
        The record itself is left untouched
    :return: copy of the record which describes the snippet: `-intro` is
        appended to the signature, the access is `public`, the duration is
        limited to 30 seconds, the mimetype is `audio/mpeg` and the extension
        of the uri is replaced by `-intro.mp3`
    """
    snippet_record = record.copy()
    if 'duration' not in snippet_record:
        logging.warning("No duration for audio found: Setting duration to 0")
        snippet_record['duration'] = 0
    snippet_record['sig'] = snippet_record['sig'] + '-intro'
    snippet_record['access'] = 'public'
    # Normalise only once so that an unparsable duration is reported only once
    snippet_record['duration'] = min(30, _normalize_duration(snippet_record['duration']))
    snippet_record['mimetype'] = 'audio/mpeg'
    stem, dot, _extension = snippet_record['uri'].rpartition('.')
    if not dot:
        # No extension to strip: keep the complete uri as stem
        stem = snippet_record['uri']
    snippet_record['uri'] = stem + '-intro.mp3'
    return snippet_record


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
186
def _is_remote_file(msg) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
187
188
189
    """
    Media file is saved on a remote system
    """
190
    return 'locator' in msg and not \
191
        msg['locator'].startswith('sftp:/')
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
192
193


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
194
def _is_directly_fetchable(digital_object_resource) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
195
196
197
198
    """
    Media file is directly accessible (i.e. is delivered via Memobase player)
    """
    # //@formatter:off
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
199
    return digital_object_resource['isDistributedOn'] == 'audio' or \
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
200
201
202
        digital_object_resource['isDistributedOn'] == 'image' or \
        digital_object_resource['isDistributedOn'] == 'video'
    # //@formatter:on
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
203
204


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
205
def _get_access_status(graph, record_id) -> str:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
206
    """
207
208
    Decide on access status. Possible values are `public`, `closed`, `unavailable`,
    `faro`, `onsite`, or `noonsite`
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
209
    """
210
211
212
    # Since several access flags in one document are possible, we have to apply a ranking
    # which is `closed`, `public`, `faro`, `onsite`, `noonsite` and as fallback `unavailable`
    access_flags = list()
213
214
215
    for resource in graph:
        if 'type' in resource and resource['type'] == 'access' and \
                'regulates' in resource and \
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
216
                resource['regulates'].startswith('https://memobase.ch/digital/') and \
217
                'name' in resource:
218
            if resource['name'] == 'public':
219
                access_flags.append('public')
220
            elif resource['name'] == 'private':
221
                logging.debug(f'{record_id}: Setting access for digital object to `closed`')
222
                access_flags.append('closed')
223
224
225
            else:
                logging.info(f'Digital object of record {record_id} has access type ' +
                             f'`{resource["name"]}`. This makes the media resource unavailable.')
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
                access_flags.append(resource['name'])
    if 'closed' in access_flags:
        logging.debug(f'{record_id}: Setting access for digital object to `closed`')
        return 'closed'
    elif 'public' in access_flags:
        logging.debug(f'{record_id}: Setting access for digital object to `public`')
        return 'public'
    elif 'faro' in access_flags:
        logging.info(f'Digital object of record {record_id} has access type `faro`.' +
                     ' This makes the media resource unavailable.')
        return 'faro'
    elif 'onsite' in access_flags:
        logging.info(f'Digital object of record {record_id} has access type `onsite`.' +
                     ' This makes the media resource unavailable.')
        return 'onsite'
    elif 'noonsite' in access_flags:
        logging.info(f'Digital object of record {record_id} has access type `noonsite`.' +
                     ' This makes the media resource unavailable.')
        return 'noonsite'
    else:
        logging.info(f'Digital object of record {record_id} has no or invalid access information!'
                     + ' The media resource is therefore unavailable')
        return 'unavailable'
249
250


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
251
def _get_record_id(graph) -> str:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
252
253
254
    """
    Get record identifier
    """
255
    for resource in graph:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
256
257
        if '@type' in resource and resource['@type'] == \
                'https://www.ica.org/standards/RiC/ontology#Record':
258
259
260
            return resource['@id'] if '@id' in resource else None


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
261
def _has_audio_snippet(record) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
262
263
264
265
    """
    Record has an attached audio snippet (created by external service)
    """
    return record['type'] == 'audio' and 'uri' in record and record['uri'].startswith('file://')
266
267


268
def _normalize_dimension(dimension) -> int:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
269
270
271
    """
    Cast dimension to int
    """
272
    return round(float(dimension))
273
274


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
275
def _normalize_duration(duration) -> int:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
276
277
278
    """
    Normalise different representation of duration
    """
279
    if re.fullmatch(r'\d+:\d{2}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
280
281
        split = duration.split(':')
        return int(split[0]) * 60 + int(split[1])
282
    elif re.fullmatch(r'\d+:\d{2}:\d{2}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
283
284
        split = duration.split(':')
        return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
285
    elif re.fullmatch(r'\d+:\d{2}:\d{2}\d{3}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
286
287
        split = duration.split(':')
        return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
288
    elif re.fullmatch(r'\d+.\d{6}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
289
        return int(duration.split('.')[0])
290
291
    elif re.fullmatch(r'\d+', str(duration), re.ASCII):
        return int(duration)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
292
293
294
295
296
    else:
        logging.warning(f'Can\'t parse duration `{duration}`')
        return 0


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
297
298
299
300
301
def _is_digital_object(resource) -> bool:
    """
    Resource is of type `digital object`
    """
    return 'type' in resource and resource['type'] == 'digitalObject'
302
303


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
304
305
306
307
308
def _is_thumbnail(resource) -> bool:
    """
    Resource is of type `thumbnail`
    """
    return 'type' in resource and resource['type'] == 'thumbnail'
309

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
310
311
312
313
314
315

def _is_playable(access_status) -> bool:
    """
    Digital object can be retrieved via link
    """
    return access_status == 'public' or access_status == 'closed'
316
317


318
class MediametadataToDB:
    """
    Service which reads records from Kafka, extracts the values of their
    digital objects and thumbnails and hands them over to the `RecordProcessor`
    """

    def __init__(self):
        # TODO : maybe take that to a configuration (development vs pod running in
        # k8s cluster)
        try:
            # to be used when inside a kubernetes cluster
            config.load_incluster_config()
            logging.info("Loading incluster config")
        except K8sConfigException:
            try:
                # use .kube directory
                # for local development
                config.load_kube_config()
                logging.info("Loading kube config (for local development)")
            except K8sConfigException:
                # A missing configuration is only logged, the service is still created
                logging.error("No kubernetes cluster defined")

    def run(self):
        """
        Import media metadata to mariaDB
        This service should not return anything but run forever.

        If an exception is raised while consuming, it is logged and handed to
        `RecordProcessor.abort`, after which the method returns.
        """
        consumer = _connect_to_kafka()
        record_processor = RecordProcessor()
        counter = 0
        try:
            while True:
                # Messages are read via the iterator only. Records returned by an
                # explicit `consumer.poll()` count as consumed, so calling it and
                # dropping the result can skip messages which then get committed.
                for record_object in consumer:
                    counter += 1
                    record = record_object.value['@graph']
                    headers = record_object.headers
                    record_id = _get_record_id(record)
                    logging.debug(f'Processing record {record_id}')
                    record_processor.new_record(record_id, headers)
                    access_status = _get_access_status(record, record_id)
                    for record_resource in record:
                        if _is_digital_object(record_resource):
                            self._process_digital_object(record_processor, record_id,
                                                         record_resource, access_status)
                        elif _is_thumbnail(record_resource):
                            self._process_thumbnail(record_processor, record_id,
                                                    record_resource, access_status)
                    record_processor.index()
                    consumer.commit()
                    if counter % 1000 == 0:
                        logging.info('{} messages read till now'.format(counter))
                # arriving here means there are no new messages to poll from
                record_processor.index()
                consumer.commit()
        except Exception as ex:
            status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
            logging.error(status)
            record_processor.abort(ex)

    @staticmethod
    def _process_digital_object(record_processor, record_id, resource, access_status):
        """
        Extract the values of a digital object and report the result to the processor
        """
        if not _is_playable(access_status):
            record_processor.digital_object_ignore(record_id,
                                                   f"Ignored because of"
                                                   f" access status "
                                                   f"{access_status}")
            return
        enriched_data, error = _extract_fields(resource,
                                               _extract_dig_obj_vals,
                                               access_status)
        if not enriched_data:
            record_processor.digital_object_fail(record_id, error)
            return
        record_processor.digital_object_ok(record_id, enriched_data)
        if _has_audio_snippet(enriched_data):
            snippet_entry = _create_audio_snippet_entry(enriched_data)
            if snippet_entry:
                record_processor.audio_snippet_ok(record_id, snippet_entry)
            else:
                record_processor.audio_snippet_fail(record_id)

    @staticmethod
    def _process_thumbnail(record_processor, record_id, resource, access_status):
        """
        Extract the values of a thumbnail and report the result to the processor
        """
        enriched_data, error = _extract_fields(resource,
                                               _extract_thumbnail_values,
                                               access_status)
        if enriched_data:
            record_processor.thumbnail_ok(record_id, enriched_data)
        else:
            record_processor.thumbnail_fail(record_id, error)