Verified Commit 538f718c authored by Sebastian Schüpbach
Browse files

fix error message

parent 54b88057
Pipeline #35609 passed with stages
in 1 minute and 50 seconds
......@@ -20,11 +20,11 @@ import os
from mediametadatatodb_app.resources.MediametadataToDB import MediametadataToDB
if __name__ == "__main__":
numeric_level = getattr(logging, os.getenv('LOG_LEVEL').upper(), None)
numeric_level = getattr(logging, os.getenv("LOG_LEVEL").upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f'Invalid log level: {os.getenv("LOG_LEVEL")}')
logging.basicConfig(
format='%(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
format="%(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
level=numeric_level,
)
logging.info("Starting up")
......
......@@ -38,26 +38,26 @@ def _connect_to_kafka(retries=0):
"""
try:
consumer = KafkaConsumer(
os.environ['TOPIC_IN'],
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
auto_offset_reset='earliest',
os.environ["TOPIC_IN"],
value_deserializer=lambda m: json.loads(m.decode("utf8")),
bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
auto_offset_reset="earliest",
enable_auto_commit=False,
group_id=os.environ['GROUP_ID'],
group_id=os.environ["GROUP_ID"],
consumer_timeout_ms=30000,
)
return consumer
except KafkaError as ex:
status = 'KafkaError: ' + str(ex)
status = "KafkaError: " + str(ex)
logging.error(status)
if retries < int(os.environ['KAFKA_CONNECTION_RETRIES']):
if retries < int(os.environ["KAFKA_CONNECTION_RETRIES"]):
time.sleep(30 * (retries + 1))
_connect_to_kafka(retries + 1)
exit(1)
except Exception as ex:
status = 'Exception: ' + str(ex)
status = "Exception: " + str(ex)
logging.error(status)
if retries < int(os.environ['KAFKA_CONNECTION_RETRIES']):
if retries < int(os.environ["KAFKA_CONNECTION_RETRIES"]):
time.sleep(30 * (retries + 1))
_connect_to_kafka(retries + 1)
exit(1)
......@@ -69,16 +69,16 @@ def _extract_fields(
"""
Extract fields from JSON object by applying `fetch_from_obj_fun` function
"""
if 'locator' in record_json_data and '@id' in record_json_data:
if "locator" in record_json_data and "@id" in record_json_data:
return fetch_from_obj_fun(record_json_data, access_status), None
elif '@id' in record_json_data:
elif "@id" in record_json_data:
logging.info(
'Record ' + record_json_data['@id'] + ' does not have a locator property.'
"Record " + record_json_data["@id"] + " does not have a locator property."
)
return dict(), 'No locator property found'
return dict(), "No locator property found"
else:
logging.warning('Record without @id-property detected!')
return dict(), 'No @id property found'
logging.warning("Record without @id-property detected!")
return dict(), "No @id property found"
def _extract_thumbnail_values(msg, _access_status) -> dict:
......@@ -87,33 +87,33 @@ def _extract_thumbnail_values(msg, _access_status) -> dict:
"""
del _access_status
return_values = {
'type': 'image',
'access': 'public',
'proto': 'file',
'sig': '{}-poster'.format(msg['@id'].split('/')[-2]),
"type": "image",
"access": "public",
"proto": "file",
"sig": "{}-poster".format(msg["@id"].split("/")[-2]),
}
if 'height' in msg:
height = _normalize_dimension(msg['height'])
return_values['height'] = height
if 'width' in msg:
width = _normalize_dimension(msg['width'])
return_values['width'] = width
if 'hasMimeType' in msg:
return_values['mimetype'] = msg['hasMimeType']
if return_values['mimetype'] == 'image/jpeg':
file_extension = 'jpg'
elif return_values['mimetype'] == 'image/png':
file_extension = 'png'
elif return_values['mimetype'] == 'image/jp2':
file_extension = 'jp2'
if "height" in msg:
height = _normalize_dimension(msg["height"])
return_values["height"] = height
if "width" in msg:
width = _normalize_dimension(msg["width"])
return_values["width"] = width
if "hasMimeType" in msg:
return_values["mimetype"] = msg["hasMimeType"]
if return_values["mimetype"] == "image/jpeg":
file_extension = "jpg"
elif return_values["mimetype"] == "image/png":
file_extension = "png"
elif return_values["mimetype"] == "image/jp2":
file_extension = "jp2"
else:
file_extension = ''
logging.warning('No valid mimetype found!')
file_extension = ""
logging.warning("No valid mimetype found!")
else:
file_extension = ''
logging.warning('No valid mimetype found!')
return_values['uri'] = 'file:///data/{}-poster.{}'.format(
msg['@id'].split('/')[-2], file_extension
file_extension = ""
logging.warning("No valid mimetype found!")
return_values["uri"] = "file:///data/{}-poster.{}".format(
msg["@id"].split("/")[-2], file_extension
)
return return_values
......@@ -122,57 +122,57 @@ def _extract_dig_obj_vals(msg, access_status) -> dict:
"""
Extract information on digital object from JSON object
"""
if 'isDistributedOn' not in msg:
if "isDistributedOn" not in msg:
logging.warning("No isDistributedOn property found in object")
return dict()
file_extension = ''
return_values = {'access': access_status, 'sig': msg['@id'].split('/')[-1]}
if 'height' in msg:
height = _normalize_dimension(msg['height'])
return_values['height'] = height
if 'width' in msg:
width = _normalize_dimension(msg['width'])
return_values['width'] = width
if 'duration' in msg:
duration = _normalize_duration(msg['duration'])
return_values['duration'] = duration
return_values['type'] = msg['isDistributedOn']
if 'hasMimeType' in msg:
mimetype = msg['hasMimeType']
return_values['mimetype'] = mimetype
file_extension = ""
return_values = {"access": access_status, "sig": msg["@id"].split("/")[-1]}
if "height" in msg:
height = _normalize_dimension(msg["height"])
return_values["height"] = height
if "width" in msg:
width = _normalize_dimension(msg["width"])
return_values["width"] = width
if "duration" in msg:
duration = _normalize_duration(msg["duration"])
return_values["duration"] = duration
return_values["type"] = msg["isDistributedOn"]
if "hasMimeType" in msg:
mimetype = msg["hasMimeType"]
return_values["mimetype"] = mimetype
if _is_remote_file(msg):
return_values['uri'] = msg['locator']
if access_status == 'public' and not return_values['type'] == 'image':
return_values["uri"] = msg["locator"]
if access_status == "public" and not return_values["type"] == "image":
# Remote images are always accessed via proxy because their respective
# route goes over the internal image server
return_values['proto'] = 'redirect'
return_values["proto"] = "redirect"
else:
if msg.get('proxyType'):
return_values['proto'] = msg['proxyType']
if msg.get("proxyType"):
return_values["proto"] = msg["proxyType"]
else:
return_values['proto'] = 'proxydirect'
return_values["proto"] = "proxydirect"
else:
return_values['proto'] = 'file'
if return_values['type'] == 'image':
if return_values.get('mimetype'):
if return_values['mimetype'] == 'image/jpeg':
file_extension = 'jpg'
elif return_values['mimetype'] == 'image/png':
file_extension = 'png'
elif return_values['mimetype'] == 'image/jp2':
file_extension = 'jp2'
return_values["proto"] = "file"
if return_values["type"] == "image":
if return_values.get("mimetype"):
if return_values["mimetype"] == "image/jpeg":
file_extension = "jpg"
elif return_values["mimetype"] == "image/png":
file_extension = "png"
elif return_values["mimetype"] == "image/jp2":
file_extension = "jp2"
else:
file_extension = ''
logging.warning('No valid mimetype found!')
file_extension = ""
logging.warning("No valid mimetype found!")
else:
file_extension = ''
logging.warning('No valid mimetype found!')
if return_values['type'] == 'audio':
file_extension = 'mp4'
if return_values['type'] == 'video':
file_extension = 'mp4'
return_values['uri'] = (
os.environ['URI_BASE'] + return_values['sig'] + '.' + file_extension
file_extension = ""
logging.warning("No valid mimetype found!")
if return_values["type"] == "audio":
file_extension = "mp4"
if return_values["type"] == "video":
file_extension = "mp4"
return_values["uri"] = (
os.environ["URI_BASE"] + return_values["sig"] + "." + file_extension
)
return return_values
......@@ -182,21 +182,21 @@ def _create_audio_snippet_entry(record, access_status) -> dict:
Create an audio snippet entry based on the digital object
"""
snippet_record = record.copy()
if 'duration' not in snippet_record:
if "duration" not in snippet_record:
logging.warning("No duration for audio found: Setting duration to 0")
snippet_record['duration'] = 0
snippet_record['sig'] = snippet_record['sig'] + '-intro'
snippet_record['access'] = access_status
snippet_record["duration"] = 0
snippet_record["sig"] = snippet_record["sig"] + "-intro"
snippet_record["access"] = access_status
# //@formatter:off
snippet_record['duration'] = (
snippet_record["duration"] = (
30
if _normalize_duration(snippet_record['duration']) >= 30
else _normalize_duration(snippet_record['duration'])
if _normalize_duration(snippet_record["duration"]) >= 30
else _normalize_duration(snippet_record["duration"])
)
# //@formatter:on
snippet_record['mimetype'] = 'audio/mpeg'
snippet_record['uri'] = (
'.'.join(snippet_record['uri'].split('.')[0:-1]) + '-intro.mp3'
snippet_record["mimetype"] = "audio/mpeg"
snippet_record["uri"] = (
".".join(snippet_record["uri"].split(".")[0:-1]) + "-intro.mp3"
)
return snippet_record
......@@ -205,7 +205,7 @@ def _is_remote_file(msg) -> bool:
"""
Media file is saved on a remote system
"""
return 'locator' in msg and not msg['locator'].startswith('sftp:/')
return "locator" in msg and not msg["locator"].startswith("sftp:/")
def _get_access_status(graph, record_id) -> str:
......@@ -218,55 +218,55 @@ def _get_access_status(graph, record_id) -> str:
access_flags = list()
for resource in graph:
if (
'type' in resource
and resource['type'] == 'access'
and 'regulates' in resource
and resource['regulates'].startswith('https://memobase.ch/digital/')
and 'name' in resource
"type" in resource
and resource["type"] == "access"
and "regulates" in resource
and resource["regulates"].startswith("https://memobase.ch/digital/")
and "name" in resource
):
if resource['name'] == 'public':
access_flags.append('public')
elif resource['name'] == 'private':
if resource["name"] == "public":
access_flags.append("public")
elif resource["name"] == "private":
logging.debug(
f'{record_id}: Setting access for digital object to `closed`'
f"{record_id}: Setting access for digital object to `closed`"
)
access_flags.append('closed')
access_flags.append("closed")
else:
logging.info(
f'Digital object of record {record_id} has access type '
f"Digital object of record {record_id} has access type "
+ f'`{resource["name"]}`. This makes the media resource unavailable.'
)
access_flags.append(resource['name'])
if 'closed' in access_flags:
logging.debug(f'{record_id}: Setting access for digital object to `closed`')
return 'closed'
elif 'public' in access_flags:
logging.debug(f'{record_id}: Setting access for digital object to `public`')
return 'public'
elif 'faro' in access_flags:
access_flags.append(resource["name"])
if "closed" in access_flags:
logging.debug(f"{record_id}: Setting access for digital object to `closed`")
return "closed"
elif "public" in access_flags:
logging.debug(f"{record_id}: Setting access for digital object to `public`")
return "public"
elif "faro" in access_flags:
logging.info(
f'Digital object of record {record_id} has access type `faro`.'
+ ' This makes the media resource unavailable.'
f"Digital object of record {record_id} has access type `faro`."
+ " This makes the media resource unavailable."
)
return 'faro'
elif 'onsite' in access_flags:
return "faro"
elif "onsite" in access_flags:
logging.info(
f'Digital object of record {record_id} has access type `onsite`.'
+ ' This makes the media resource unavailable.'
f"Digital object of record {record_id} has access type `onsite`."
+ " This makes the media resource unavailable."
)
return 'onsite'
elif 'noonsite' in access_flags:
return "onsite"
elif "noonsite" in access_flags:
logging.info(
f'Digital object of record {record_id} has access type `noonsite`.'
+ ' This makes the media resource unavailable.'
f"Digital object of record {record_id} has access type `noonsite`."
+ " This makes the media resource unavailable."
)
return 'noonsite'
return "noonsite"
else:
logging.warning(
f'Digital object of record {record_id} has no or invalid access information!'
+ ' The media resource is therefore unavailable'
f"Digital object of record {record_id} has no or invalid access information!"
+ " The media resource is therefore unavailable"
)
return 'unavailable'
return "unavailable"
def _get_record_id(graph) -> Optional[str]:
......@@ -275,10 +275,10 @@ def _get_record_id(graph) -> Optional[str]:
"""
for resource in graph:
if (
'@type' in resource
and resource['@type'] == 'https://www.ica.org/standards/RiC/ontology#Record'
"@type" in resource
and resource["@type"] == "https://www.ica.org/standards/RiC/ontology#Record"
):
return resource['@id'] if '@id' in resource else None
return resource["@id"] if "@id" in resource else None
def _has_audio_snippet(record) -> bool:
......@@ -286,9 +286,9 @@ def _has_audio_snippet(record) -> bool:
Record has an attached audio snippet (created by external service)
"""
return (
record['type'] == 'audio'
and 'uri' in record
and record['uri'].startswith('file://')
record["type"] == "audio"
and "uri" in record
and record["uri"].startswith("file://")
)
......@@ -303,21 +303,21 @@ def _normalize_duration(duration) -> int:
"""
Normalise different representation of duration
"""
if re.fullmatch(r'\d+:\d{2}', str(duration), re.ASCII):
split = duration.split(':')
if re.fullmatch(r"\d+:\d{2}", str(duration), re.ASCII):
split = duration.split(":")
return int(split[0]) * 60 + int(split[1])
elif re.fullmatch(r'\d+:\d{2}:\d{2}', str(duration), re.ASCII):
split = duration.split(':')
elif re.fullmatch(r"\d+:\d{2}:\d{2}", str(duration), re.ASCII):
split = duration.split(":")
return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
elif re.fullmatch(r'\d+:\d{2}:\d{2}\d{3}', str(duration), re.ASCII):
split = duration.split(':')
elif re.fullmatch(r"\d+:\d{2}:\d{2}\d{3}", str(duration), re.ASCII):
split = duration.split(":")
return int(split[0]) * 3600 + int(split[1]) * 60 + int(split[2])
elif re.fullmatch(r'\d+.\d{6}', str(duration), re.ASCII):
return int(duration.split('.')[0])
elif re.fullmatch(r'\d+', str(duration), re.ASCII):
elif re.fullmatch(r"\d+.\d{6}", str(duration), re.ASCII):
return int(duration.split(".")[0])
elif re.fullmatch(r"\d+", str(duration), re.ASCII):
return int(duration)
else:
logging.warning(f'Can\'t parse duration `{duration}`')
logging.warning(f"Can't parse duration `{duration}`")
return 0
......@@ -325,21 +325,21 @@ def _is_digital_object(resource) -> bool:
"""
Resource is of type `digital object`
"""
return 'type' in resource and resource['type'] == 'digitalObject'
return "type" in resource and resource["type"] == "digitalObject"
def _is_thumbnail(resource) -> bool:
"""
Resource is of type `thumbnail`
"""
return 'type' in resource and resource['type'] == 'thumbnail'
return "type" in resource and resource["type"] == "thumbnail"
def _is_playable(access_status) -> bool:
"""
Digital object can be retrieved via link
"""
return access_status == 'public' or access_status == 'closed'
return access_status == "public" or access_status == "closed"
class MediametadataToDB:
......@@ -357,10 +357,10 @@ class MediametadataToDB:
consumer.poll()
for record_object in consumer:
counter += 1
record = record_object.value['@graph']
record = record_object.value["@graph"]
headers = record_object.headers
record_id = _get_record_id(record)
logging.debug(f'Processing record {record_id}')
logging.debug(f"Processing record {record_id}")
record_processor.new_record(record_id, headers)
access_status = _get_access_status(record, record_id)
for record_resource in record:
......@@ -391,9 +391,9 @@ class MediametadataToDB:
record_processor.digital_object_fail(
record_id, error
)
elif access_status == 'unavailable':
elif access_status == "unavailable":
record_processor.digital_object_fail(
record_id, "invalid" " or missing" "access flag"
record_id, "invalid or missing access flag"
)
else:
record_processor.digital_object_ignore(
......@@ -415,13 +415,13 @@ class MediametadataToDB:
record_processor.index()
consumer.commit()
if counter % 1000 == 0:
logging.info('{} messages read till now'.format(counter))
logging.info("{} messages read till now".format(counter))
# arriving here means there are no new messages to poll from
record_processor.index()
consumer.commit()
except Exception as ex:
status = (
'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
"It was not possible to consume the Kafka messages." + "\n" + str(ex)
)
logging.error(status)
record_processor.abort(ex)
......
......@@ -23,19 +23,19 @@ class Indexer:
f"{os.environ['MARIADB_HOST']}:{os.environ['MARIADB_PORT']}"
)
mariadb_connection = mariadb.connect(
user=os.environ['MARIADB_USER'],
password=os.environ['MARIADB_PASSWORD'].rstrip(),
host=os.environ['MARIADB_HOST'],
port=int(os.environ['MARIADB_PORT']),
database=os.environ['MARIADB_DATABASE'],
user=os.environ["MARIADB_USER"],
password=os.environ["MARIADB_PASSWORD"].rstrip(),
host=os.environ["MARIADB_HOST"],
port=int(os.environ["MARIADB_PORT"]),
database=os.environ["MARIADB_DATABASE"],
)
mariadb_connection.autocommit = False
mariadb_cursor = mariadb_connection.cursor()
return mariadb_connection, mariadb_cursor
except Exception as ex:
status = 'Exception: ' + str(ex)
status = "Exception: " + str(ex)
logging.error(status)
if retries < int(os.environ['MARIADB_CONNECTION_RETRIES']):
if retries < int(os.environ["MARIADB_CONNECTION_RETRIES"]):
time.sleep(30 * (retries + 1))
self._connect_to_mariadb(retries + 1)
exit(1)
......@@ -46,12 +46,12 @@ class Indexer:
Create SQL statement
"""
db_values = [record[f] for f in fields if f in record and record[f]]
db_fields = ','.join([f for f in fields if f in record and record[f]])
db_value_placeholders = ', '.join(
['%s' for f in fields if f in record and record[f]]
db_fields = ",".join([f for f in fields if f in record and record[f]])
db_value_placeholders = ", ".join(
["%s" for f in fields if f in record and record[f]]
)
# noinspection SqlNoDataSourceInspection
sql = 'REPLACE INTO {} ({}) VALUES ({})'.format(
sql = "REPLACE INTO {} ({}) VALUES ({})".format(
table_name, db_fields, db_value_placeholders
)
return sql, tuple(db_values)
......@@ -61,21 +61,21 @@ class Indexer:
Insert record in DB
"""
stmt, values = Indexer._create_sql_stmt(
'entities', record, ['sig', 'uri', 'access', 'proto']
"entities", record, ["sig", "uri", "access", "proto"]
)
try:
self.mariadb_cursor.execute(stmt, values)
stmt, values = Indexer._create_sql_stmt(
'metadata',
"metadata",
record,
['sig', 'mimetype', 'height', 'width', 'duration', 'type'],
["sig", "mimetype", "height", "width", "duration", "type"],
)
self.mariadb_cursor.execute(stmt, values)
return True, ""
except mariadb.Error as ex:
logging.error(
f'Problems in sql statement (statement: "{stmt}", '
f'parameters: {values}): {ex}'
f"parameters: {values}): {ex}"
)
return False, str(ex)
......
......@@ -21,83 +21,83 @@ class RecordProcessor:
@staticmethod
def _parsing_failed_digital_object(record) -> bool:
return 'digital_object' in record and not record['digital_object']['ok']