MediametadataToDB.py 15.2 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#  mediametadatatodb
#  Copyright (C) 2020  Memoriav
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Affero General Public License as published
#  by the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Affero General Public License for more details.
#
#  You should have received a copy of the GNU Affero General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.

Matthias's avatar
Matthias committed
17
import json
18
import logging
Matthias's avatar
Matthias committed
19
import os
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
20
import re
21
import time
22

23
# noinspection PyPackageRequirements
Matthias's avatar
Matthias committed
24
from kafka import KafkaConsumer
25
# noinspection PyPackageRequirements
Matthias's avatar
Matthias committed
26
from kafka.errors import KafkaError
Matthias's avatar
Matthias committed
27
from kubernetes import config
28
29
from kubernetes.config.config_exception import ConfigException as K8sConfigException

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
30
from mediametadatatodb_app.resources.processor import RecordProcessor
31

32
33

def _connect_to_kafka(retries=0):
    """
    Connect to Kafka and return a consumer for the `INPUT_TOPIC` topic.

    A failed attempt is logged and retried while the number of attempts made
    so far is below `KAFKA_CONNECTION_RETRIES`, sleeping 30 s, 60 s, 90 s, ...
    (30 s times the attempt number) in between. When the last allowed attempt
    fails, the process exits with status 1.

    :param retries: number of attempts already made before this call
    :return: the connected `KafkaConsumer`
    """
    import sys

    attempt = retries
    while True:
        try:
            return KafkaConsumer(
                os.environ['INPUT_TOPIC'],
                value_deserializer=lambda m: json.loads(m.decode('utf8')),
                bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
                auto_offset_reset='earliest',
                enable_auto_commit=False,
                group_id=os.environ['KAFKA_GROUP_ID'],
                consumer_timeout_ms=30000)
        except KafkaError as ex:
            logging.error('KafkaError: ' + str(ex))
        except Exception as ex:
            logging.error('Exception: ' + str(ex))
        # The previous recursive version discarded the result of the retry and
        # exited unconditionally afterwards; loop instead and only exit once
        # all retries are used up.
        if attempt >= int(os.environ['KAFKA_CONNECTION_RETRIES']):
            sys.exit(1)
        time.sleep(30 * (attempt + 1))
        attempt += 1


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
63
64
def _extract_fields(record_json_data,
                    fetch_from_obj_fun,
65
                    access_status) -> (dict, str):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
66
67
68
    """
    Extract fields from JSON object by applying `fetch_from_obj_fun` function
    """
69
    if 'locator' in record_json_data and '@id' in record_json_data:
70
        return fetch_from_obj_fun(record_json_data, access_status), None
71
72
    elif '@id' in record_json_data:
        logging.info('Record ' + record_json_data['@id'] +
73
74
                     ' does not have a locator property.')
        return dict(), 'No locator property found'
75
76
    else:
        logging.warning('Record without @id-property detected!')
77
        return dict(), 'No @id property found'
78
79


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
80
81
82
83
def _extract_thumbnail_values(msg, _access_status) -> dict:
    """
    Extract information on thumbnail from JSON object
    """
84
85
    return_values = {'type': 'image', 'access': 'public', 'proto': 'file',
                     'sig': '{}-poster'.format(msg['@id'].split('/')[-2])}
86
    if 'height' in msg:
87
        height = _normalize_dimension(msg['height'])
88
89
        return_values['height'] = height
    if 'width' in msg:
90
        width = _normalize_dimension(msg['width'])
91
        return_values['width'] = width
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
    if 'hasMimeType' in msg:
        return_values['mimetype'] = msg['hasMimeType']
        if return_values['mimetype'] == 'image/jpeg':
            file_extension = 'jpg'
        elif return_values['mimetype'] == 'image/png':
            file_extension = 'png'
        elif return_values['mimetype'] == 'image/jp2':
            file_extension = 'jp2'
        else:
            file_extension = ''
            logging.warning('No valid mimetype found!')
    else:
        file_extension = ''
        logging.warning('No valid mimetype found!')
    return_values['uri'] = 'file:///data/{}-poster.{}'.\
        format(msg['@id'].split('/')[-2], file_extension)
108
109
110
    return return_values


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
111
def _extract_dig_obj_vals(msg, access_status) -> dict:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
112
113
114
    """
    Extract information on digital object from JSON object
    """
115
116
    if 'isDistributedOn' not in msg:
        logging.warning("No isDistributedOn property found in object")
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
117
        return dict()
118
    file_extension = ''
119
    return_values = {'access': access_status, 'sig': msg['@id'].split('/')[-1]}
120
    if 'height' in msg:
121
        height = _normalize_dimension(msg['height'])
122
        return_values['height'] = height
123
    if 'width' in msg:
124
        width = _normalize_dimension(msg['width'])
125
        return_values['width'] = width
126
    if 'duration' in msg:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
127
        duration = _normalize_duration(msg['duration'])
128
        return_values['duration'] = duration
129
    return_values['type'] = msg['isDistributedOn']
130
    if 'hasMimeType' in msg:
131
        mimetype = msg['hasMimeType']
132
        return_values['mimetype'] = mimetype
133
134
    if _is_remote_file(msg):
        return_values['uri'] = msg['locator']
135
136
137
        if access_status == 'public' and not return_values['type'] == 'image':
            # Remote images are always accessed via proxy because their respective
            # route goes over the internal image server
138
            return_values['proto'] = 'redirect'
139
        else:
140
141
142
143
            return_values['proto'] = 'proxy'
    else:
        return_values['proto'] = 'file'
        if return_values['type'] == 'image':
144
145
146
147
148
149
150
151
152
153
154
155
156
            if return_values.get('mimetype'):
                if return_values['mimetype'] == 'image/jpeg':
                    file_extension = 'jpg'
                elif return_values['mimetype'] == 'image/png':
                    file_extension = 'png'
                elif return_values['mimetype'] == 'image/jp2':
                    file_extension = 'jp2'
                else:
                    file_extension = ''
                    logging.warning('No valid mimetype found!')
            else:
                file_extension = ''
                logging.warning('No valid mimetype found!')
157
158
159
160
161
        if return_values['type'] == 'audio':
            file_extension = 'mp4'
        if return_values['type'] == 'video':
            file_extension = 'mp4'
        return_values['uri'] = os.environ['URI_BASE'] + return_values['sig'] + '.' + file_extension
162
    return return_values
163
164


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def _create_audio_snippet_entry(record) -> dict:
    """
    Create the entry for the public intro snippet of an audio digital object.

    The entry is a shallow copy of `record`; `record` itself is not modified.
    The copy is changed as follows:
    - `sig` gets the suffix `-intro`;
    - `access` is set to `public`;
    - `duration` is capped at 30 (a missing duration is logged and treated
      as 0);
    - `mimetype` is set to `audio/mpeg`;
    - the last extension of `uri` is replaced by `-intro.mp3`.

    :param record: digital object entry as built by `_extract_dig_obj_vals`;
        must contain `sig` and `uri`
    :return: the snippet entry
    """
    snippet_record = record.copy()
    if 'duration' not in snippet_record:
        logging.warning("No duration for audio found: Setting duration to 0")
        snippet_record['duration'] = 0
    snippet_record['sig'] = snippet_record['sig'] + '-intro'
    snippet_record['access'] = 'public'
    # Normalise only once, so an unparsable value is reported a single time.
    snippet_record['duration'] = min(_normalize_duration(snippet_record['duration']), 30)
    snippet_record['mimetype'] = 'audio/mpeg'
    # Drop the last extension. A URI without any dot is kept whole instead of
    # being reduced to an empty string.
    snippet_record['uri'] = snippet_record['uri'].rsplit('.', 1)[0] + '-intro.mp3'
    return snippet_record


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
186
def _is_remote_file(msg) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
187
188
189
    """
    Media file is saved on a remote system
    """
190
    return 'locator' in msg and not \
191
        msg['locator'].startswith('sftp:/')
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
192
193


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
194
def _is_directly_fetchable(digital_object_resource) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
195
196
197
198
    """
    Media file is directly accessible (i.e. is delivered via Memobase player)
    """
    # //@formatter:off
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
199
    return digital_object_resource['isDistributedOn'] == 'audio' or \
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
200
201
202
        digital_object_resource['isDistributedOn'] == 'image' or \
        digital_object_resource['isDistributedOn'] == 'video'
    # //@formatter:on
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
203
204


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
205
def _get_access_status(graph, record_id) -> str:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
206
    """
207
208
    Decide on access status. Possible values are `public`, `closed`, `unavailable`,
    `faro`, `onsite`, or `noonsite`
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
209
    """
210
211
212
    for resource in graph:
        if 'type' in resource and resource['type'] == 'access' and \
                'regulates' in resource and \
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
213
                resource['regulates'].startswith('https://memobase.ch/digital/') and \
214
                'name' in resource:
215
            if resource['name'] == 'public':
216
                logging.debug(f'{record_id}: Setting access for digital object to `public`')
217
218
                return 'public'
            elif resource['name'] == 'private':
219
                logging.debug(f'{record_id}: Setting access for digital object to `closed`')
220
                return 'closed'
221
222
223
            else:
                logging.info(f'Digital object of record {record_id} has access type ' +
                             f'`{resource["name"]}`. This makes the media resource unavailable.')
224
                return resource['name']
225
226
    logging.info(f'Digital object of record {record_id} has no related access information!'
                 + ' Media resource is therefore unavailable')
227
228
229
    return 'unavailable'


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
230
def _get_record_id(graph) -> str:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
231
232
233
    """
    Get record identifier
    """
234
    for resource in graph:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
235
236
        if '@type' in resource and resource['@type'] == \
                'https://www.ica.org/standards/RiC/ontology#Record':
237
238
239
            return resource['@id'] if '@id' in resource else None


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
240
def _has_audio_snippet(record) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
241
242
243
244
    """
    Record has an attached audio snippet (created by external service)
    """
    return record['type'] == 'audio' and 'uri' in record and record['uri'].startswith('file://')
245
246


247
def _normalize_dimension(dimension) -> int:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
248
249
250
    """
    Cast dimension to int
    """
251
    return round(float(dimension))
252
253


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
254
def _normalize_duration(duration) -> int:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
255
256
257
    """
    Normalise different representation of duration
    """
258
    if re.fullmatch(r'\d+:\d{2}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
259
260
        split = duration.split(':')
        return int(split[0]) * 60 + int(split[1])
261
    elif re.fullmatch(r'\d+:\d{2}:\d{2}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
262
263
        split = duration.split(':')
        return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
264
    elif re.fullmatch(r'\d+:\d{2}:\d{2}\d{3}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
265
266
        split = duration.split(':')
        return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
267
    elif re.fullmatch(r'\d+.\d{6}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
268
        return int(duration.split('.')[0])
269
270
    elif re.fullmatch(r'\d+', str(duration), re.ASCII):
        return int(duration)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
271
272
273
274
275
    else:
        logging.warning(f'Can\'t parse duration `{duration}`')
        return 0


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
276
277
278
279
280
def _is_digital_object(resource) -> bool:
    """
    Resource is of type `digital object`
    """
    return 'type' in resource and resource['type'] == 'digitalObject'
281
282


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
283
284
285
286
287
def _is_thumbnail(resource) -> bool:
    """
    Resource is of type `thumbnail`
    """
    return 'type' in resource and resource['type'] == 'thumbnail'
288

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
289
290
291
292
293
294

def _is_playable(access_status) -> bool:
    """
    Digital object can be retrieved via link
    """
    return access_status == 'public' or access_status == 'closed'
295
296


297
298
299
300
301
302
303
304
305
306
307
def _get_institution_and_record_set(record) -> (str, str):
    institution = ''
    record_set = ''
    for header in record.headers:
        if header[0] == 'institutionId':
            institution = header[1]
        if header[0] == 'recordSetId':
            record_set = header[1]
    return institution, record_set


308
class MediametadataToDB:
    """
    Service that consumes records from Kafka and passes the media metadata of
    their digital objects and thumbnails to a `RecordProcessor`.
    """

    def run(self):
        """
        Import media metadata to mariaDB

        This service should not return anything but run forever.

        Each record is processed as follows:
        1. Its digital objects (only for access status `public`/`closed`) and
           thumbnails are handed to the `RecordProcessor`.
        2. The processor is asked to index.
        3. The consumer offset is committed.

        When the consumer iteration ends (no new messages), the processor is
        indexed and offsets are committed again before consuming resumes. An
        unexpected exception is logged and passed to `RecordProcessor.abort`.
        The consumer is closed when this method exits.
        """
        consumer = _connect_to_kafka()
        record_processor = RecordProcessor()
        counter = 0
        try:
            while True:
                # Records are fetched by iterating the consumer. Do not call
                # `consumer.poll()` here and discard its result: polled records
                # advance the consumer position and would then be committed
                # without ever having been processed.
                for record_object in consumer:
                    counter += 1
                    self._process_record(record_processor, record_object)
                    record_processor.index()
                    consumer.commit()
                    if counter % 1000 == 0:
                        logging.info('{} messages read till now'.format(counter))
                # arriving here means there are no new messages to poll from
                record_processor.index()
                consumer.commit()
        except Exception as ex:
            status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
            logging.error(status)
            record_processor.abort(ex)
        finally:
            consumer.close()

    def _process_record(self, record_processor, record_object):
        """
        Register one consumed record and report each of its digital objects
        and thumbnails to `record_processor` as ok / fail / ignore.
        """
        institution, record_set = _get_institution_and_record_set(record_object)
        record = record_object.value['@graph']
        record_id = _get_record_id(record)
        logging.debug(f'Processing record {record_id}')
        record_processor.new_record(record_id, institution, record_set)
        access_status = _get_access_status(record, record_id)
        for record_resource in record:
            if _is_digital_object(record_resource):
                if not _is_playable(access_status):
                    record_processor.digital_object_ignore(record_id,
                                                           f"Ignored because of"
                                                           f" access status "
                                                           f"{access_status}")
                    continue
                enriched_data, error = _extract_fields(record_resource,
                                                       _extract_dig_obj_vals,
                                                       access_status)
                if not enriched_data:
                    record_processor.digital_object_fail(record_id, error)
                    continue
                record_processor.digital_object_ok(record_id, enriched_data)
                if _has_audio_snippet(enriched_data):
                    snippet_entry = _create_audio_snippet_entry(enriched_data)
                    if snippet_entry:
                        record_processor.audio_snippet_ok(record_id, snippet_entry)
                    else:
                        record_processor.audio_snippet_fail(record_id)
            elif _is_thumbnail(record_resource):
                enriched_data, error = _extract_fields(record_resource,
                                                       _extract_thumbnail_values,
                                                       access_status)
                if enriched_data:
                    record_processor.thumbnail_ok(record_id, enriched_data)
                else:
                    record_processor.thumbnail_fail(record_id, error)

    def __init__(self):
        """
        Load the Kubernetes client configuration.

        The in-cluster configuration is tried first and the local `.kube`
        configuration (for development) second. If neither is available,
        only an error is logged.
        """
        # TODO : maybe take that to a configuration (development vs pod running in
        # k8s cluster)
        try:
            # to be used when inside a kubernetes cluster
            config.load_incluster_config()
            logging.info("Loading incluster config")
        except K8sConfigException:
            try:
                # use .kube directory
                # for local development
                config.load_kube_config()
                logging.info("Loading kube config (for local development)")
            except K8sConfigException:
                logging.error("No kubernetes cluster defined")