Verified Commit 54b88057 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

report fatal error if access flag invalid

parent eaed4418
Pipeline #35395 passed with stages
in 1 minute and 53 seconds
......@@ -23,8 +23,10 @@ if __name__ == "__main__":
numeric_level = getattr(logging, os.getenv('LOG_LEVEL').upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f'Invalid log level: {os.getenv("LOG_LEVEL")}')
logging.basicConfig(format='%(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
level=numeric_level)
logging.basicConfig(
format='%(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
level=numeric_level,
)
logging.info("Starting up")
runner = MediametadataToDB()
runner.run()
......
......@@ -19,9 +19,11 @@ import logging
import os
import re
import time
from typing import Optional, Tuple
# noinspection PyPackageRequirements
from kafka import KafkaConsumer
# noinspection PyPackageRequirements
from kafka.errors import KafkaError
from kubernetes import config
......@@ -42,7 +44,8 @@ def _connect_to_kafka(retries=0):
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id=os.environ['GROUP_ID'],
consumer_timeout_ms=30000)
consumer_timeout_ms=30000,
)
return consumer
except KafkaError as ex:
status = 'KafkaError: ' + str(ex)
......@@ -60,17 +63,18 @@ def _connect_to_kafka(retries=0):
exit(1)
def _extract_fields(record_json_data,
fetch_from_obj_fun,
access_status) -> (dict, str):
def _extract_fields(
record_json_data, fetch_from_obj_fun, access_status
) -> Tuple[dict, Optional[str]]:
"""
Extract fields from JSON object by applying `fetch_from_obj_fun` function
"""
if 'locator' in record_json_data and '@id' in record_json_data:
return fetch_from_obj_fun(record_json_data, access_status), None
elif '@id' in record_json_data:
logging.info('Record ' + record_json_data['@id'] +
' does not have a locator property.')
logging.info(
'Record ' + record_json_data['@id'] + ' does not have a locator property.'
)
return dict(), 'No locator property found'
else:
logging.warning('Record without @id-property detected!')
......@@ -81,8 +85,13 @@ def _extract_thumbnail_values(msg, _access_status) -> dict:
"""
Extract information on thumbnail from JSON object
"""
return_values = {'type': 'image', 'access': 'public', 'proto': 'file',
'sig': '{}-poster'.format(msg['@id'].split('/')[-2])}
del _access_status
return_values = {
'type': 'image',
'access': 'public',
'proto': 'file',
'sig': '{}-poster'.format(msg['@id'].split('/')[-2]),
}
if 'height' in msg:
height = _normalize_dimension(msg['height'])
return_values['height'] = height
......@@ -103,8 +112,9 @@ def _extract_thumbnail_values(msg, _access_status) -> dict:
else:
file_extension = ''
logging.warning('No valid mimetype found!')
return_values['uri'] = 'file:///data/{}-poster.{}'. \
format(msg['@id'].split('/')[-2], file_extension)
return_values['uri'] = 'file:///data/{}-poster.{}'.format(
msg['@id'].split('/')[-2], file_extension
)
return return_values
......@@ -161,7 +171,9 @@ def _extract_dig_obj_vals(msg, access_status) -> dict:
file_extension = 'mp4'
if return_values['type'] == 'video':
file_extension = 'mp4'
return_values['uri'] = os.environ['URI_BASE'] + return_values['sig'] + '.' + file_extension
return_values['uri'] = (
os.environ['URI_BASE'] + return_values['sig'] + '.' + file_extension
)
return return_values
......@@ -176,13 +188,16 @@ def _create_audio_snippet_entry(record, access_status) -> dict:
snippet_record['sig'] = snippet_record['sig'] + '-intro'
snippet_record['access'] = access_status
# //@formatter:off
snippet_record['duration'] = \
30 if _normalize_duration(snippet_record['duration']) >= 30 \
snippet_record['duration'] = (
30
if _normalize_duration(snippet_record['duration']) >= 30
else _normalize_duration(snippet_record['duration'])
)
# //@formatter:on
snippet_record['mimetype'] = 'audio/mpeg'
snippet_record['uri'] = \
snippet_record['uri'] = (
'.'.join(snippet_record['uri'].split('.')[0:-1]) + '-intro.mp3'
)
return snippet_record
......@@ -190,19 +205,7 @@ def _is_remote_file(msg) -> bool:
"""
Media file is saved on a remote system
"""
return 'locator' in msg and not \
msg['locator'].startswith('sftp:/')
def _is_directly_fetchable(digital_object_resource) -> bool:
"""
Media file is directly accessible (i.e. is delivered via Memobase player)
"""
# //@formatter:off
return digital_object_resource['isDistributedOn'] == 'audio' or \
digital_object_resource['isDistributedOn'] == 'image' or \
digital_object_resource['isDistributedOn'] == 'video'
# //@formatter:on
return 'locator' in msg and not msg['locator'].startswith('sftp:/')
def _get_access_status(graph, record_id) -> str:
......@@ -214,18 +217,25 @@ def _get_access_status(graph, record_id) -> str:
# which is `closed`, `public`, `faro`, `onsite`, `noonsite` and as fallback `unavailable`
access_flags = list()
for resource in graph:
if 'type' in resource and resource['type'] == 'access' and \
'regulates' in resource and \
resource['regulates'].startswith('https://memobase.ch/digital/') and \
'name' in resource:
if (
'type' in resource
and resource['type'] == 'access'
and 'regulates' in resource
and resource['regulates'].startswith('https://memobase.ch/digital/')
and 'name' in resource
):
if resource['name'] == 'public':
access_flags.append('public')
elif resource['name'] == 'private':
logging.debug(f'{record_id}: Setting access for digital object to `closed`')
logging.debug(
f'{record_id}: Setting access for digital object to `closed`'
)
access_flags.append('closed')
else:
logging.info(f'Digital object of record {record_id} has access type ' +
f'`{resource["name"]}`. This makes the media resource unavailable.')
logging.info(
f'Digital object of record {record_id} has access type '
+ f'`{resource["name"]}`. This makes the media resource unavailable.'
)
access_flags.append(resource['name'])
if 'closed' in access_flags:
logging.debug(f'{record_id}: Setting access for digital object to `closed`')
......@@ -234,30 +244,40 @@ def _get_access_status(graph, record_id) -> str:
logging.debug(f'{record_id}: Setting access for digital object to `public`')
return 'public'
elif 'faro' in access_flags:
logging.info(f'Digital object of record {record_id} has access type `faro`.' +
' This makes the media resource unavailable.')
logging.info(
f'Digital object of record {record_id} has access type `faro`.'
+ ' This makes the media resource unavailable.'
)
return 'faro'
elif 'onsite' in access_flags:
logging.info(f'Digital object of record {record_id} has access type `onsite`.' +
' This makes the media resource unavailable.')
logging.info(
f'Digital object of record {record_id} has access type `onsite`.'
+ ' This makes the media resource unavailable.'
)
return 'onsite'
elif 'noonsite' in access_flags:
logging.info(f'Digital object of record {record_id} has access type `noonsite`.' +
' This makes the media resource unavailable.')
logging.info(
f'Digital object of record {record_id} has access type `noonsite`.'
+ ' This makes the media resource unavailable.'
)
return 'noonsite'
else:
logging.info(f'Digital object of record {record_id} has no or invalid access information!'
+ ' The media resource is therefore unavailable')
logging.warning(
f'Digital object of record {record_id} has no or invalid access information!'
+ ' The media resource is therefore unavailable'
)
return 'unavailable'
def _get_record_id(graph) -> str:
def _get_record_id(graph) -> Optional[str]:
"""
Get record identifier
"""
for resource in graph:
if '@type' in resource and resource['@type'] == \
'https://www.ica.org/standards/RiC/ontology#Record':
if (
'@type' in resource
and resource['@type'] == 'https://www.ica.org/standards/RiC/ontology#Record'
):
return resource['@id'] if '@id' in resource else None
......@@ -265,14 +285,18 @@ def _has_audio_snippet(record) -> bool:
"""
Record has an attached audio snippet (created by external service)
"""
return record['type'] == 'audio' and 'uri' in record and record['uri'].startswith('file://')
return (
record['type'] == 'audio'
and 'uri' in record
and record['uri'].startswith('file://')
)
def _normalize_dimension(dimension) -> int:
def _normalize_dimension(dimension) -> str:
"""
    Round dimension to the nearest integer and return it as a string
"""
return round(float(dimension))
return str(round(float(dimension)))
def _normalize_duration(duration) -> int:
......@@ -342,30 +366,48 @@ class MediametadataToDB:
for record_resource in record:
if _is_digital_object(record_resource):
if _is_playable(access_status):
enriched_data, error = _extract_fields(record_resource,
_extract_dig_obj_vals,
access_status)
enriched_data, error = _extract_fields(
record_resource,
_extract_dig_obj_vals,
access_status,
)
if enriched_data:
record_processor.digital_object_ok(record_id, enriched_data)
record_processor.digital_object_ok(
record_id, enriched_data
)
if _has_audio_snippet(enriched_data):
snippet_entry = _create_audio_snippet_entry(enriched_data,
access_status)
snippet_entry = _create_audio_snippet_entry(
enriched_data, access_status
)
if snippet_entry:
record_processor.audio_snippet_ok(record_id,
snippet_entry)
record_processor.audio_snippet_ok(
record_id, snippet_entry
)
else:
record_processor.audio_snippet_fail(record_id)
record_processor.audio_snippet_fail(
record_id
)
else:
record_processor.digital_object_fail(record_id, error)
record_processor.digital_object_fail(
record_id, error
)
elif access_status == 'unavailable':
record_processor.digital_object_fail(
                                    record_id, "invalid or missing access flag"
)
else:
record_processor.digital_object_ignore(record_id,
f"Ignored because of"
f" access status "
f"{access_status}")
record_processor.digital_object_ignore(
record_id,
f"Ignored because of"
f" access status "
f"{access_status}",
)
elif _is_thumbnail(record_resource):
enriched_data, error = _extract_fields(record_resource,
_extract_thumbnail_values,
access_status)
enriched_data, error = _extract_fields(
record_resource,
_extract_thumbnail_values,
access_status,
)
if enriched_data:
record_processor.thumbnail_ok(record_id, enriched_data)
else:
......@@ -378,7 +420,9 @@ class MediametadataToDB:
record_processor.index()
consumer.commit()
except Exception as ex:
status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
status = (
'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
)
logging.error(status)
record_processor.abort(ex)
......
......@@ -18,13 +18,17 @@ class Indexer:
Connect to MariaDB. Abort after configured retries.
"""
try:
logging.debug(f"Connecting to DB {os.environ['MARIADB_DATABASE']} on "
f"{os.environ['MARIADB_HOST']}:{os.environ['MARIADB_PORT']}")
mariadb_connection = mariadb.connect(user=os.environ['MARIADB_USER'],
password=os.environ['MARIADB_PASSWORD'].rstrip(),
host=os.environ['MARIADB_HOST'],
port=int(os.environ['MARIADB_PORT']),
database=os.environ['MARIADB_DATABASE'])
logging.debug(
f"Connecting to DB {os.environ['MARIADB_DATABASE']} on "
f"{os.environ['MARIADB_HOST']}:{os.environ['MARIADB_PORT']}"
)
mariadb_connection = mariadb.connect(
user=os.environ['MARIADB_USER'],
password=os.environ['MARIADB_PASSWORD'].rstrip(),
host=os.environ['MARIADB_HOST'],
port=int(os.environ['MARIADB_PORT']),
database=os.environ['MARIADB_DATABASE'],
)
mariadb_connection.autocommit = False
mariadb_cursor = mariadb_connection.cursor()
return mariadb_connection, mariadb_cursor
......@@ -43,28 +47,36 @@ class Indexer:
"""
db_values = [record[f] for f in fields if f in record and record[f]]
db_fields = ','.join([f for f in fields if f in record and record[f]])
db_value_placeholders = ', '.join(['%s' for f in fields if f in record and record[f]])
db_value_placeholders = ', '.join(
['%s' for f in fields if f in record and record[f]]
)
# noinspection SqlNoDataSourceInspection
sql = 'REPLACE INTO {} ({}) VALUES ({})'.format(
table_name, db_fields, db_value_placeholders)
table_name, db_fields, db_value_placeholders
)
return sql, tuple(db_values)
def insert_in_db(self, record) -> (bool, str):
"""
Insert record in DB
"""
stmt, values = Indexer._create_sql_stmt('entities', record,
['sig', 'uri', 'access', 'proto'])
stmt, values = Indexer._create_sql_stmt(
'entities', record, ['sig', 'uri', 'access', 'proto']
)
try:
self.mariadb_cursor.execute(stmt, values)
stmt, values = Indexer._create_sql_stmt('metadata', record,
['sig', 'mimetype', 'height',
'width', 'duration', 'type'])
stmt, values = Indexer._create_sql_stmt(
'metadata',
record,
['sig', 'mimetype', 'height', 'width', 'duration', 'type'],
)
self.mariadb_cursor.execute(stmt, values)
return True, ""
except mariadb.Error as ex:
logging.error(f'Problems in sql statement (statement: "{stmt}", '
f'parameters: {values}): {ex}')
logging.error(
f'Problems in sql statement (statement: "{stmt}", '
f'parameters: {values}): {ex}'
)
return False, str(ex)
def commit(self):
......
......@@ -13,9 +13,11 @@ class RecordProcessor:
@staticmethod
def _parsing_errors(record) -> bool:
return RecordProcessor._parsing_failed_digital_object(record) or \
RecordProcessor._parsing_failed_thumbnail(record) or \
RecordProcessor._parsing_failed_audio_snippet(record)
return (
RecordProcessor._parsing_failed_digital_object(record)
or RecordProcessor._parsing_failed_thumbnail(record)
or RecordProcessor._parsing_failed_audio_snippet(record)
)
@staticmethod
def _parsing_failed_digital_object(record) -> bool:
......@@ -35,113 +37,149 @@ class RecordProcessor:
def digital_object_ok(self, rec_id, data):
logging.debug(f"Parsing of digital object resource for {rec_id} successful")
self.counter += 1
self.processed_records[rec_id]['digital_object'] = \
{'data': data,
'ok': True,
'ignored': False,
'msg': 'successful'}
self.processed_records[rec_id]['digital_object'] = {
'data': data,
'ok': True,
'ignored': False,
'msg': 'successful',
}
def digital_object_fail(self, rec_id, err):
logging.warning(f"Parsing of digital object resource for {rec_id} failed")
self.processed_records[rec_id]['digital_object'] = \
{'data': None,
'ok': False,
'ignored': False,
'msg': 'parsing failed' + f': {err}' if err else ''}
self.processed_records[rec_id]['digital_object'] = {
'data': None,
'ok': False,
'ignored': False,
'msg': 'parsing failed' + f': {err}' if err else '',
}
def digital_object_ignore(self, rec_id, message):
logging.info(f"Digital object resource for {rec_id} ignored")
self.processed_records[rec_id]['digital_object'] = \
{'data': None,
'ok': True,
'ignored': True,
'msg': message}
self.processed_records[rec_id]['digital_object'] = {
'data': None,
'ok': True,
'ignored': True,
'msg': message,
}
def thumbnail_ok(self, rec_id, data):
logging.debug(f"Parsing of thumbnail resource for {rec_id} successful")
self.counter += 1
self.processed_records[rec_id]['thumbnail'] = \
{'data': data,
'ok': True,
'ignored': False,
'msg': 'successful'}
self.processed_records[rec_id]['thumbnail'] = {
'data': data,
'ok': True,
'ignored': False,
'msg': 'successful',
}
def thumbnail_fail(self, rec_id, err):
logging.warning(f"Parsing of thumbnail resource for {rec_id} failed")
self.processed_records[rec_id]['thumbnail'] = \
{'data': None,
'ok': False,
'ignored': False,
'msg': 'parsing failed' + f': {err}' if err else ''}
self.processed_records[rec_id]['thumbnail'] = {
'data': None,
'ok': False,
'ignored': False,
'msg': 'parsing failed' + f': {err}' if err else '',
}
def audio_snippet_ok(self, rec_id, data):
logging.debug(f"Parsing of audio snippet resource for {rec_id} successful")
self.counter += 1
self.processed_records[rec_id]['audio_snippet'] = \
{'data': data,
'ok': True,
'ignored': False,
'msg': 'successful'
}
self.processed_records[rec_id]['audio_snippet'] = {
'data': data,
'ok': True,
'ignored': False,
'msg': 'successful',
}
def audio_snippet_fail(self, rec_id):
logging.warning(f"Parsing of audio snippet resource for {rec_id} failed")
self.processed_records[rec_id]['audio_snippet'] = \
{'data': None,
'ok': False,
'ignored': False,
'msg': 'parsing failed'}
self.processed_records[rec_id]['audio_snippet'] = {
'data': None,
'ok': False,
'ignored': False,
'msg': 'parsing failed',
}
def abort(self, ex):
logging.error("Indexing failed. Aborting...")
for key in self.processed_records.keys():
self.reporter.send_message(key, 'FATAL', f'Indexing failed: {ex}',
self.processed_records[key]['headers'])
self.reporter.send_message(
key,
'FATAL',
f'Indexing failed: {ex}',
self.processed_records[key]['headers'],
)
def index(self):
for key in self.processed_records.keys():
record = self.processed_records[key]
dig_obj_msg = \
record['digital_object']['msg'] if 'digital_object' in record else 'not available'
thumbnail_msg = \
dig_obj_msg = (
record['digital_object']['msg']
if 'digital_object' in record
else 'not available'
)
thumbnail_msg = (
record['thumbnail']['msg'] if 'thumbnail' in record else 'not available'
audio_snip_msg = \
record['audio_snippet']['msg'] if 'audio_snippet' in record else 'not available'
)
audio_snip_msg = (
record['audio_snippet']['msg']
if 'audio_snippet' in record
else 'not available'
)
ok = True
ignored = True
err_msg = ""
if 'digital_object' in record and not record['digital_object']['ignored'] \
and not RecordProcessor._parsing_failed_digital_object(record):
if (
'digital_object' in record
and not record['digital_object']['ignored']
and not RecordProcessor._parsing_failed_digital_object(record)
):
logging.debug(f"Indexing digital object for {key} in DB")
ignored = False
ok, err_msg = self.indexer.insert_in_db(record['digital_object']['data'])
if ok and 'thumbnail' in record and not record['thumbnail']['ignored'] \
and not RecordProcessor._parsing_failed_thumbnail(record):
ok, err_msg = self.indexer.insert_in_db(
record['digital_object']['data']
)
if (
ok
and 'thumbnail' in record
and not record['thumbnail']['ignored']
and not RecordProcessor._parsing_failed_thumbnail(record)
):
logging.debug(f"Indexing thumbnail for {key} in DB")
ignored = False
ok, err_msg = self.indexer.insert_in_db(record['thumbnail']['data'])
if ok and 'audio_snippet' in record and not record['audio_snippet']['ignored'] \
and not RecordProcessor._parsing_failed_audio_snippet(record):
if (
ok
and 'audio_snippet' in record
and not record['audio_snippet']['ignored']
and not RecordProcessor._parsing_failed_audio_snippet(record)
):
logging.debug(f"Indexing audio snippet for {key} in DB")
ignored = False
ok, err_msg = self.indexer.insert_in_db(record['audio_snippet']['data'])
if ok and not RecordProcessor._parsing_errors(record):
self.indexer.commit()
self.reporter.send_message(key, 'IGNORE' if ignored else 'SUCCESS',
('DIGITAL OBJECT: {} -- THUMBNAIL: {}' +