Unverified Commit 7ca07d59 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

better reporting

parent d7eaf493
Pipeline #17977 passed with stages
in 1 minute and 44 seconds
......@@ -199,16 +199,26 @@ def _create_sql_stmt(table_name, record, fields) -> str:
table_name, db_fields, db_values, key_value)
def _create_entities_entry(record, mariadb_cursor):
def _create_entities_entry(record, mariadb_cursor) -> bool:
fields = ['sig', 'uri', 'access', 'proto']
sql_stmt = _create_sql_stmt('entities', record, fields)
mariadb_cursor.execute(sql_stmt)
try:
mariadb_cursor.execute(sql_stmt)
return True
except Exception:
logging.error("Problems in sql statement: {}".format(sql_stmt))
return False
def _create_metadata_entry(record, mariadb_cursor):
def _create_metadata_entry(record, mariadb_cursor) -> bool:
fields = ['sig', 'mimetype', 'height', 'width', 'duration', 'type']
sql_stmt = _create_sql_stmt('metadata', record, fields)
mariadb_cursor.execute(sql_stmt)
try:
mariadb_cursor.execute(sql_stmt)
return True
except Exception:
logging.error("Problems in sql statement: {}".format(sql_stmt))
return False
def _has_audio_snippet(record) -> bool:
......@@ -222,7 +232,8 @@ def _has_http_locator(digital_object) -> bool:
return 'locator' in digital_object and digital_object['locator'].startswith('http')
def _create_audio_snippet_entry(record, mariadb_cursor):
def _create_audio_snippet_entry(record, mariadb_cursor) -> list:
results = []
snippet_record = record.copy()
snippet_record['sig'] = snippet_record['sig'] + '-intro'
# //@formatter:off
......@@ -233,20 +244,28 @@ def _create_audio_snippet_entry(record, mariadb_cursor):
snippet_record['mimetype'] = 'audio/mpeg'
snippet_record['uri'] = \
'.'.join(snippet_record['uri'].split('.')[0:-1]) + '-intro.mp3'
_create_entities_entry(snippet_record, mariadb_cursor)
_create_metadata_entry(snippet_record, mariadb_cursor)
result_entities = _create_entities_entry(snippet_record, mariadb_cursor)
results.append((record, result_entities, 'audio snippet', 'entities'))
result_metadata = _create_metadata_entry(snippet_record, mariadb_cursor)
results.append((record, result_metadata, 'audio snippet', 'metadata'))
return results
def _write_values_in_db(mariadb_cursor, record_values_for_db):
def _write_values_in_db(mariadb_cursor, record_values_for_db) -> list:
results = []
try:
for record in record_values_for_db:
_create_entities_entry(record, mariadb_cursor)
_create_metadata_entry(record, mariadb_cursor)
res_entities = _create_entities_entry(record, mariadb_cursor)
results.append((record, res_entities, 'record', 'entities'))
res_metadata = _create_metadata_entry(record, mariadb_cursor)
results.append((record, res_metadata, 'record', 'metadata'))
if _has_audio_snippet(record):
_create_audio_snippet_entry(record, mariadb_cursor)
results.extend(_create_audio_snippet_entry(record, mariadb_cursor))
return results
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
return []
class MediametadataToDB:
......@@ -320,39 +339,42 @@ class MediametadataToDB:
"Ignoring record since access of " +
"digitalObject is unavailable")
if record_number % 100 == 0:
_write_values_in_db(mariadb_cursor, record_values_for_db)
try:
mariadb_connection.commit()
for record_value in record_values_for_db:
logging.info(f'Record {record_value["sig"]} successfully indexed')
results = _write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
for record_value, result, entry_type, table in results:
if result:
logging.info(f'Indexing of {entry_type} for '
f'{record_value["sig"]} successful')
reporter.send_message(record_value['sig'], "SUCCESS",
"Indexing successful")
except mariadb.Error as error:
logging.error(error)
for record_value in record_values_for_db:
logging.info(f'Indexing of record {record_value["sig"]} failed')
f"Indexing of {entry_type} in "
f"table {table} successful")
else:
logging.warning(f'Indexing of {entry_type} for '
f'{record_value["sig"]} failed')
reporter.send_message(record_value['sig'], "FATAL",
"Indexing failed")
f"Indexing of {entry_type} in "
f"table {table} failed")
record_values_for_db = []
consumer.commit()
if counter % 1000 == 0:
logging.info('{} messages read till now'.format(counter))
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadb_cursor, record_values_for_db)
try:
mariadb_connection.commit()
for record_value in record_values_for_db:
logging.info(f'Record {record_value["sig"]} successfully indexed')
results = _write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
for record_value, result, entry_type, table in results:
if result:
logging.info(f'Indexing of {entry_type} for '
f'{record_value["sig"]} successful')
reporter.send_message(record_value['sig'], "SUCCESS",
"Indexing successful")
except mariadb.Error as error:
logging.error(error)
for record_value in record_values_for_db:
logging.info(f'Indexing of record {record_value["sig"]} failed')
f"Indexing of {entry_type} in "
f"table {table} successful")
else:
logging.warning(f'Indexing of {entry_type} for '
f'{record_value["sig"]} failed')
reporter.send_message(record_value['sig'], "FATAL",
"Indexing failed")
for record_value in record_values_for_db:
reporter.send_message(record_value['sig'], "SUCCESS", "Indexing successful")
f"Indexing of {entry_type} in "
f"table {table} failed")
record_values_for_db = []
consumer.commit()
except Exception as ex:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment