Due to a scheduled upgrade to version 14.10, GitLab will be unavailable on Monday 30.05., from 19:00 until 20:00.

MediametadataToDB.py 16.1 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#  mediametadatatodb
#  Copyright (C) 2020  Memoriav
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Affero General Public License as published
#  by the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Affero General Public License for more details.
#
#  You should have received a copy of the GNU Affero General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.

Matthias's avatar
Matthias committed
17
import json
18
import logging
Matthias's avatar
Matthias committed
19
import os
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
20
import re
21
import time
22

23
# noinspection PyPackageRequirements
Matthias's avatar
Matthias committed
24
from kafka import KafkaConsumer
25
# noinspection PyPackageRequirements
Matthias's avatar
Matthias committed
26
from kafka.errors import KafkaError
Matthias's avatar
Matthias committed
27
from kubernetes import config
28
29
from kubernetes.config.config_exception import ConfigException as K8sConfigException

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
30
from mediametadatatodb_app.resources.processor import RecordProcessor
31

32
33

def _connect_to_kafka(retries=0):
    """
    Connect to Kafka. Abort after configured retries.

    Creates a `KafkaConsumer` for the topic named in `INPUT_TOPIC`
    (bootstrap servers from `KAFKA_BOOTSTRAP_SERVERS`, group from
    `KAFKA_GROUP_ID`, JSON-decoded values, manual commits). On failure the
    error is logged and the connection is retried after a back-off of
    30 s * (attempt + 1). Once `KAFKA_CONNECTION_RETRIES` retries have been
    made, the process exits with status 1.

    :param retries: number of retries already performed (kept for backward
        compatibility; callers normally use the default 0)
    :return: the connected `KafkaConsumer`
    :raises SystemExit: when all retries are exhausted
    """
    attempt = retries
    while True:
        try:
            return KafkaConsumer(
                os.environ['INPUT_TOPIC'],
                value_deserializer=lambda m: json.loads(m.decode('utf8')),
                bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
                auto_offset_reset='earliest',
                enable_auto_commit=False,
                group_id=os.environ['KAFKA_GROUP_ID'],
                consumer_timeout_ms=30000)
        except KafkaError as ex:
            logging.error('KafkaError: ' + str(ex))
        except Exception as ex:
            logging.error('Exception: ' + str(ex))
        # Read lazily: the retry limit is only needed once a connection failed
        if attempt >= int(os.environ['KAFKA_CONNECTION_RETRIES']):
            # `exit()` from `site` is meant for the interactive shell only
            raise SystemExit(1)
        time.sleep(30 * (attempt + 1))
        attempt += 1


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
63
64
def _extract_fields(record_json_data,
                    fetch_from_obj_fun,
65
                    access_status) -> (dict, str):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
66
67
68
    """
    Extract fields from JSON object by applying `fetch_from_obj_fun` function
    """
69
    if 'locator' in record_json_data and '@id' in record_json_data:
70
        return fetch_from_obj_fun(record_json_data, access_status), None
71
72
    elif '@id' in record_json_data:
        logging.info('Record ' + record_json_data['@id'] +
73
74
                     ' does not have a locator property.')
        return dict(), 'No locator property found'
75
76
    else:
        logging.warning('Record without @id-property detected!')
77
        return dict(), 'No @id property found'
78
79


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
80
81
82
83
def _extract_thumbnail_values(msg, _access_status) -> dict:
    """
    Extract information on thumbnail from JSON object
    """
84
85
    return_values = {'type': 'image', 'access': 'public', 'proto': 'file',
                     'sig': '{}-poster'.format(msg['@id'].split('/')[-2])}
86
    if 'height' in msg:
87
        height = _normalize_dimension(msg['height'])
88
89
        return_values['height'] = height
    if 'width' in msg:
90
        width = _normalize_dimension(msg['width'])
91
        return_values['width'] = width
92
93
94
95
96
97
98
99
100
101
102
103
104
105
    if 'hasMimeType' in msg:
        return_values['mimetype'] = msg['hasMimeType']
        if return_values['mimetype'] == 'image/jpeg':
            file_extension = 'jpg'
        elif return_values['mimetype'] == 'image/png':
            file_extension = 'png'
        elif return_values['mimetype'] == 'image/jp2':
            file_extension = 'jp2'
        else:
            file_extension = ''
            logging.warning('No valid mimetype found!')
    else:
        file_extension = ''
        logging.warning('No valid mimetype found!')
106
    return_values['uri'] = 'file:///data/{}-poster.{}'. \
107
        format(msg['@id'].split('/')[-2], file_extension)
108
109
110
    return return_values


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
111
def _extract_dig_obj_vals(msg, access_status) -> dict:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
112
113
114
    """
    Extract information on digital object from JSON object
    """
115
116
    if 'isDistributedOn' not in msg:
        logging.warning("No isDistributedOn property found in object")
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
117
        return dict()
118
    file_extension = ''
119
    return_values = {'access': access_status, 'sig': msg['@id'].split('/')[-1]}
120
    if 'height' in msg:
121
        height = _normalize_dimension(msg['height'])
122
        return_values['height'] = height
123
    if 'width' in msg:
124
        width = _normalize_dimension(msg['width'])
125
        return_values['width'] = width
126
    if 'duration' in msg:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
127
        duration = _normalize_duration(msg['duration'])
128
        return_values['duration'] = duration
129
    return_values['type'] = msg['isDistributedOn']
130
    if 'hasMimeType' in msg:
131
        mimetype = msg['hasMimeType']
132
        return_values['mimetype'] = mimetype
133
134
    if _is_remote_file(msg):
        return_values['uri'] = msg['locator']
135
136
137
        if access_status == 'public' and not return_values['type'] == 'image':
            # Remote images are always accessed via proxy because their respective
            # route goes over the internal image server
138
            return_values['proto'] = 'redirect'
139
        else:
140
            return_values['proto'] = 'proxydirect'
141
142
143
    else:
        return_values['proto'] = 'file'
        if return_values['type'] == 'image':
144
145
146
147
148
149
150
151
152
153
154
155
156
            if return_values.get('mimetype'):
                if return_values['mimetype'] == 'image/jpeg':
                    file_extension = 'jpg'
                elif return_values['mimetype'] == 'image/png':
                    file_extension = 'png'
                elif return_values['mimetype'] == 'image/jp2':
                    file_extension = 'jp2'
                else:
                    file_extension = ''
                    logging.warning('No valid mimetype found!')
            else:
                file_extension = ''
                logging.warning('No valid mimetype found!')
157
158
159
160
161
        if return_values['type'] == 'audio':
            file_extension = 'mp4'
        if return_values['type'] == 'video':
            file_extension = 'mp4'
        return_values['uri'] = os.environ['URI_BASE'] + return_values['sig'] + '.' + file_extension
162
    return return_values
163
164


165
def _create_audio_snippet_entry(record, access_status) -> dict:
    """
    Create an audio snippet entry based on the digital object

    Builds a shallow copy of the digital-object entry `record` with these changes:

    * `sig` gets the suffix `-intro`.
    * `access` is set to `access_status`.
    * `duration` is normalised and capped at 30 seconds. A missing duration
      is logged and treated as 0.
    * `mimetype` becomes `audio/mpeg`.
    * the extension of `uri` is replaced by `-intro.mp3`.

    :param record: digital object entry; must contain `sig` and `uri`
    :param access_status: access status to store on the snippet
    :return: the new snippet entry (the input dict is not modified)
    """
    snippet_record = record.copy()
    if 'duration' not in snippet_record:
        logging.warning("No duration for audio found: Setting duration to 0")
        snippet_record['duration'] = 0
    snippet_record['sig'] = snippet_record['sig'] + '-intro'
    snippet_record['access'] = access_status
    # Normalise once: every call re-parses and may log a parse warning
    snippet_record['duration'] = min(_normalize_duration(snippet_record['duration']), 30)
    snippet_record['mimetype'] = 'audio/mpeg'
    uri = snippet_record['uri']
    stem, dot, _extension = uri.rpartition('.')
    # A URI without any dot keeps its full path instead of collapsing to ''
    snippet_record['uri'] = (stem if dot else uri) + '-intro.mp3'
    return snippet_record


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
186
def _is_remote_file(msg) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
187
188
189
    """
    Media file is saved on a remote system
    """
190
    return 'locator' in msg and not \
191
        msg['locator'].startswith('sftp:/')
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
192
193


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
194
def _is_directly_fetchable(digital_object_resource) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
195
196
197
198
    """
    Media file is directly accessible (i.e. is delivered via Memobase player)
    """
    # //@formatter:off
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
199
    return digital_object_resource['isDistributedOn'] == 'audio' or \
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
200
201
202
        digital_object_resource['isDistributedOn'] == 'image' or \
        digital_object_resource['isDistributedOn'] == 'video'
    # //@formatter:on
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
203
204


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
205
def _get_access_status(graph, record_id) -> str:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
206
    """
207
208
    Decide on access status. Possible values are `public`, `closed`, `unavailable`,
    `faro`, `onsite`, or `noonsite`
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
209
    """
210
211
212
    # Since several access flags in one document are possible, we have to apply a ranking
    # which is `closed`, `public`, `faro`, `onsite`, `noonsite` and as fallback `unavailable`
    access_flags = list()
213
214
215
    for resource in graph:
        if 'type' in resource and resource['type'] == 'access' and \
                'regulates' in resource and \
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
216
                resource['regulates'].startswith('https://memobase.ch/digital/') and \
217
                'name' in resource:
218
            if resource['name'] == 'public':
219
                access_flags.append('public')
220
            elif resource['name'] == 'private':
221
                logging.debug(f'{record_id}: Setting access for digital object to `closed`')
222
                access_flags.append('closed')
223
224
225
            else:
                logging.info(f'Digital object of record {record_id} has access type ' +
                             f'`{resource["name"]}`. This makes the media resource unavailable.')
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
                access_flags.append(resource['name'])
    if 'closed' in access_flags:
        logging.debug(f'{record_id}: Setting access for digital object to `closed`')
        return 'closed'
    elif 'public' in access_flags:
        logging.debug(f'{record_id}: Setting access for digital object to `public`')
        return 'public'
    elif 'faro' in access_flags:
        logging.info(f'Digital object of record {record_id} has access type `faro`.' +
                     ' This makes the media resource unavailable.')
        return 'faro'
    elif 'onsite' in access_flags:
        logging.info(f'Digital object of record {record_id} has access type `onsite`.' +
                     ' This makes the media resource unavailable.')
        return 'onsite'
    elif 'noonsite' in access_flags:
        logging.info(f'Digital object of record {record_id} has access type `noonsite`.' +
                     ' This makes the media resource unavailable.')
        return 'noonsite'
    else:
        logging.info(f'Digital object of record {record_id} has no or invalid access information!'
                     + ' The media resource is therefore unavailable')
        return 'unavailable'
249
250


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
251
def _get_record_id(graph) -> str:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
252
253
254
    """
    Get record identifier
    """
255
    for resource in graph:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
256
257
        if '@type' in resource and resource['@type'] == \
                'https://www.ica.org/standards/RiC/ontology#Record':
258
259
260
            return resource['@id'] if '@id' in resource else None


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
261
def _has_audio_snippet(record) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
262
263
264
265
    """
    Record has an attached audio snippet (created by external service)
    """
    return record['type'] == 'audio' and 'uri' in record and record['uri'].startswith('file://')
266
267


268
def _normalize_dimension(dimension) -> int:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
269
270
271
    """
    Cast dimension to int
    """
272
    return round(float(dimension))
273
274


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
275
def _normalize_duration(duration) -> int:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
276
277
278
    """
    Normalise different representation of duration
    """
279
    if re.fullmatch(r'\d+:\d{2}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
280
281
        split = duration.split(':')
        return int(split[0]) * 60 + int(split[1])
282
    elif re.fullmatch(r'\d+:\d{2}:\d{2}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
283
284
        split = duration.split(':')
        return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
285
    elif re.fullmatch(r'\d+:\d{2}:\d{2}\d{3}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
286
287
        split = duration.split(':')
        return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
288
    elif re.fullmatch(r'\d+.\d{6}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
289
        return int(duration.split('.')[0])
290
291
    elif re.fullmatch(r'\d+', str(duration), re.ASCII):
        return int(duration)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
292
293
294
295
296
    else:
        logging.warning(f'Can\'t parse duration `{duration}`')
        return 0


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
297
298
299
300
301
def _is_digital_object(resource) -> bool:
    """
    Resource is of type `digital object`
    """
    return 'type' in resource and resource['type'] == 'digitalObject'
302
303


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
304
305
306
307
308
def _is_thumbnail(resource) -> bool:
    """
    Resource is of type `thumbnail`
    """
    return 'type' in resource and resource['type'] == 'thumbnail'
309

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
310
311
312
313
314
315

def _is_playable(access_status) -> bool:
    """
    Digital object can be retrieved via link
    """
    return access_status == 'public' or access_status == 'closed'
316
317


318
class MediametadataToDB:
319
    def run(self):
        """
        Import media metadata to mariaDB

        Consumes records from Kafka and hands every digital object and
        thumbnail of a record's `@graph` to a `RecordProcessor`. After each
        record, and again whenever the consumer iterator runs out of
        messages, the processor indexes and the Kafka offset is committed.
        Any exception is logged and passed to `RecordProcessor.abort`, after
        which this method returns.

        This service should not return anything but run forever.
        """
        consumer = _connect_to_kafka()
        record_processor = RecordProcessor()
        counter = 0
        try:
            while True:
                # No `consumer.poll()` here: kafka-python's `poll()` returns
                # the fetched records and advances the position. Calling it
                # alongside the iterator interface silently skipped (and later
                # committed) every record it returned.
                for record_object in consumer:
                    counter += 1
                    self._process_record(record_object, record_processor)
                    record_processor.index()
                    consumer.commit()
                    if counter % 1000 == 0:
                        logging.info('{} messages read till now'.format(counter))
                # arriving here means there are no new messages to poll from
                # (the iterator stops after `consumer_timeout_ms` without data)
                record_processor.index()
                consumer.commit()
        except Exception as ex:
            status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
            logging.error(status)
            record_processor.abort(ex)

    @staticmethod
    def _process_record(record_object, record_processor):
        """
        Route the digital-object and thumbnail resources of one Kafka record
        to the matching `record_processor` callbacks.
        """
        graph = record_object.value['@graph']
        headers = record_object.headers
        record_id = _get_record_id(graph)
        logging.debug(f'Processing record {record_id}')
        record_processor.new_record(record_id, headers)
        access_status = _get_access_status(graph, record_id)
        for record_resource in graph:
            if _is_digital_object(record_resource):
                if not _is_playable(access_status):
                    record_processor.digital_object_ignore(
                        record_id, f"Ignored because of access status {access_status}")
                    continue
                enriched_data, error = _extract_fields(record_resource,
                                                       _extract_dig_obj_vals,
                                                       access_status)
                if not enriched_data:
                    record_processor.digital_object_fail(record_id, error)
                    continue
                record_processor.digital_object_ok(record_id, enriched_data)
                if _has_audio_snippet(enriched_data):
                    snippet_entry = _create_audio_snippet_entry(enriched_data,
                                                                access_status)
                    if snippet_entry:
                        record_processor.audio_snippet_ok(record_id, snippet_entry)
                    else:
                        record_processor.audio_snippet_fail(record_id)
            elif _is_thumbnail(record_resource):
                enriched_data, error = _extract_fields(record_resource,
                                                       _extract_thumbnail_values,
                                                       access_status)
                if enriched_data:
                    record_processor.thumbnail_ok(record_id, enriched_data)
                else:
                    record_processor.thumbnail_fail(record_id, error)
381

Matthias's avatar
Matthias committed
382
383
384
385
386
387
    def __init__(self):
        # TODO : maybe take that to a configuration (development vs pod running in
        # k8s cluster)
        try:
            # to be used when inside a kubernetes cluster
            config.load_incluster_config()
388
            logging.info("Loading incluster config")
389
        except K8sConfigException:
Matthias's avatar
Matthias committed
390
391
392
393
            try:
                # use .kube directory
                # for local development
                config.load_kube_config()
394
                logging.info("Loading kube config (for local development)")
395
            except K8sConfigException:
Matthias's avatar
Matthias committed
396
                logging.error("No kubernetes cluster defined")