Unverified Commit 40fb9724 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

fix indexing issue

parent bf97d57c
Pipeline #16726 passed with stages
in 4 minutes and 32 seconds
...@@ -79,7 +79,6 @@ def _connect_to_mariadb(retries=0): ...@@ -79,7 +79,6 @@ def _connect_to_mariadb(retries=0):
database='medienserver') database='medienserver')
mariadb_connection.autocommit = False mariadb_connection.autocommit = False
mariadb_cursor = mariadb_connection.cursor() mariadb_cursor = mariadb_connection.cursor()
mariadb_cursor.execute("USE medienserver")
return mariadb_connection, mariadb_cursor return mariadb_connection, mariadb_cursor
except Exception as ex: except Exception as ex:
status = 'Exception: ' + str(ex) status = 'Exception: ' + str(ex)
...@@ -125,21 +124,19 @@ def _get_values_from_thumbnail_object(msg, _access_status): ...@@ -125,21 +124,19 @@ def _get_values_from_thumbnail_object(msg, _access_status):
def _get_values_from_digital_object(msg, access_status): def _get_values_from_digital_object(msg, access_status):
sig = obj_type = None obj_type = None
file_extension = '' file_extension = ''
return_values = { return_values = {
'access': access_status 'access': access_status
} }
if '@id' in msg: if '@id' in msg:
v1 = msg['@id'] return_values['sig'] = msg['@id'].split('/')[-1]
v2 = v1.split('/')
sig = v2[-1]
return_values['sig'] = sig
if 'hasMimeType' in msg: if 'hasMimeType' in msg:
mimetype = msg['hasMimeType'] mimetype = msg['hasMimeType']
return_values['mimetype'] = mimetype return_values['mimetype'] = mimetype
# create value for field 'type' form mimetype: # create value for field 'type' form mimetype:
obj_type = mimetype.split('/')[0] obj_type = mimetype.split('/')[0]
return_values['type'] = obj_type
if 'locator' in msg and 'https://memobase.ch/' not in msg['locator']: if 'locator' in msg and 'https://memobase.ch/' not in msg['locator']:
uri = msg['locator'] uri = msg['locator']
else: else:
...@@ -149,7 +146,7 @@ def _get_values_from_digital_object(msg, access_status): ...@@ -149,7 +146,7 @@ def _get_values_from_digital_object(msg, access_status):
file_extension = 'mp4' file_extension = 'mp4'
if obj_type == 'video': if obj_type == 'video':
file_extension = 'mp4' file_extension = 'mp4'
uri = os.environ['URI_BASE'] + sig + '.' + file_extension uri = os.environ['URI_BASE'] + return_values['sig'] + '.' + file_extension
return_values['uri'] = uri return_values['uri'] = uri
if 'height' in msg: if 'height' in msg:
height = msg['height'] height = msg['height']
...@@ -160,7 +157,6 @@ def _get_values_from_digital_object(msg, access_status): ...@@ -160,7 +157,6 @@ def _get_values_from_digital_object(msg, access_status):
if 'duration' in msg: if 'duration' in msg:
duration = msg['duration'] duration = msg['duration']
return_values['duration'] = duration return_values['duration'] = duration
# if uri uses a play to show content, use a special 'type':
if 'isDistributedOn' in msg: if 'isDistributedOn' in msg:
if msg['isDistributedOn'] == 'file' and uri.startswith('http'): if msg['isDistributedOn'] == 'file' and uri.startswith('http'):
if access_status == 'public': if access_status == 'public':
...@@ -270,7 +266,7 @@ class MediametadataToDB: ...@@ -270,7 +266,7 @@ class MediametadataToDB:
record_values_for_db = [] record_values_for_db = []
try: # read messages from kafka try: # read messages from kafka
while True: while True:
consumer.poll(max_records=25) consumer.poll(max_records=100)
for recordsJson in consumer: for recordsJson in consumer:
records_json_data = recordsJson.value['@graph'] records_json_data = recordsJson.value['@graph']
access_status = _get_access_status(records_json_data) access_status = _get_access_status(records_json_data)
...@@ -286,24 +282,14 @@ class MediametadataToDB: ...@@ -286,24 +282,14 @@ class MediametadataToDB:
_try_fetch_from_json_object(recordJsonData, record_values_for_db, _try_fetch_from_json_object(recordJsonData, record_values_for_db,
_get_values_from_thumbnail_object, _get_values_from_thumbnail_object,
access_status) access_status)
# if readMessageCounter >= 100:
# break
# to consider: we could skip this next block and rely on max_records instead
if len(record_values_for_db) >= 25:
_write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
record_values_for_db = []
consumer.commit() # <-- uncomment this for production!
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
record_values_for_db = []
consumer.commit() # <-- uncomment this for production!
# if readMessageCounter >= 100:
# break
else: else:
logging.info('Ignoring record since access of digitalObject is {}' logging.info('Ignoring record since access of digitalObject is {}'
.format(access_status)) .format(access_status))
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
record_values_for_db = []
consumer.commit()
except KafkaError as ex: except KafkaError as ex:
status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex) status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
logging.error(status) logging.error(status)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment