Unverified Commit 2bd05a68 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

extract headers from consumed records

parent 28b7ecde
Pipeline #19829 passed with stages
in 2 minutes and 3 seconds
......@@ -268,6 +268,17 @@ def _is_playable(access_status) -> bool:
return access_status == 'public' or access_status == 'closed'
def _get_institution_and_record_set(record) -> (str, str):
institution = ''
record_set = ''
for header in record.headers:
if header[0] == 'institutionId':
institution = header[1]
if header[0] == 'recordSetId':
record_set = header[1]
return institution, record_set
class MediametadataToDB:
def run(self):
"""
......@@ -283,10 +294,11 @@ class MediametadataToDB:
consumer.poll()
for record_object in consumer:
counter += 1
institution, record_set = _get_institution_and_record_set(record_object)
record = record_object.value['@graph']
record_id = _get_record_id(record)
logging.debug(f'Processing record {record_id}')
record_processor.new_record(record_id)
record_processor.new_record(record_id, institution, record_set)
access_status = _get_access_status(record, record_id)
for record_resource in record:
if _is_digital_object(record_resource):
......
......@@ -17,8 +17,11 @@ class RecordProcessor:
('thumbnail' in record and not record['thumbnail']['ok']) or \
('audio_snippet' in record and not record['audio_snippet']['ok'])
def new_record(self, rec_id):
self.processed_records[rec_id] = dict()
def new_record(self, rec_id, institution_id, record_set_id):
self.processed_records[rec_id] = {
'institution_id': institution_id,
'record_set_id': record_set_id
}
def digital_object_ok(self, rec_id, data):
logging.debug(f"Parsing of digital object resource for {rec_id} successful")
......@@ -99,9 +102,11 @@ class RecordProcessor:
if RecordProcessor._parsing_errors(record):
ok_all = False
self.reporter.send_message(key, 'FATAL',
'DIGITAL OBJECT: {} -- THUMBNAIL: {} '
'-- AUDIO SNIPPET: {}'
.format(dig_obj_msg, thumbnail_msg, audio_snip_msg))
('DIGITAL OBJECT: {} -- THUMBNAIL: {} ' +
'-- AUDIO SNIPPET: {}')
.format(dig_obj_msg, thumbnail_msg, audio_snip_msg),
record['institution_id'],
record['record_set_id'])
else:
ok = True
err_msg = ""
......@@ -117,14 +122,18 @@ class RecordProcessor:
if ok:
self.indexer.commit()
self.reporter.send_message(key, 'SUCCESS',
'DIGITAL OBJECT: {} -- THUMBNAIL: {}'
' -- AUDIO SNIPPET: {}'.format(
('DIGITAL OBJECT: {} -- THUMBNAIL: {}' +
' -- AUDIO SNIPPET: {}').format(
dig_obj_msg, thumbnail_msg, audio_snip_msg
))
),
record['institution_id'],
record['record_set_id'])
else:
ok_all = False
self.indexer.rollback()
self.reporter.send_message(key, 'FATAL',
f'Indexing failed: {err_msg}')
f'Indexing failed: {err_msg}',
record['institution_id'],
record['record_set_id'])
self.processed_records.clear()
return ok_all
......@@ -38,7 +38,7 @@ class Reporter:
time.sleep(30 * (retries + 1))
self._connect_to_kafka(retries + 1)
def send_message(self, identifier, status, message):
def send_message(self, identifier, status, message, institution_id, record_set_id):
try:
key_bytes = bytes(identifier, encoding='utf-8')
now = datetime.datetime.now()
......@@ -56,7 +56,9 @@ class Reporter:
json_report = json.dumps(report)
value_bytes = bytes(json_report, encoding='utf-8')
logging.debug(f'Sending report to {os.environ["REPORTING_TOPIC"]}: {json_report}')
self._producer.send(os.environ['REPORTING_TOPIC'], key=key_bytes, value=value_bytes)
self._producer.send(os.environ['REPORTING_TOPIC'], key=key_bytes, value=value_bytes,
headers=[("institutionId", institution_id),
("recordSetId", record_set_id)])
self._producer.flush()
except Exception as ex:
logging.error('Couldn\'t send processing report!: {}'.format(ex))
......@@ -72,7 +72,7 @@ class Test(TestCase):
}
digital_object = Test._load_file_and_get_res('BAB-PA_43-BAB_MC169A.json',
Test._get_digital_object)
dig_obj_val = MediametadataToDB._extract_digital_object_values(digital_object, 'public')
dig_obj_val = MediametadataToDB._extract_dig_obj_vals(digital_object, 'public')
self.assertDictEqual(dig_obj_val, res)
def test__get_values_from_closed_digital_object(self):
......@@ -90,7 +90,7 @@ class Test(TestCase):
}
digital_object = Test._load_file_and_get_res('mfk-FLM-167202.json',
Test._get_digital_object)
dig_obj_val = MediametadataToDB._extract_digital_object_values(digital_object, 'private')
dig_obj_val = MediametadataToDB._extract_dig_obj_vals(digital_object, 'private')
self.assertDictEqual(dig_obj_val, res)
def test__get_values_from_accessible_digital_object(self):
......@@ -108,7 +108,7 @@ class Test(TestCase):
}
digital_object = Test._load_file_and_get_res('mfk-FLM-167202.json',
Test._get_digital_object)
dig_obj_val = MediametadataToDB._extract_digital_object_values(digital_object, 'public')
dig_obj_val = MediametadataToDB._extract_dig_obj_vals(digital_object, 'public')
self.assertDictEqual(dig_obj_val, res)
def test__get_values_from_thumbnail(self):
......@@ -133,8 +133,8 @@ class Test(TestCase):
record_values_for_db.append(
MediametadataToDB._extract_fields(recordJsonData,
MediametadataToDB.
_extract_digital_object_values,
'public'))
_extract_dig_obj_vals,
'public')[0])
self.assertFalse([MediametadataToDB._has_audio_snippet(record)
for record in record_values_for_db][0])
......
......@@ -34,8 +34,8 @@ class Test(unittest.TestCase):
['sig', 'mimetype', 'height',
'width', 'duration', 'type'])
self.assertEqual(("INSERT INTO metadata (sig,mimetype,height,width,type) VALUES"
" (?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE mimetype=?, height=?,"
" width=?, type=?",
" (%s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE mimetype=%s, height=%s,"
" width=%s, type=%s",
('test-001', 'image/jpeg', 20, 100, 'image',
'image/jpeg', 20, 100, 'image')),
(metadata_stmt, metadata_values))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment