Unverified Commit 64cc61d7 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

Merge branch 'master' of...

Merge branch 'master' of gitlab.switch.ch:memoriav/memobase-2020/services/postprocessing/mediametadatatodb
parents 8aa9be6d 95fc0ec9
Pipeline #18085 passed with stages
in 1 minute and 54 seconds
......@@ -23,7 +23,8 @@ if __name__ == "__main__":
numeric_level = getattr(logging, os.getenv('LOG_LEVEL').upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f'Invalid log level: {os.getenv("LOG_LEVEL")}')
logging.basicConfig(level=numeric_level)
logging.basicConfig(format='%(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
level=numeric_level)
logging.info("Starting up")
runner = MediametadataToDB()
runner.run()
......
......@@ -18,6 +18,7 @@ import json
import logging
import numbers
import os
import re
import time
import mysql.connector as mariadb
......@@ -116,7 +117,7 @@ def _get_values_from_digital_object(msg, access_status) -> dict:
width = msg['width']
return_values['width'] = width
if 'duration' in msg:
duration = msg['duration']
duration = _normalize_duration(msg['duration'])
return_values['duration'] = duration
return_values['type'] = msg['isDistributedOn']
if 'hasMimeType' in msg:
......@@ -185,26 +186,39 @@ def _get_record_id(graph) -> str:
def _create_sql_stmt(table_name, record, fields) -> str:
db_fields = [dbField for dbField in fields
if dbField in record and record[dbField] is not None]
db_values = ','.join([str(record[db_field])
if isinstance(record[db_field], numbers.Number)
else "'{}'".format(record[db_field])
for db_field in db_fields])
db_values = [str(record[db_field])
if isinstance(record[db_field], numbers.Number)
else "'{}'".format(record[db_field])
for db_field in db_fields]
key_value = \
", ".join([k + "=" + v for (k, v) in zip(db_fields, db_values) if k != 'sig'])
db_fields = ','.join(db_fields)
db_values = ','.join(db_values)
# noinspection SqlNoDataSourceInspection
return 'INSERT IGNORE INTO {} ({}) VALUES ({})'.format(
table_name, db_fields, db_values)
return 'INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {}'.format(
table_name, db_fields, db_values, key_value)
def _create_entities_entry(record, mariadb_cursor):
def _create_entities_entry(record, mariadb_cursor) -> bool:
fields = ['sig', 'uri', 'access', 'proto']
sql_stmt = _create_sql_stmt('entities', record, fields)
mariadb_cursor.execute(sql_stmt)
try:
mariadb_cursor.execute(sql_stmt)
return True
except Exception:
logging.error("Problems in sql statement: {}".format(sql_stmt))
return False
def _create_metadata_entry(record, mariadb_cursor):
def _create_metadata_entry(record, mariadb_cursor) -> bool:
fields = ['sig', 'mimetype', 'height', 'width', 'duration', 'type']
sql_stmt = _create_sql_stmt('metadata', record, fields)
mariadb_cursor.execute(sql_stmt)
try:
mariadb_cursor.execute(sql_stmt)
return True
except Exception:
logging.error("Problems in sql statement: {}".format(sql_stmt))
return False
def _has_audio_snippet(record) -> bool:
......@@ -218,31 +232,78 @@ def _has_http_locator(digital_object) -> bool:
return 'locator' in digital_object and digital_object['locator'].startswith('http')
def _create_audio_snippet_entry(record, mariadb_cursor):
def _normalize_duration(duration) -> int:
if re.fullmatch(r'\d+:\d{2}', str(duration), re.ASCII):
split = duration.split(':')
return int(split[0]) * 60 + int(split[1])
elif re.fullmatch(r'\d+:\d{2}:\d{2}', str(duration), re.ASCII):
split = duration.split(':')
return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
elif re.fullmatch(r'\d+:\d{2}:\d{2}\d{3}', str(duration), re.ASCII):
split = duration.split(':')
return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
elif re.fullmatch(r'\d+.\d{6}', str(duration), re.ASCII):
return int(duration.split('.')[0])
elif re.fullmatch(r'\d+', str(duration), re.ASCII):
return int(duration)
else:
logging.warning(f'Can\'t parse duration `{duration}`')
return 0
def _create_audio_snippet_entry(record, mariadb_cursor) -> list:
results = []
snippet_record = record.copy()
if 'duration' not in snippet_record:
logging.warning("No duration for audio found: Setting duration to 0")
snippet_record['duration'] = 0
snippet_record['sig'] = snippet_record['sig'] + '-intro'
# //@formatter:off
snippet_record['duration'] = \
30.0 if float(snippet_record['duration']) >= 30.0 \
else float(snippet_record['duration'])
30 if _normalize_duration(snippet_record['duration']) >= 30 \
else _normalize_duration(snippet_record['duration'])
# //@formatter:on
snippet_record['mimetype'] = 'audio/mpeg'
snippet_record['uri'] = \
'.'.join(snippet_record['uri'].split('.')[0:-1]) + '-intro.mp3'
_create_entities_entry(snippet_record, mariadb_cursor)
_create_metadata_entry(snippet_record, mariadb_cursor)
try:
result_entities = _create_entities_entry(snippet_record, mariadb_cursor)
results.append((record, result_entities, 'audio snippet', 'entities'))
except Exception as ex:
logging.error(f'Exception when writing table `entities` '
f'for audio snippet belonging to record {record["sig"]}: {str(ex)}')
results.append((record, False, 'audio snippet', 'entities'))
def _write_values_in_db(mariadb_cursor, record_values_for_db):
try:
for record in record_values_for_db:
_create_entities_entry(record, mariadb_cursor)
_create_metadata_entry(record, mariadb_cursor)
if _has_audio_snippet(record):
_create_audio_snippet_entry(record, mariadb_cursor)
result_metadata = _create_metadata_entry(snippet_record, mariadb_cursor)
results.append((record, result_metadata, 'audio snippet', 'metadata'))
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
logging.error(f'Exception when writing table `metadata` '
f'for audio snippet belonging to record {record["sig"]}: {str(ex)}')
results.append((record, False, 'audio snippet', 'metadata'))
return results
def _write_values_in_db(mariadb_cursor, record_values_for_db) -> list:
results = []
for record in record_values_for_db:
try:
res_entities = _create_entities_entry(record, mariadb_cursor)
results.append((record, res_entities, 'record', 'entities'))
except Exception as ex:
logging.error(f'Exception when writing table `entities` '
f'for record {record["sig"]}: {str(ex)}')
results.append((record, False, 'record', 'entities'))
try:
res_metadata = _create_metadata_entry(record, mariadb_cursor)
results.append((record, res_metadata, 'record', 'metadata'))
except Exception as ex:
logging.error(f'Exception when writing table `metadata` '
f'for record {record["sig"]}: {str(ex)}')
results.append((record, False, 'record', 'entities'))
if _has_audio_snippet(record):
results.extend(_create_audio_snippet_entry(record, mariadb_cursor))
return results
class MediametadataToDB:
......@@ -316,21 +377,42 @@ class MediametadataToDB:
"Ignoring record since access of " +
"digitalObject is unavailable")
if record_number % 100 == 0:
_write_values_in_db(mariadb_cursor, record_values_for_db)
results = _write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
for record_value in record_values_for_db:
logging.info(f'Record {record_value["sig"]} successfully indexed')
reporter.send_message(record_value['sig'], "SUCCESS",
"Indexing successful")
for record_value, result, entry_type, table in results:
if result:
logging.info(f'Indexing of {entry_type} for '
f'{record_value["sig"]} successful')
reporter.send_message(record_value['sig'], "SUCCESS",
f"Indexing of {entry_type} in "
f"table {table} successful")
else:
logging.warning(f'Indexing of {entry_type} for '
f'{record_value["sig"]} failed')
reporter.send_message(record_value['sig'], "FATAL",
f"Indexing of {entry_type} in "
f"table {table} failed")
record_values_for_db = []
consumer.commit()
if counter % 1000 == 0:
logging.info('{} messages read till now'.format(counter))
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadb_cursor, record_values_for_db)
results = _write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
for record_value in record_values_for_db:
reporter.send_message(record_value['sig'], "SUCCESS", "Indexing successful")
for record_value, result, entry_type, table in results:
if result:
logging.info(f'Indexing of {entry_type} for '
f'{record_value["sig"]} successful')
reporter.send_message(record_value['sig'], "SUCCESS",
f"Indexing of {entry_type} in "
f"table {table} successful")
else:
logging.warning(f'Indexing of {entry_type} for '
f'{record_value["sig"]} failed')
reporter.send_message(record_value['sig'], "FATAL",
f"Indexing of {entry_type} in "
f"table {table} failed")
record_values_for_db = []
consumer.commit()
except Exception as ex:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment