Unverified Commit 3e3bd2e9 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

read access status and set correct access / proto tuple


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent d056cc9c
Pipeline #16655 failed with stages
in 1 minute and 23 seconds
......@@ -9,9 +9,9 @@ from kafka.errors import KafkaError
from kubernetes import config
def _try_fetch_from_json_object(record_json_data, record_values_for_db, fetch_from_obj_fun):
def _try_fetch_from_json_object(record_json_data, record_values_for_db, fetch_from_obj_fun, access_status):
if 'locator' in record_json_data:
values = fetch_from_obj_fun(record_json_data)
values = fetch_from_obj_fun(record_json_data, access_status)
record_values_for_db.append(values)
elif '@id' in record_json_data:
logging.info('Record ' + record_json_data['@id'] +
......@@ -20,10 +20,12 @@ def _try_fetch_from_json_object(record_json_data, record_values_for_db, fetch_fr
logging.warning('Record without @id-property detected!')
def _get_values_from_thumbnail_object(msg):
def _get_values_from_thumbnail_object(msg, _access_status):
return_values = {
'mimetype': 'image/jp2',
'type': 'image'
'type': 'image',
'access': 'public',
'proto': 'file'
}
if '@id' in msg:
return_values['sig'] = '{}-poster'.format(msg['@id'].split('/')[-2])
......@@ -38,10 +40,12 @@ def _get_values_from_thumbnail_object(msg):
return return_values
def _get_values_from_digital_object(msg):
sig = uri = height = width = mimetype = type = None
def _get_values_from_digital_object(msg, access_status):
sig = obj_type = None
fileExtension = ''
returnValues = {}
returnValues = {
'access': access_status
}
if '@id' in msg:
v1 = msg['@id']
v2 = v1.split('/')
......@@ -51,15 +55,15 @@ def _get_values_from_digital_object(msg):
mimetype = msg['hasMimeType']
returnValues['mimetype'] = mimetype
# create value for field 'type' form mimetype:
type = mimetype.split('/')[0]
obj_type = mimetype.split('/')[0]
if 'locator' in msg and 'https://memobase.ch/' not in msg['locator']:
uri = msg['locator']
else:
if type == 'image':
if obj_type == 'image':
fileExtension = 'jp2'
if type == 'audio':
if obj_type == 'audio':
fileExtension = 'mp4'
if type == 'video':
if obj_type == 'video':
fileExtension = 'mp4'
uri = os.environ['URI_BASE'] + sig + '.' + fileExtension
returnValues['uri'] = uri
......@@ -74,10 +78,28 @@ def _get_values_from_digital_object(msg):
returnValues['duration'] = duration
# if uri uses a play to show content, use a special 'type':
if 'isDistributedOn' in msg:
returnValues['type'] = msg['isDistributedOn']
if msg['isDistributedOn'] == 'file' and uri.startswith('http'):
if access_status == 'public':
returnValues['proto'] = 'redirect'
else:
returnValues['proto'] = 'proxy'
else:
returnValues['type'] = msg['isDistributedOn']
return returnValues
def _get_access_status(graph):
for resource in graph:
if 'type' in resource and resource['type'] == 'access' and \
'regulates' in resource and \
resource['regulates'].startswith('https://memobase.ch/digital/') and 'name' in 'resource':
if resource['name'] == 'public':
return 'public'
elif resource['name'] == 'private':
return 'closed'
return 'unavailable'
def _create_sql_stmt(table_name, record, fields):
db_fields = [dbField for dbField in fields
if dbField in record and record[dbField] is not None]
......@@ -91,7 +113,7 @@ def _create_sql_stmt(table_name, record, fields):
def _create_entities_entry(record, mariadbCursor):
fields = ['sig', 'uri']
fields = ['sig', 'uri', 'access', 'proto']
sqlStmt = _create_sql_stmt('entities', record, fields)
mariadbCursor.execute(sqlStmt)
......@@ -151,8 +173,6 @@ class MediametadataToDB():
example: there was an exception
"""
status = ''
# connect to kafka:
try:
consumer = KafkaConsumer(
......@@ -190,33 +210,35 @@ class MediametadataToDB():
recordValuesForDB = []
try: # read messages from kafka
while True:
readMessageCounter = 0
consumer.poll(max_records=25)
for recordsJson in consumer:
readMessageCounter = readMessageCounter + 1
recordsJsonData = recordsJson.value['@graph']
for recordJsonData in recordsJsonData:
if 'type' in recordJsonData and recordJsonData['type'] == 'digitalObject':
_try_fetch_from_json_object(recordJsonData, recordValuesForDB,
_get_values_from_digital_object)
if 'type' in recordJsonData and recordJsonData['type'] == 'thumbnail':
_try_fetch_from_json_object(recordJsonData, recordValuesForDB,
_get_values_from_thumbnail_object)
# if readMessageCounter >= 100:
# break
# to consider: we could skip this next block and rely on max_records instead
if len(recordValuesForDB) >= 25:
access_status = _get_access_status(recordsJsonData)
if access_status == 'public' or access_status == 'closed':
for recordJsonData in recordsJsonData:
if 'type' in recordJsonData and recordJsonData['type'] == 'digitalObject':
_try_fetch_from_json_object(recordJsonData, recordValuesForDB,
_get_values_from_digital_object, access_status)
if 'type' in recordJsonData and recordJsonData['type'] == 'thumbnail':
_try_fetch_from_json_object(recordJsonData, recordValuesForDB,
_get_values_from_thumbnail_object, access_status)
# if readMessageCounter >= 100:
# break
# to consider: we could skip this next block and rely on max_records instead
if len(recordValuesForDB) >= 25:
_write_values_in_db(mariadbCursor, recordValuesForDB)
mariadbConnection.commit()
recordValuesForDB = []
consumer.commit() # <-- uncomment this for production!
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadbCursor, recordValuesForDB)
mariadbConnection.commit()
recordValuesForDB = []
consumer.commit() # <-- uncomment this for production!
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadbCursor, recordValuesForDB)
mariadbConnection.commit()
recordValuesForDB = []
consumer.commit() # <-- uncomment this for production!
# if readMessageCounter >= 100:
# break
# if readMessageCounter >= 100:
# break
else:
logging.info('Ignoring record since access of digitalObject is {}'.format(access_status))
except KafkaError as ex:
status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
logging.error(status)
......@@ -226,8 +248,6 @@ class MediametadataToDB():
return {"info": status}, 500
# FIXME: Needs probably some adaptions when final thumbnail object is known
def __init__(self):
# TODO : maybe take that to a configuration (development vs pod running in
# k8s cluster)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment