Unverified Commit 69017ff9 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

write all headers to report

parent 766d63c9
Pipeline #22492 passed with stages
in 1 minute and 47 seconds
......@@ -294,17 +294,6 @@ def _is_playable(access_status) -> bool:
return access_status == 'public' or access_status == 'closed'
def _get_institution_and_record_set(record) -> (str, str):
institution = ''
record_set = ''
for header in record.headers:
if header[0] == 'institutionId':
institution = header[1]
if header[0] == 'recordSetId':
record_set = header[1]
return institution, record_set
class MediametadataToDB:
def run(self):
"""
......@@ -320,11 +309,11 @@ class MediametadataToDB:
consumer.poll()
for record_object in consumer:
counter += 1
institution, record_set = _get_institution_and_record_set(record_object)
record = record_object.value['@graph']
headers = record_object.headers
record_id = _get_record_id(record)
logging.debug(f'Processing record {record_id}')
record_processor.new_record(record_id, institution, record_set)
record_processor.new_record(record_id, headers)
access_status = _get_access_status(record, record_id)
for record_resource in record:
if _is_digital_object(record_resource):
......
......@@ -29,11 +29,8 @@ class RecordProcessor:
def _parsing_failed_audio_snippet(record) -> bool:
return 'audio_snippet' in record and not record['audio_snippet']['ok']
def new_record(self, rec_id, headers):
    """Register a freshly consumed record for later reporting.

    Resolves the leftover duplicate definition: the superseded
    ``(rec_id, institution_id, record_set_id)`` variant is removed and
    only the headers-carrying form is kept.

    Args:
        rec_id: identifier of the record being processed.
        headers: message headers, an iterable of ``(name, value)``
            pairs, stored verbatim so they can be echoed back in reports.
    """
    self.processed_records[rec_id] = {'headers': headers}
def digital_object_ok(self, rec_id, data):
logging.debug(f"Parsing of digital object resource for {rec_id} successful")
......@@ -98,10 +95,8 @@ class RecordProcessor:
def abort(self, ex):
    """Send a FATAL report for every pending record, then give up.

    Args:
        ex: the exception that caused the abort; its message is embedded
            in each outgoing report.
    """
    logging.error("Indexing failed. Aborting...")
    for key in self.processed_records.keys():
        record = self.processed_records[key]
        # Bug fix: headers are stored per record id (see new_record), so
        # they must be read from this record's entry — the previous
        # `self.processed_records['headers']` indexed the outer dict with
        # the literal key 'headers' and would raise KeyError.
        self.reporter.send_message(key, 'FATAL', f'Indexing failed: {ex}',
                                   record['headers'])
def index(self):
for key in self.processed_records.keys():
......@@ -136,21 +131,17 @@ class RecordProcessor:
('DIGITAL OBJECT: {} -- THUMBNAIL: {}' +
' -- AUDIO SNIPPET: {}').format(
dig_obj_msg, thumbnail_msg, audio_snip_msg
),
record['institution_id'],
record['record_set_id'])
), self.processed_records['headers'])
elif ok:
self.indexer.commit()
self.reporter.send_message(key, 'FATAL',
('DIGITAL OBJECT: {} -- THUMBNAIL: {} ' +
'-- AUDIO SNIPPET: {}')
.format(dig_obj_msg, thumbnail_msg, audio_snip_msg),
record['institution_id'],
record['record_set_id'])
self.processed_records['headers'])
else:
self.indexer.rollback()
self.reporter.send_message(key, 'FATAL',
f'Indexing failed: {err_msg}',
record['institution_id'],
record['record_set_id'])
self.processed_records['headers'])
self.processed_records.clear()
......@@ -38,7 +38,7 @@ class Reporter:
time.sleep(30 * (retries + 1))
self._connect_to_kafka(retries + 1)
def send_message(self, identifier, status, message, institution_id, record_set_id):
def send_message(self, identifier, status, message, headers):
try:
key_bytes = bytes(identifier, encoding='utf-8')
now = datetime.datetime.now()
......@@ -57,8 +57,7 @@ class Reporter:
value_bytes = bytes(json_report, encoding='utf-8')
logging.debug(f'Sending report to {os.environ["REPORTING_TOPIC"]}: {json_report}')
self._producer.send(os.environ['REPORTING_TOPIC'], key=key_bytes, value=value_bytes,
headers=[("institutionId", institution_id),
("recordSetId", record_set_id)])
headers=headers)
self._producer.flush()
except Exception as ex:
logging.error('Couldn\'t send processing report!: {}'.format(ex))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment