# mediametadatatodb
# Copyright (C) 2020 Memoriav
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import json
import logging
import os
import re
import sys
import time
from typing import Optional, Tuple

# noinspection PyPackageRequirements
from kafka import KafkaConsumer
# noinspection PyPackageRequirements
from kafka.errors import KafkaError
from kubernetes import config
from kubernetes.config.config_exception import ConfigException as K8sConfigException

from mediametadatatodb_app.resources.processor import RecordProcessor


def _connect_to_kafka(retries=0):
    """
    Connect to Kafka. Abort after the configured number of retries.
    """
    try:
        return KafkaConsumer(
            os.environ['INPUT_TOPIC'],
            value_deserializer=lambda m: json.loads(m.decode('utf8')),
            bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            group_id=os.environ['KAFKA_GROUP_ID'],
            consumer_timeout_ms=30000)
    except KafkaError as ex:
        logging.error('KafkaError: ' + str(ex))
    except Exception as ex:
        logging.error('Exception: ' + str(ex))
    # Back off with a linearly growing delay before the next attempt; give up
    # once the configured number of retries is exhausted. The recursive call
    # has to be returned, otherwise a successfully created consumer would be
    # discarded.
    if retries < int(os.environ['KAFKA_CONNECTION_RETRIES']):
        time.sleep(30 * (retries + 1))
        return _connect_to_kafka(retries + 1)
    sys.exit(1)


def _extract_fields(record_json_data, fetch_from_obj_fun,
                    access_status) -> Tuple[dict, Optional[str]]:
    """
    Extract fields from a JSON resource by applying the `fetch_from_obj_fun` function.

    Returns a tuple of extracted fields and an error message; the error
    message is None on success.
    """
    if 'locator' in record_json_data and '@id' in record_json_data:
        return fetch_from_obj_fun(record_json_data, access_status), None
    elif '@id' in record_json_data:
        logging.info('Record ' + record_json_data['@id'] +
                     ' does not have a locator property.')
        return dict(), 'No locator property found'
    else:
        logging.warning('Record without @id-property detected!')
        return dict(), 'No @id property found'


# Supported image mimetypes and their file extensions
_IMAGE_FILE_EXTENSIONS = {
    'image/jpeg': 'jpg',
    'image/png': 'png',
    'image/jp2': 'jp2',
}


def _get_image_file_extension(mimetype) -> str:
    """
    Map an image mimetype to its file extension.

    Returns an empty string and logs a warning if the mimetype is missing or
    not supported.
    """
    if mimetype in _IMAGE_FILE_EXTENSIONS:
        return _IMAGE_FILE_EXTENSIONS[mimetype]
    logging.warning('No valid mimetype found!')
    return ''


def _extract_thumbnail_values(msg, _access_status) -> dict:
    """
    Extract information on the thumbnail from a JSON resource
    """
    return_values = {'type': 'image',
                     'access': 'public',
                     'proto': 'file',
                     'sig': '{}-poster'.format(msg['@id'].split('/')[-2])}
    if 'height' in msg:
        return_values['height'] = _normalize_dimension(msg['height'])
    if 'width' in msg:
        return_values['width'] = _normalize_dimension(msg['width'])
    if 'hasMimeType' in msg:
        return_values['mimetype'] = msg['hasMimeType']
    file_extension = _get_image_file_extension(msg.get('hasMimeType'))
    return_values['uri'] = 'file:///data/{}-poster.{}'.format(
        msg['@id'].split('/')[-2], file_extension)
    return return_values
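
# A minimal sketch (hypothetical values, not from a real Memobase record) of
# what `_extract_thumbnail_values` produces for a typical thumbnail resource:
#
#     _extract_thumbnail_values(
#         {'@id': 'https://memobase.ch/digital/rec-1/poster',
#          'locator': 'sftp://mediaserver/rec-1-poster.jpg',
#          'hasMimeType': 'image/jpeg',
#          'height': '480', 'width': '640'},
#         'public')
#     # => {'type': 'image', 'access': 'public', 'proto': 'file',
#     #     'sig': 'rec-1-poster', 'height': 480, 'width': 640,
#     #     'mimetype': 'image/jpeg', 'uri': 'file:///data/rec-1-poster.jpg'}
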
def _extract_dig_obj_vals(msg, access_status) -> dict:
    """
    Extract information on the digital object from a JSON resource
    """
    if 'isDistributedOn' not in msg:
        logging.warning('No isDistributedOn property found in object')
        return dict()
    return_values = {'access': access_status,
                     'sig': msg['@id'].split('/')[-1]}
    if 'height' in msg:
        return_values['height'] = _normalize_dimension(msg['height'])
    if 'width' in msg:
        return_values['width'] = _normalize_dimension(msg['width'])
    if 'duration' in msg:
        return_values['duration'] = _normalize_duration(msg['duration'])
    return_values['type'] = msg['isDistributedOn']
    if 'hasMimeType' in msg:
        return_values['mimetype'] = msg['hasMimeType']
    if _is_remote_file(msg):
        return_values['uri'] = msg['locator']
        # Public remote media are redirected directly; remote images are
        # always accessed via proxy because their respective route goes over
        # the internal image server
        if access_status == 'public' and return_values['type'] != 'image':
            return_values['proto'] = 'redirect'
        else:
            return_values['proto'] = 'proxydirect'
    else:
        return_values['proto'] = 'file'
        if return_values['type'] == 'image':
            file_extension = _get_image_file_extension(return_values.get('mimetype'))
        elif return_values['type'] in ('audio', 'video'):
            file_extension = 'mp4'
        else:
            file_extension = ''
        return_values['uri'] = os.environ['URI_BASE'] + return_values['sig'] + \
            '.' + file_extension
    return return_values


def _create_audio_snippet_entry(record) -> dict:
    """
    Create an audio snippet entry based on the digital object
    """
    snippet_record = record.copy()
    if 'duration' not in snippet_record:
        logging.warning('No duration for audio found: Setting duration to 0')
        snippet_record['duration'] = 0
    snippet_record['sig'] = snippet_record['sig'] + '-intro'
    snippet_record['access'] = 'public'
    # Snippets are capped at 30 seconds
    snippet_record['duration'] = min(_normalize_duration(snippet_record['duration']), 30)
    snippet_record['mimetype'] = 'audio/mpeg'
    snippet_record['uri'] = \
        '.'.join(snippet_record['uri'].split('.')[0:-1]) + '-intro.mp3'
    return snippet_record


def _is_remote_file(msg) -> bool:
    """
    Media file is stored on a remote system
    """
    return 'locator' in msg and not msg['locator'].startswith('sftp:/')


def _is_directly_fetchable(digital_object_resource) -> bool:
    """
    Media file is directly accessible (i.e. is delivered via the Memobase player)
    """
    return digital_object_resource['isDistributedOn'] in ('audio', 'image', 'video')
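
# A sketch of the access resources `_get_access_status` (below) inspects. The
# graph snippet is hypothetical; with two conflicting flags, the ranking
# applies and `closed` wins over `public`:
#
#     _get_access_status(
#         [{'type': 'access',
#           'regulates': 'https://memobase.ch/digital/rec-1',
#           'name': 'private'},
#          {'type': 'access',
#           'regulates': 'https://memobase.ch/digital/rec-1',
#           'name': 'public'}],
#         'rec-1')
#     # => 'closed' (`private` is recorded as `closed`, which outranks `public`)
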
def _get_access_status(graph, record_id) -> str:
    """
    Decide on the access status.

    Possible values are `public`, `closed`, `unavailable`, `faro`, `onsite`,
    or `noonsite`.
    """
    # Since several access flags in one document are possible, a ranking has
    # to be applied: `closed`, `public`, `faro`, `onsite`, `noonsite`, and as
    # fallback `unavailable`
    access_flags = list()
    for resource in graph:
        if 'type' in resource and resource['type'] == 'access' and \
                'regulates' in resource and \
                resource['regulates'].startswith('https://memobase.ch/digital/') and \
                'name' in resource:
            if resource['name'] == 'public':
                access_flags.append('public')
            elif resource['name'] == 'private':
                logging.debug(f'{record_id}: Setting access for digital object to `closed`')
                access_flags.append('closed')
            else:
                logging.info(f'Digital object of record {record_id} has access type ' +
                             f'`{resource["name"]}`. This makes the media resource unavailable.')
                access_flags.append(resource['name'])
    if 'closed' in access_flags:
        logging.debug(f'{record_id}: Setting access for digital object to `closed`')
        return 'closed'
    elif 'public' in access_flags:
        logging.debug(f'{record_id}: Setting access for digital object to `public`')
        return 'public'
    elif 'faro' in access_flags:
        logging.info(f'Digital object of record {record_id} has access type `faro`.' +
                     ' This makes the media resource unavailable.')
        return 'faro'
    elif 'onsite' in access_flags:
        logging.info(f'Digital object of record {record_id} has access type `onsite`.' +
                     ' This makes the media resource unavailable.')
        return 'onsite'
    elif 'noonsite' in access_flags:
        logging.info(f'Digital object of record {record_id} has access type `noonsite`.' +
                     ' This makes the media resource unavailable.')
        return 'noonsite'
    else:
        logging.info(f'Digital object of record {record_id} has no or invalid access information!' +
                     ' The media resource is therefore unavailable')
        return 'unavailable'


def _get_record_id(graph) -> Optional[str]:
    """
    Get the record identifier
    """
    for resource in graph:
        if '@type' in resource and resource['@type'] == \
                'https://www.ica.org/standards/RiC/ontology#Record':
            return resource['@id'] if '@id' in resource else None
    return None


def _has_audio_snippet(record) -> bool:
    """
    Record has an attached audio snippet (created by an external service)
    """
    return record['type'] == 'audio' and 'uri' in record and record['uri'].startswith('file://')


def _normalize_dimension(dimension) -> int:
    """
    Cast dimension to int
    """
    return round(float(dimension))


def _normalize_duration(duration) -> int:
    """
    Normalise different representations of duration to whole seconds
    """
    if re.fullmatch(r'\d+:\d{2}', str(duration), re.ASCII):
        # mm:ss
        split = duration.split(':')
        return int(split[0]) * 60 + int(split[1])
    elif re.fullmatch(r'\d+:\d{2}:\d{2}', str(duration), re.ASCII):
        # hh:mm:ss
        split = duration.split(':')
        return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
    elif re.fullmatch(r'\d+:\d{2}:\d{2}\.\d{3}', str(duration), re.ASCII):
        # hh:mm:ss.mmm; the millisecond fraction is dropped
        split = duration.split(':')
        return int(split[0]) * 3600 + int(split[1]) * 60 + int(float(split[2]))
    elif re.fullmatch(r'\d+\.\d{6}', str(duration), re.ASCII):
        # seconds with a microsecond fraction; the fraction is dropped
        return int(duration.split('.')[0])
    elif re.fullmatch(r'\d+', str(duration), re.ASCII):
        # plain seconds
        return int(duration)
    else:
        logging.warning(f'Can\'t parse duration `{duration}`')
        return 0


def _is_digital_object(resource) -> bool:
    """
    Resource is of type `digitalObject`
    """
    return 'type' in resource and resource['type'] == 'digitalObject'


def _is_thumbnail(resource) -> bool:
    """
    Resource is of type `thumbnail`
    """
    return 'type' in resource and resource['type'] == 'thumbnail'
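
# `_normalize_duration` (above) at a glance; the input values are chosen for
# illustration only:
#
#     _normalize_duration('2:05')         # => 125  (mm:ss)
#     _normalize_duration('1:01:05')      # => 3665 (hh:mm:ss)
#     _normalize_duration('1:01:05.500')  # => 3665 (hh:mm:ss.mmm, fraction dropped)
#     _normalize_duration('90.000000')    # => 90   (seconds with microseconds)
#     _normalize_duration('90')           # => 90   (plain seconds)
#     _normalize_duration('n/a')          # => 0    (unparsable, warning is logged)
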
def _is_playable(access_status) -> bool:
    """
    Digital object can be retrieved via link
    """
    return access_status in ('public', 'closed')


class MediametadataToDB:
    def run(self):
        """
        Import media metadata to MariaDB.

        This service should not return anything but run forever.
        """
        consumer = _connect_to_kafka()
        record_processor = RecordProcessor()
        counter = 0
        try:
            while True:
                consumer.poll()
                for record_object in consumer:
                    counter += 1
                    record = record_object.value['@graph']
                    headers = record_object.headers
                    record_id = _get_record_id(record)
                    logging.debug(f'Processing record {record_id}')
                    record_processor.new_record(record_id, headers)
                    access_status = _get_access_status(record, record_id)
                    for record_resource in record:
                        if _is_digital_object(record_resource):
                            if _is_playable(access_status):
                                enriched_data, error = _extract_fields(
                                    record_resource, _extract_dig_obj_vals, access_status)
                                if enriched_data:
                                    record_processor.digital_object_ok(record_id, enriched_data)
                                    if _has_audio_snippet(enriched_data):
                                        snippet_entry = _create_audio_snippet_entry(enriched_data)
                                        if snippet_entry:
                                            record_processor.audio_snippet_ok(record_id,
                                                                              snippet_entry)
                                        else:
                                            record_processor.audio_snippet_fail(record_id)
                                else:
                                    record_processor.digital_object_fail(record_id, error)
                            else:
                                record_processor.digital_object_ignore(
                                    record_id,
                                    f'Ignored because of access status {access_status}')
                        elif _is_thumbnail(record_resource):
                            enriched_data, error = _extract_fields(
                                record_resource, _extract_thumbnail_values, access_status)
                            if enriched_data:
                                record_processor.thumbnail_ok(record_id, enriched_data)
                            else:
                                record_processor.thumbnail_fail(record_id, error)
                    record_processor.index()
                    consumer.commit()
                    if counter % 1000 == 0:
                        logging.info('{} messages read till now'.format(counter))
                # Arriving here means there are no new messages to poll from
                record_processor.index()
                consumer.commit()
        except Exception as ex:
            logging.error('It was not possible to consume the Kafka messages.\n' + str(ex))
            record_processor.abort(ex)

    def __init__(self):
        # TODO: maybe move this to a configuration (development vs pod
        # running in a k8s cluster)
        try:
            # to be used when inside a kubernetes cluster
            config.load_incluster_config()
            logging.info('Loading incluster config')
        except K8sConfigException:
            try:
                # use the .kube directory (for local development)
                config.load_kube_config()
                logging.info('Loading kube config (for local development)')
            except K8sConfigException:
                logging.error('No kubernetes cluster defined')
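
# A hypothetical local-run sketch; the real entry point lives elsewhere in
# mediametadatatodb_app. It assumes the environment variables read above
# (INPUT_TOPIC, KAFKA_BOOTSTRAP_SERVERS, KAFKA_GROUP_ID,
# KAFKA_CONNECTION_RETRIES, URI_BASE) are already set:
#
#     logging.basicConfig(level=logging.INFO)
#     MediametadataToDB().run()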