Unverified Commit 3da63567 authored by Sebastian Schüpbach
Browse files

Index resource even with partial parsing failures

There are collections which have an incorrect combination of access
status and locator - namely, no locator is present, but the access
status is either public or private. However, such records may still
reference thumbnails, which were consequently ignored until now. This,
however, leads to the problem that the thumbnail binaries written by the Media converter to the media directory are effectively orphaned, which is not ideal. Therefore, this patch allows thumbnail or audio snippet metadata entries to be written to the media server index even when parsing failures have occurred in the digital object.
parent 2b9a1c69
Pipeline #20366 passed with stages
in 2 minutes
...@@ -330,15 +330,13 @@ class MediametadataToDB: ...@@ -330,15 +330,13 @@ class MediametadataToDB:
record_processor.thumbnail_ok(record_id, enriched_data) record_processor.thumbnail_ok(record_id, enriched_data)
else: else:
record_processor.thumbnail_fail(record_id, error) record_processor.thumbnail_fail(record_id, error)
ok = record_processor.index() record_processor.index()
if ok: consumer.commit()
consumer.commit()
if counter % 1000 == 0: if counter % 1000 == 0:
logging.info('{} messages read till now'.format(counter)) logging.info('{} messages read till now'.format(counter))
# arriving here means there are no new messages to poll from # arriving here means there are no new messages to poll from
ok = record_processor.index() record_processor.index()
if ok: consumer.commit()
consumer.commit()
except Exception as ex: except Exception as ex:
status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex) status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
logging.error(status) logging.error(status)
......
...@@ -13,9 +13,21 @@ class RecordProcessor: ...@@ -13,9 +13,21 @@ class RecordProcessor:
@staticmethod @staticmethod
def _parsing_errors(record) -> bool: def _parsing_errors(record) -> bool:
return ('digital_object' in record and not record['digital_object']['ok']) or \ return RecordProcessor._parsing_failed_digital_object(record) or \
('thumbnail' in record and not record['thumbnail']['ok']) or \ RecordProcessor._parsing_failed_thumbnail(record) or \
('audio_snippet' in record and not record['audio_snippet']['ok']) RecordProcessor._parsing_failed_audio_snippet(record)
@staticmethod
def _parsing_failed_digital_object(record) -> bool:
return 'digital_object' in record and not record['digital_object']['ok']
@staticmethod
def _parsing_failed_thumbnail(record) -> bool:
return 'thumbnail' in record and not record['thumbnail']['ok']
@staticmethod
def _parsing_failed_audio_snippet(record) -> bool:
return 'audio_snippet' in record and not record['audio_snippet']['ok']
def new_record(self, rec_id, institution_id, record_set_id): def new_record(self, rec_id, institution_id, record_set_id):
self.processed_records[rec_id] = { self.processed_records[rec_id] = {
...@@ -86,10 +98,12 @@ class RecordProcessor: ...@@ -86,10 +98,12 @@ class RecordProcessor:
def abort(self, ex): def abort(self, ex):
logging.error("Indexing failed. Aborting...") logging.error("Indexing failed. Aborting...")
for key in self.processed_records.keys(): for key in self.processed_records.keys():
self.reporter.send_message(key, 'FATAL', f'Indexing failed: {ex}') record = self.processed_records[key]
self.reporter.send_message(key, 'FATAL', f'Indexing failed: {ex}',
record['institution_id'],
record['record_set_id'])
def index(self) -> bool: def index(self):
ok_all = True
for key in self.processed_records.keys(): for key in self.processed_records.keys():
record = self.processed_records[key] record = self.processed_records[key]
dig_obj_msg = \ dig_obj_msg = \
...@@ -98,9 +112,31 @@ class RecordProcessor: ...@@ -98,9 +112,31 @@ class RecordProcessor:
record['thumbnail']['msg'] if 'thumbnail' in record else 'not available' record['thumbnail']['msg'] if 'thumbnail' in record else 'not available'
audio_snip_msg = \ audio_snip_msg = \
record['audio_snippet']['msg'] if 'audio_snippet' in record else 'not available' record['audio_snippet']['msg'] if 'audio_snippet' in record else 'not available'
ok = True
if RecordProcessor._parsing_errors(record): err_msg = ""
ok_all = False if 'digital_object' in record and not record['digital_object']['ignored'] \
and not RecordProcessor._parsing_failed_digital_object(record):
logging.debug(f"Indexing digital object for {key} in DB")
ok, err_msg = self.indexer.insert_in_db(record['digital_object']['data'])
if ok and 'thumbnail' in record and not record['thumbnail']['ignored'] \
and not RecordProcessor._parsing_failed_thumbnail(record):
logging.debug(f"Indexing thumbnail for {key} in DB")
ok, err_msg = self.indexer.insert_in_db(record['thumbnail']['data'])
if ok and 'audio_snippet' in record and not record['audio_snippet']['ignored'] \
and not RecordProcessor._parsing_failed_audio_snippet(record):
logging.debug(f"Indexing audio snippet for {key} in DB")
ok, err_msg = self.indexer.insert_in_db(record['audio_snippet']['data'])
if ok and not RecordProcessor._parsing_errors(record):
self.indexer.commit()
self.reporter.send_message(key, 'SUCCESS',
('DIGITAL OBJECT: {} -- THUMBNAIL: {}' +
' -- AUDIO SNIPPET: {}').format(
dig_obj_msg, thumbnail_msg, audio_snip_msg
),
record['institution_id'],
record['record_set_id'])
elif ok:
self.indexer.commit()
self.reporter.send_message(key, 'FATAL', self.reporter.send_message(key, 'FATAL',
('DIGITAL OBJECT: {} -- THUMBNAIL: {} ' + ('DIGITAL OBJECT: {} -- THUMBNAIL: {} ' +
'-- AUDIO SNIPPET: {}') '-- AUDIO SNIPPET: {}')
...@@ -108,32 +144,9 @@ class RecordProcessor: ...@@ -108,32 +144,9 @@ class RecordProcessor:
record['institution_id'], record['institution_id'],
record['record_set_id']) record['record_set_id'])
else: else:
ok = True self.indexer.rollback()
err_msg = "" self.reporter.send_message(key, 'FATAL',
if 'digital_object' in record and not record['digital_object']['ignored']: f'Indexing failed: {err_msg}',
logging.debug(f"Indexing digital object for {key} in DB") record['institution_id'],
ok, err_msg = self.indexer.insert_in_db(record['digital_object']['data']) record['record_set_id'])
if ok and 'thumbnail' in record and not record['thumbnail']['ignored']:
logging.debug(f"Indexing thumbnail for {key} in DB")
ok, err_msg = self.indexer.insert_in_db(record['thumbnail']['data'])
if ok and 'audio_snippet' in record and not record['audio_snippet']['ignored']:
logging.debug(f"Indexing audio snippet for {key} in DB")
ok, err_msg = self.indexer.insert_in_db(record['audio_snippet']['data'])
if ok:
self.indexer.commit()
self.reporter.send_message(key, 'SUCCESS',
('DIGITAL OBJECT: {} -- THUMBNAIL: {}' +
' -- AUDIO SNIPPET: {}').format(
dig_obj_msg, thumbnail_msg, audio_snip_msg
),
record['institution_id'],
record['record_set_id'])
else:
ok_all = False
self.indexer.rollback()
self.reporter.send_message(key, 'FATAL',
f'Indexing failed: {err_msg}',
record['institution_id'],
record['record_set_id'])
self.processed_records.clear() self.processed_records.clear()
return ok_all
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment