Unverified Commit 418274c3 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

refactor reporting

Until now several reports were made for every record. This commit tries
to unify these reports into one report per processed record with the
intention to provide a better (or less messy) overview of what's
happening inside the service.
parent e79b2dcf
Pipeline #18919 passed with stages
in 2 minutes and 13 seconds
......@@ -16,12 +16,10 @@
import json
import logging
import numbers
import os
import re
import time
import mysql.connector as mariadb
# noinspection PyPackageRequirements
from kafka import KafkaConsumer
# noinspection PyPackageRequirements
......@@ -29,10 +27,13 @@ from kafka.errors import KafkaError
from kubernetes import config
from kubernetes.config.config_exception import ConfigException as K8sConfigException
from mediametadatatodb_app.resources.reporter import Reporter
from mediametadatatodb_app.resources.processor import RecordProcessor
def _connect_to_kafka(retries=0):
"""
Connect to Kafka. Abort after configured retries.
"""
try:
consumer = KafkaConsumer(
os.environ['INPUT_TOPIC'],
......@@ -59,28 +60,12 @@ def _connect_to_kafka(retries=0):
exit(1)
def _connect_to_mariadb(retries=0):
    """
    Connect to MariaDB. Abort the process after the configured retries.

    :param retries: number of attempts already made (used for backoff)
    :return: tuple (connection, cursor) on success; calls exit(1) once
             MARIADB_CONNECTION_RETRIES attempts have been exhausted
    """
    try:
        mariadb_connection = mariadb.connect(user=os.environ['MARIADB_USER'],
                                             password=os.environ['MARIADB_PASSWORD'],
                                             host=os.environ['MARIADB_HOST'],
                                             port=int(os.environ['MARIADB_PORT']),
                                             database=os.environ['MARIADB_DATABASE'])
        # Commits are issued explicitly by the caller (batched writes).
        mariadb_connection.autocommit = False
        mariadb_cursor = mariadb_connection.cursor()
        return mariadb_connection, mariadb_cursor
    except Exception as ex:
        status = 'Exception: ' + str(ex)
        logging.error(status)
        if retries < int(os.environ['MARIADB_CONNECTION_RETRIES']):
            time.sleep(30 * (retries + 1))
            # BUG FIX: propagate the connection obtained by the retry.
            # Previously the recursive result was discarded and control
            # fell through to exit(1) even after a successful reconnect.
            return _connect_to_mariadb(retries + 1)
        exit(1)
def _try_fetch_from_json_object(record_json_data,
fetch_from_obj_fun,
access_status) -> dict:
def _extract_fields(record_json_data,
fetch_from_obj_fun,
access_status) -> dict:
"""
Extract fields from JSON object by applying `fetch_from_obj_fun` function
"""
if 'locator' in record_json_data and '@id' in record_json_data:
return fetch_from_obj_fun(record_json_data, access_status)
elif '@id' in record_json_data:
......@@ -91,7 +76,10 @@ def _try_fetch_from_json_object(record_json_data,
return dict()
def _get_values_from_thumbnail_object(msg, _access_status) -> dict:
def _extract_thumbnail_values(msg, _access_status) -> dict:
"""
Extract information on thumbnail from JSON object
"""
return_values = {'mimetype': 'image/jp2', 'type': 'image', 'access': 'public', 'proto': 'file',
'sig': '{}-poster'.format(msg['@id'].split('/')[-2]),
'uri': 'file:///data/{}-poster.jp2'.format(msg['@id'].split('/')[-2])}
......@@ -104,7 +92,10 @@ def _get_values_from_thumbnail_object(msg, _access_status) -> dict:
return return_values
def _get_values_from_digital_object(msg, access_status) -> dict:
def _extract_digital_object_values(msg, access_status) -> dict:
"""
Extract information on digital object from JSON object
"""
if 'isDistributedOn' not in msg:
logging.warning("No isDistributedOn property found in object")
return dict()
......@@ -144,18 +135,50 @@ def _get_values_from_digital_object(msg, access_status) -> dict:
return return_values
def _create_audio_snippet_entry(record) -> dict:
    """
    Create an audio snippet entry based on the digital object

    Returns a copy of `record` describing the public MP3 intro snippet
    (at most 30 seconds) that an external service derives from the
    original audio file.

    :param record: dict with at least `sig` and `uri`; `duration` optional
    :return: new dict describing the snippet (original left untouched)
    """
    snippet_record = record.copy()
    if 'duration' not in snippet_record:
        logging.warning("No duration for audio found: Setting duration to 0")
        snippet_record['duration'] = 0
    snippet_record['sig'] = snippet_record['sig'] + '-intro'
    # Snippets are always publicly accessible, regardless of the record.
    snippet_record['access'] = 'public'
    # Cap at 30 seconds; normalize only once instead of twice.
    snippet_record['duration'] = min(30, _normalize_duration(snippet_record['duration']))
    snippet_record['mimetype'] = 'audio/mpeg'
    # Swap the file extension for the `-intro.mp3` suffix.
    snippet_record['uri'] = \
        '.'.join(snippet_record['uri'].split('.')[0:-1]) + '-intro.mp3'
    return snippet_record
def _is_remote_file(msg) -> bool:
"""
Media file is saved on a remote system
"""
return 'locator' in msg and not \
msg['locator'].startswith('https://memobase.ch/')
def _is_directly_fetchable(digital_object_resource) -> bool:
"""
Media file is directly accessible (i.e. is delivered via Memobase player)
"""
# //@formatter:off
return digital_object_resource['isDistributedOn'] == 'audio' or \
digital_object_resource['isDistributedOn'] == 'image' or \
digital_object_resource['isDistributedOn'] == 'video'
digital_object_resource['isDistributedOn'] == 'image' or \
digital_object_resource['isDistributedOn'] == 'video'
# //@formatter:on
def _get_access_status(graph, record_id) -> str:
"""
Decide on access status. Possible values are `public`, `closed`, and `unavailable`
"""
for resource in graph:
if 'type' in resource and resource['type'] == 'access' and \
'regulates' in resource and \
......@@ -177,61 +200,33 @@ def _get_access_status(graph, record_id) -> str:
def _get_record_id(graph) -> str:
"""
Get record identifier
"""
for resource in graph:
if '@type' in resource and resource['@type'] == \
'https://www.ica.org/standards/RiC/ontology#Record':
return resource['@id'] if '@id' in resource else None
def _create_sql_stmt(table_name, record, fields) -> str:
db_fields = [dbField for dbField in fields
if dbField in record and record[dbField] is not None]
db_values = [str(record[db_field])
if isinstance(record[db_field], numbers.Number)
else "'{}'".format(record[db_field])
for db_field in db_fields]
key_value = \
", ".join([k + "=" + v for (k, v) in zip(db_fields, db_values) if k != 'sig'])
db_fields = ','.join(db_fields)
db_values = ','.join(db_values)
# noinspection SqlNoDataSourceInspection
return 'INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {}'.format(
table_name, db_fields, db_values, key_value)
def _create_entities_entry(record, mariadb_cursor) -> bool:
    """
    Write the `entities` row for `record`; return True on success.
    """
    stmt = _create_sql_stmt('entities', record, ['sig', 'uri', 'access', 'proto'])
    try:
        mariadb_cursor.execute(stmt)
    except Exception:
        logging.error("Problems in sql statement: {}".format(stmt))
        return False
    return True
def _create_metadata_entry(record, mariadb_cursor) -> bool:
    """
    Write the `metadata` row for `record`; return True on success.
    """
    stmt = _create_sql_stmt('metadata', record,
                            ['sig', 'mimetype', 'height', 'width', 'duration', 'type'])
    try:
        mariadb_cursor.execute(stmt)
    except Exception:
        logging.error("Problems in sql statement: {}".format(stmt))
        return False
    return True
def _has_audio_snippet(record) -> bool:
return record['type'] == 'audio' and \
'uri' in record and \
record['uri'].startswith('file://')
"""
Record has an attached audio snippet (created by external service)
"""
return record['type'] == 'audio' and 'uri' in record and record['uri'].startswith('file://')
def _normalize_dimension(dimension) -> int:
"""
Cast dimension to int
"""
return round(float(dimension))
def _normalize_duration(duration) -> int:
"""
Normalise different representation of duration
"""
if re.fullmatch(r'\d+:\d{2}', str(duration), re.ASCII):
split = duration.split(':')
return int(split[0]) * 60 + int(split[1])
......@@ -250,60 +245,25 @@ def _normalize_duration(duration) -> int:
return 0
def _create_audio_snippet_entry(record, mariadb_cursor) -> list:
results = []
snippet_record = record.copy()
if 'duration' not in snippet_record:
logging.warning("No duration for audio found: Setting duration to 0")
snippet_record['duration'] = 0
snippet_record['sig'] = snippet_record['sig'] + '-intro'
snippet_record['access'] = 'public'
# //@formatter:off
snippet_record['duration'] = \
30 if _normalize_duration(snippet_record['duration']) >= 30 \
else _normalize_duration(snippet_record['duration'])
# //@formatter:on
snippet_record['mimetype'] = 'audio/mpeg'
snippet_record['uri'] = \
'.'.join(snippet_record['uri'].split('.')[0:-1]) + '-intro.mp3'
try:
result_entities = _create_entities_entry(snippet_record, mariadb_cursor)
results.append((record, result_entities, 'audio snippet', 'entities'))
except Exception as ex:
logging.error(f'Exception when writing table `entities` '
f'for audio snippet belonging to record {record["sig"]}: {str(ex)}')
results.append((record, False, 'audio snippet', 'entities'))
def _is_digital_object(resource) -> bool:
"""
Resource is of type `digital object`
"""
return 'type' in resource and resource['type'] == 'digitalObject'
try:
result_metadata = _create_metadata_entry(snippet_record, mariadb_cursor)
results.append((record, result_metadata, 'audio snippet', 'metadata'))
except Exception as ex:
logging.error(f'Exception when writing table `metadata` '
f'for audio snippet belonging to record {record["sig"]}: {str(ex)}')
results.append((record, False, 'audio snippet', 'metadata'))
return results
def _is_thumbnail(resource) -> bool:
"""
Resource is of type `thumbnail`
"""
return 'type' in resource and resource['type'] == 'thumbnail'
def _write_values_in_db(mariadb_cursor, record_values_for_db) -> list:
    """
    Write all collected records to the `entities` and `metadata` tables.

    :param mariadb_cursor: open MariaDB cursor (commit is the caller's job)
    :param record_values_for_db: list of record dicts to persist
    :return: list of (record, success, entry_type, table) tuples, one per
             attempted write, so the caller can report per-record outcomes
    """
    results = []
    for record in record_values_for_db:
        try:
            res_entities = _create_entities_entry(record, mariadb_cursor)
            results.append((record, res_entities, 'record', 'entities'))
        except Exception as ex:
            logging.error(f'Exception when writing table `entities` '
                          f'for record {record["sig"]}: {str(ex)}')
            results.append((record, False, 'record', 'entities'))
        try:
            res_metadata = _create_metadata_entry(record, mariadb_cursor)
            results.append((record, res_metadata, 'record', 'metadata'))
        except Exception as ex:
            logging.error(f'Exception when writing table `metadata` '
                          f'for record {record["sig"]}: {str(ex)}')
            # BUG FIX: this failure concerns the `metadata` table, not
            # `entities` — the result tuple previously reported the wrong table.
            results.append((record, False, 'record', 'metadata'))
        # Audio records additionally get their intro-snippet rows written.
        if _has_audio_snippet(record):
            results.extend(_create_audio_snippet_entry(record, mariadb_cursor))
    return results
def _is_playable(access_status) -> bool:
"""
Digital object can be retrieved via link
"""
return access_status == 'public' or access_status == 'closed'
class MediametadataToDB:
......@@ -311,114 +271,52 @@ class MediametadataToDB:
"""
Import media metadata to mariaDB
This service should not return anything but run forever.
---
tags:
- mediametadatatodb
responses:
500:
description: some error occurred
schema:
properties:
error:
type: string
example: there was an exception
"""
consumer = _connect_to_kafka()
mariadb_connection, mariadb_cursor = _connect_to_mariadb()
# process messages:
record_values_for_db = []
reporter = Reporter()
record_processor = RecordProcessor()
counter = 0
try:
while True:
consumer.poll()
for record_number, record_object in enumerate(consumer):
for record_object in consumer:
counter += 1
record = record_object.value['@graph']
record_id = _get_record_id(record)
logging.debug(f'Processing record {record_id}')
access_status = _get_access_status(record, record_id)
enrichable = False
for record_resource in record:
if 'type' in record_resource and \
record_resource['type'] == 'digitalObject':
enrichable = True
if access_status == 'public' or access_status == 'closed':
enriched_data = \
_try_fetch_from_json_object(record_resource,
_get_values_from_digital_object,
access_status)
if not enriched_data:
reporter.send_message(record_id,
"FATAL",
"Could not process digitalObject")
else:
record_values_for_db.append(enriched_data)
# else:
# logging.info(f'Ignoring record {record_id} since' +
# f' access of digitalObject is {access_status}')
# reporter.send_message(record_id,
# "IGNORE",
# "Ignoring record since access of " +
# "digitalObject is unavailable")
if 'type' in record_resource and \
record_resource['type'] == 'thumbnail':
enrichable = True
enriched_data = \
_try_fetch_from_json_object(record_resource,
_get_values_from_thumbnail_object,
if _is_digital_object(record_resource) and _is_playable(access_status):
enriched_data = _extract_fields(record_resource,
_extract_digital_object_values,
access_status)
if enriched_data:
record_processor.digital_object_ok(record_id, enriched_data)
if _has_audio_snippet(enriched_data):
snippet_entry = _create_audio_snippet_entry(enriched_data)
if snippet_entry:
record_processor.audio_snippet_ok(record_id, snippet_entry)
else:
record_processor.audio_snippet_fail(record_id)
else:
record_processor.digital_object_fail(record_id)
elif _is_thumbnail(record_resource):
enriched_data = _extract_fields(record_resource,
_extract_thumbnail_values,
access_status)
record_values_for_db.append(enriched_data)
if not enrichable:
reporter.send_message(record_id,
"IGNORE",
"Resource has no digitalObject or thumbnail")
if record_number % 100 == 0:
results = _write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
for record_value, result, entry_type, table in results:
if result:
logging.info(f'Indexing of {entry_type} for '
f'{record_value["sig"]} successful')
reporter.send_message(record_value['sig'], "SUCCESS",
f"Indexing of {entry_type} in "
f"table {table} successful")
if enriched_data:
record_processor.thumbnail_ok(record_id, enriched_data)
else:
logging.warning(f'Indexing of {entry_type} for '
f'{record_value["sig"]} failed')
reporter.send_message(record_value['sig'], "FATAL",
f"Indexing of {entry_type} in "
f"table {table} failed")
record_values_for_db = []
consumer.commit()
record_processor.thumbnail_fail(record_id)
record_processor.index()
if counter % 1000 == 0:
logging.info('{} messages read till now'.format(counter))
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadb_cursor, record_values_for_db)
results = _write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
for record_value, result, entry_type, table in results:
if result:
logging.info(f'Indexing of {entry_type} for '
f'{record_value["sig"]} successful')
reporter.send_message(record_value['sig'], "SUCCESS",
f"Indexing of {entry_type} in "
f"table {table} successful")
else:
logging.warning(f'Indexing of {entry_type} for '
f'{record_value["sig"]} failed')
reporter.send_message(record_value['sig'], "FATAL",
f"Indexing of {entry_type} in "
f"table {table} failed")
record_values_for_db = []
consumer.commit()
record_processor.index()
except Exception as ex:
status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
logging.error(status)
for record_value in record_values_for_db:
reporter.send_message(record_value['sig'], "FATAL", f"Indexing failed: {ex}")
record_processor.abort(ex)
def __init__(self):
# TODO : maybe take that to a configuration (development vs pod running in
......
import mysql.connector as mariadb
import logging
import numbers
import os
import time
# noinspection SqlResolve,SqlNoDataSourceInspection
class Indexer:
    """
    Thin wrapper around a MariaDB connection used to upsert media records
    into the `entities` and `metadata` tables.
    """

    def __init__(self):
        """
        Start MariaDB client
        """
        self.mariadb_connection, self.mariadb_cursor = self._connect_to_mariadb()

    def _connect_to_mariadb(self, retries=0):
        """
        Connect to MariaDB. Abort after configured retries.

        :param retries: attempts already made (drives the backoff delay)
        :return: tuple (connection, cursor); exits the process once
                 MARIADB_CONNECTION_RETRIES attempts have failed
        """
        try:
            mariadb_connection = mariadb.connect(user=os.environ['MARIADB_USER'],
                                                 password=os.environ['MARIADB_PASSWORD'],
                                                 host=os.environ['MARIADB_HOST'],
                                                 port=int(os.environ['MARIADB_PORT']),
                                                 database=os.environ['MARIADB_DATABASE'])
            # Commits are issued explicitly via commit() (batched writes).
            mariadb_connection.autocommit = False
            mariadb_cursor = mariadb_connection.cursor()
            return mariadb_connection, mariadb_cursor
        except Exception as ex:
            status = 'Exception: ' + str(ex)
            logging.error(status)
            if retries < int(os.environ['MARIADB_CONNECTION_RETRIES']):
                time.sleep(30 * (retries + 1))
                # BUG FIX: return the connection produced by the retry;
                # previously the recursive result was discarded and
                # exit(1) ran even after a successful reconnect.
                return self._connect_to_mariadb(retries + 1)
            exit(1)

    @staticmethod
    def _create_sql_stmt(table_name, record, fields) -> str:
        """
        Create SQL statement (upsert; `sig` is excluded from the UPDATE part).

        NOTE(review): values are interpolated directly into the SQL text;
        if record values can contain quotes or come from untrusted input,
        switch to parameterized queries.
        """
        db_fields = [dbField for dbField in fields
                     if dbField in record and record[dbField] is not None]
        db_values = [str(record[db_field])
                     if isinstance(record[db_field], numbers.Number)
                     else "'{}'".format(record[db_field])
                     for db_field in db_fields]
        key_value = \
            ", ".join([k + "=" + v for (k, v) in zip(db_fields, db_values) if k != 'sig'])
        db_fields = ','.join(db_fields)
        db_values = ','.join(db_values)
        # noinspection SqlNoDataSourceInspection
        return 'INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {}'.format(
            table_name, db_fields, db_values, key_value)

    def insert_in_db(self, record) -> bool:
        """
        Insert record in DB (writes both `entities` and `metadata` rows).

        :return: True when both statements executed, False otherwise
        """
        entities_stmt = Indexer._create_sql_stmt('entities', record,
                                                 ['sig', 'uri', 'access', 'proto'])
        metadata_stmt = Indexer._create_sql_stmt('metadata', record,
                                                 ['sig', 'mimetype', 'height',
                                                  'width', 'duration', 'type'])
        current_stmt = entities_stmt
        try:
            self.mariadb_cursor.execute(entities_stmt)
            current_stmt = metadata_stmt
            self.mariadb_cursor.execute(metadata_stmt)
            return True
        except Exception:
            # BUG FIX: log the statement that actually failed instead of
            # always blaming the entities statement.
            logging.error("Problems in sql statement: {}".format(current_stmt))
            return False

    def commit(self):
        """
        Commit changes to DB
        """
        self.mariadb_connection.commit()

    def rollback(self):
        """
        Rollback changes
        """
        self.mariadb_cursor.reset()
        self.mariadb_connection.rollback()
from mediametadatatodb_app.resources.reporter import Reporter
from mediametadatatodb_app.resources.indexer import Indexer
class RecordProcessor:
def __init__(self):
self.counter = 0
self.indexer = Indexer()
self.processed_records = dict()
self.reporter = Reporter()
@staticmethod
def _parsing_errors(record) -> bool:
return ('digital_object' in record and not record['digital_object']['ok']) or \
('thumbnail' in record and not record['thumbnail']['ok']) or \
('audio_snippet' in record and not record['audio_snippet']['ok'])
def new_record(self, rec_id):
self.processed_records[rec_id] = dict()
def digital_object_ok(self, rec_id, data):
self.counter += 1
self.processed_records[rec_id]['digital_object'] = \
{'data': data,
'ok': True,
'msg': 'successful'}
def digital_object_fail(self, rec_id):
self.processed_records[rec_id]['digital_object'] = \
{'data': None,
'ok': False,
'msg': 'parsing failed'}
def thumbnail_ok(self, rec_id, data):
self.counter += 1
self.processed_records[rec_id]['thumbnail'] = \
{'data': data,
'ok': True,
'msg': 'successful'}
def thumbnail_fail(self, rec_id):
self.processed_records[rec_id]['thumbnail'] = \
{'data': None,
'ok': False,
'msg': 'parsing failed'}
def audio_snippet_ok(self, rec_id, data):
self.counter += 1
self.processed_records[rec_id]['audio_snippet'] = \
{'data': data,
'ok': True,
'msg': 'successful'
}
def audio_snippet_fail(self, rec_id):
self.processed_records[rec_id]['audio_snippet'] = \
{'data': None,
'ok': False,
'msg': 'parsing failed'}
def abort(self, ex):
for key in self.processed_records.keys():
self.reporter.send_message(key, 'FATAL', f'Indexing failed: {ex}')
def index(self):
for key in self.processed_records.keys():
record = self.processed_records[key]
dig_obj_msg = \
record['digital_object']['msg'] if 'digital_object' in record else 'ignored'