MediametadataToDB.py 14.9 KB
Newer Older
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
#  mediametadatatodb
#  Copyright (C) 2020  Memoriav
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Affero General Public License as published
#  by the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Affero General Public License for more details.
#
#  You should have received a copy of the GNU Affero General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.

Matthias's avatar
Matthias committed
17
import json
18
import logging
Matthias's avatar
Matthias committed
19
import os
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
20
import re
21
import time
22

23
# noinspection PyPackageRequirements
Matthias's avatar
Matthias committed
24
from kafka import KafkaConsumer
25
# noinspection PyPackageRequirements
Matthias's avatar
Matthias committed
26
from kafka.errors import KafkaError
Matthias's avatar
Matthias committed
27
from kubernetes import config
28 29
from kubernetes.config.config_exception import ConfigException as K8sConfigException

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
30
from mediametadatatodb_app.resources.processor import RecordProcessor
31

32 33

def _connect_to_kafka(retries=0):
    """
    Connect to Kafka. Abort after configured retries.

    Creates a `KafkaConsumer` for the topic named in the `INPUT_TOPIC`
    environment variable (manual offset commits, starting at the earliest
    offset, 30 s consumer timeout). If creating the consumer fails, the error is
    logged and the attempt is repeated after a linearly growing pause
    (30 s, 60 s, 90 s, ...) until `KAFKA_CONNECTION_RETRIES` retries have been
    made; after that the process exits with status 1.

    :param retries: number of retries that have already been made
    :return: the connected `KafkaConsumer`
    :raises SystemExit: when all configured retries are exhausted
    """
    while True:
        try:
            return KafkaConsumer(
                os.environ['INPUT_TOPIC'],
                value_deserializer=lambda m: json.loads(m.decode('utf8')),
                bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
                auto_offset_reset='earliest',
                enable_auto_commit=False,
                group_id=os.environ['KAFKA_GROUP_ID'],
                consumer_timeout_ms=30000)
        except KafkaError as ex:
            logging.error('KafkaError: ' + str(ex))
        except Exception as ex:
            logging.error('Exception: ' + str(ex))
        # Loop (instead of recursing and ignoring the result) so that a
        # consumer created by a later attempt is actually returned.
        if retries >= int(os.environ['KAFKA_CONNECTION_RETRIES']):
            raise SystemExit(1)
        time.sleep(30 * (retries + 1))
        retries += 1


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
63 64
def _extract_fields(record_json_data,
                    fetch_from_obj_fun,
65
                    access_status) -> (dict, str):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
66 67 68
    """
    Extract fields from JSON object by applying `fetch_from_obj_fun` function
    """
69
    if 'locator' in record_json_data and '@id' in record_json_data:
70
        return fetch_from_obj_fun(record_json_data, access_status), None
71 72
    elif '@id' in record_json_data:
        logging.info('Record ' + record_json_data['@id'] +
73 74
                     ' does not have a locator property.')
        return dict(), 'No locator property found'
75 76
    else:
        logging.warning('Record without @id-property detected!')
77
        return dict(), 'No @id property found'
78 79


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
80 81 82 83
def _extract_thumbnail_values(msg, _access_status) -> dict:
    """
    Extract information on thumbnail from JSON object
    """
84 85
    return_values = {'type': 'image', 'access': 'public', 'proto': 'file',
                     'sig': '{}-poster'.format(msg['@id'].split('/')[-2])}
86
    if 'height' in msg:
87
        height = _normalize_dimension(msg['height'])
88 89
        return_values['height'] = height
    if 'width' in msg:
90
        width = _normalize_dimension(msg['width'])
91
        return_values['width'] = width
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
    if 'hasMimeType' in msg:
        return_values['mimetype'] = msg['hasMimeType']
        if return_values['mimetype'] == 'image/jpeg':
            file_extension = 'jpg'
        elif return_values['mimetype'] == 'image/png':
            file_extension = 'png'
        elif return_values['mimetype'] == 'image/jp2':
            file_extension = 'jp2'
        else:
            file_extension = ''
            logging.warning('No valid mimetype found!')
    else:
        file_extension = ''
        logging.warning('No valid mimetype found!')
    return_values['uri'] = 'file:///data/{}-poster.{}'.\
        format(msg['@id'].split('/')[-2], file_extension)
108 109 110
    return return_values


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
111
def _extract_dig_obj_vals(msg, access_status) -> dict:
    """
    Build the database entry for a digital object resource.

    Returns an empty dict (after logging a warning) when the resource has no
    `isDistributedOn` property. Otherwise the entry holds the access status,
    the signature (last path segment of `@id`), the media type, optional
    height/width/duration/mimetype, and how the media file is delivered:

    * remote file (a `locator` not starting with `sftp:/`): `uri` is the
      locator; public non-image media use `redirect`, everything else `proxy`;
    * local file: `proto` is `file` and `uri` is `URI_BASE` + signature + `.` +
      an extension (jpg/png/jp2 from the mimetype for images, mp4 for audio
      and video, empty otherwise).
    """
    if 'isDistributedOn' not in msg:
        logging.warning("No isDistributedOn property found in object")
        return dict()
    entry = {'access': access_status, 'sig': msg['@id'].split('/')[-1]}
    if 'height' in msg:
        entry['height'] = _normalize_dimension(msg['height'])
    if 'width' in msg:
        entry['width'] = _normalize_dimension(msg['width'])
    if 'duration' in msg:
        entry['duration'] = _normalize_duration(msg['duration'])
    media_type = msg['isDistributedOn']
    entry['type'] = media_type
    if 'hasMimeType' in msg:
        entry['mimetype'] = msg['hasMimeType']

    if _is_remote_file(msg):
        entry['uri'] = msg['locator']
        # Remote images are always accessed via proxy because their respective
        # route goes over the internal image server
        use_redirect = access_status == 'public' and not media_type == 'image'
        entry['proto'] = 'redirect' if use_redirect else 'proxy'
        return entry

    entry['proto'] = 'file'
    extension = ''
    if media_type == 'image':
        mimetype = entry.get('mimetype')
        if mimetype:
            for known_mimetype, known_extension in (('image/jpeg', 'jpg'),
                                                    ('image/png', 'png'),
                                                    ('image/jp2', 'jp2')):
                if mimetype == known_mimetype:
                    extension = known_extension
                    break
        if not extension:
            logging.warning('No valid mimetype found!')
    if media_type == 'audio':
        extension = 'mp4'
    if media_type == 'video':
        extension = 'mp4'
    entry['uri'] = os.environ['URI_BASE'] + entry['sig'] + '.' + extension
    return entry
163 164


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
def _create_audio_snippet_entry(record) -> dict:
    """
    Create an audio snippet entry based on the digital object.

    The snippet is a shallow copy of `record` with:
    * `sig` suffixed with `-intro`,
    * `access` forced to `public`,
    * `duration` normalised and capped at 30 seconds (0 if missing),
    * `mimetype` set to `audio/mpeg`,
    * `uri` with its extension replaced by `-intro.mp3`.

    :param record: digital object entry (as built by `_extract_dig_obj_vals`)
    :return: the snippet entry
    """
    snippet_record = record.copy()
    if 'duration' not in snippet_record:
        logging.warning("No duration for audio found: Setting duration to 0")
        snippet_record['duration'] = 0
    snippet_record['sig'] = snippet_record['sig'] + '-intro'
    snippet_record['access'] = 'public'
    # Normalise only once: a second call repeated the work and logged the
    # "Can't parse duration" warning twice for unparsable values.
    snippet_record['duration'] = min(_normalize_duration(snippet_record['duration']), 30)
    snippet_record['mimetype'] = 'audio/mpeg'
    # rpartition drops the last `.suffix` (or yields '' when there is no dot),
    # exactly like joining all but the last '.'-separated part.
    snippet_record['uri'] = snippet_record['uri'].rpartition('.')[0] + '-intro.mp3'
    return snippet_record


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
186
def _is_remote_file(msg) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
187 188 189
    """
    Media file is saved on a remote system
    """
190
    return 'locator' in msg and not \
191
        msg['locator'].startswith('sftp:/')
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
192 193


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
194
def _is_directly_fetchable(digital_object_resource) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
195 196 197 198
    """
    Media file is directly accessible (i.e. is delivered via Memobase player)
    """
    # //@formatter:off
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
199
    return digital_object_resource['isDistributedOn'] == 'audio' or \
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
200 201 202
        digital_object_resource['isDistributedOn'] == 'image' or \
        digital_object_resource['isDistributedOn'] == 'video'
    # //@formatter:on
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
203 204


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
205
def _get_access_status(graph, record_id) -> str:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
206
    """
207 208
    Decide on access status. Possible values are `public`, `closed`, `unavailable`,
    `faro`, `onsite`, or `noonsite`
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
209
    """
210 211 212
    for resource in graph:
        if 'type' in resource and resource['type'] == 'access' and \
                'regulates' in resource and \
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
213
                resource['regulates'].startswith('https://memobase.ch/digital/') and \
214
                'name' in resource:
215
            if resource['name'] == 'public':
216
                logging.debug(f'{record_id}: Setting access for digital object to `public`')
217 218
                return 'public'
            elif resource['name'] == 'private':
219
                logging.debug(f'{record_id}: Setting access for digital object to `closed`')
220
                return 'closed'
221 222 223
            else:
                logging.info(f'Digital object of record {record_id} has access type ' +
                             f'`{resource["name"]}`. This makes the media resource unavailable.')
224
                return resource['name']
225 226
    logging.info(f'Digital object of record {record_id} has no related access information!'
                 + ' Media resource is therefore unavailable')
227 228 229
    return 'unavailable'


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
230
def _get_record_id(graph) -> str:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
231 232 233
    """
    Get record identifier
    """
234
    for resource in graph:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
235 236
        if '@type' in resource and resource['@type'] == \
                'https://www.ica.org/standards/RiC/ontology#Record':
237 238 239
            return resource['@id'] if '@id' in resource else None


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
240
def _has_audio_snippet(record) -> bool:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
241 242 243 244
    """
    Record has an attached audio snippet (created by external service)
    """
    return record['type'] == 'audio' and 'uri' in record and record['uri'].startswith('file://')
245 246


247
def _normalize_dimension(dimension) -> int:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
248 249 250
    """
    Cast dimension to int
    """
251
    return round(float(dimension))
252 253


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
254
def _normalize_duration(duration) -> int:
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
255 256 257
    """
    Normalise different representation of duration
    """
258
    if re.fullmatch(r'\d+:\d{2}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
259 260
        split = duration.split(':')
        return int(split[0]) * 60 + int(split[1])
261
    elif re.fullmatch(r'\d+:\d{2}:\d{2}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
262 263
        split = duration.split(':')
        return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
264
    elif re.fullmatch(r'\d+:\d{2}:\d{2}\d{3}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
265 266
        split = duration.split(':')
        return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
267
    elif re.fullmatch(r'\d+.\d{6}', str(duration), re.ASCII):
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
268
        return int(duration.split('.')[0])
269 270
    elif re.fullmatch(r'\d+', str(duration), re.ASCII):
        return int(duration)
Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
271 272 273 274 275
    else:
        logging.warning(f'Can\'t parse duration `{duration}`')
        return 0


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
276 277 278 279 280
def _is_digital_object(resource) -> bool:
    """
    Resource is of type `digital object`
    """
    return 'type' in resource and resource['type'] == 'digitalObject'
281 282


Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
283 284 285 286 287
def _is_thumbnail(resource) -> bool:
    """
    Resource is of type `thumbnail`
    """
    return 'type' in resource and resource['type'] == 'thumbnail'
288

Sebastian Schüpbach's avatar
Sebastian Schüpbach committed
289 290 291 292 293 294

def _is_playable(access_status) -> bool:
    """
    Digital object can be retrieved via link
    """
    return access_status == 'public' or access_status == 'closed'
295 296


297
class MediametadataToDB:
    """
    Service that consumes media metadata records from Kafka and hands the
    extracted digital object, audio snippet and thumbnail entries to a
    `RecordProcessor`.
    """

    def run(self):
        """
        Import media metadata to mariaDB

        This service should not return anything but run forever.

        Messages are read by iterating the Kafka consumer. After each record the
        collected entries are indexed and the offset is committed. The same is
        done whenever the iterator stops because no message arrived within the
        consumer timeout. Any exception is logged and passed to
        `RecordProcessor.abort`, after which this method returns.
        """
        consumer = _connect_to_kafka()
        record_processor = RecordProcessor()
        counter = 0
        try:
            while True:
                # No explicit `consumer.poll()` here: kafka-python's poll() must
                # not be mixed with the iterator interface. Records returned by a
                # bare poll() advance the consumer position without ever being
                # processed, and the next commit() would skip them for good.
                for record_object in consumer:
                    counter += 1
                    self._process_record(record_processor, record_object)
                    record_processor.index()
                    consumer.commit()
                    if counter % 1000 == 0:
                        logging.info('{} messages read till now'.format(counter))
                # arriving here means there are no new messages to poll from
                record_processor.index()
                consumer.commit()
        except Exception as ex:
            status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
            logging.error(status)
            record_processor.abort(ex)

    @staticmethod
    def _process_record(record_processor, record_object):
        """
        Register one Kafka record and report each of its digital objects and
        thumbnails to `record_processor`.
        """
        record = record_object.value['@graph']
        headers = record_object.headers
        record_id = _get_record_id(record)
        logging.debug(f'Processing record {record_id}')
        record_processor.new_record(record_id, headers)
        access_status = _get_access_status(record, record_id)
        for record_resource in record:
            if _is_digital_object(record_resource):
                if not _is_playable(access_status):
                    record_processor.digital_object_ignore(record_id,
                                                           f"Ignored because of"
                                                           f" access status "
                                                           f"{access_status}")
                    continue
                enriched_data, error = _extract_fields(record_resource,
                                                       _extract_dig_obj_vals,
                                                       access_status)
                if not enriched_data:
                    record_processor.digital_object_fail(record_id, error)
                    continue
                record_processor.digital_object_ok(record_id, enriched_data)
                if _has_audio_snippet(enriched_data):
                    snippet_entry = _create_audio_snippet_entry(enriched_data)
                    if snippet_entry:
                        record_processor.audio_snippet_ok(record_id, snippet_entry)
                    else:
                        record_processor.audio_snippet_fail(record_id)
            elif _is_thumbnail(record_resource):
                enriched_data, error = _extract_fields(record_resource,
                                                       _extract_thumbnail_values,
                                                       access_status)
                if enriched_data:
                    record_processor.thumbnail_ok(record_id, enriched_data)
                else:
                    record_processor.thumbnail_fail(record_id, error)

    def __init__(self):
        """
        Load the Kubernetes client configuration.

        Tries the in-cluster configuration first and falls back to the local
        `.kube` configuration (for development). If neither is available an
        error is logged and construction continues without a cluster config.
        """
        # TODO : maybe take that to a configuration (development vs pod running in
        # k8s cluster)
        try:
            # to be used when inside a kubernetes cluster
            config.load_incluster_config()
            logging.info("Loading incluster config")
        except K8sConfigException:
            try:
                # use .kube directory
                # for local development
                config.load_kube_config()
                logging.info("Loading kube config (for local development)")
            except K8sConfigException:
                logging.error("No kubernetes cluster defined")