Commit 375bfd75 authored by Matthias

Merge branch 'access' into 'master'

Read access status and set correct access / proto tuple

See merge request !2
parents 6ba39bb2 290166e5
Pipeline #17712 passed with stages in 1 minute and 55 seconds
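In short, this merge reads the access status attached to a record's digital object and derives the access / proto tuple written to the entities table: public remote files are redirected, closed remote files are proxied, and locally held files are served from disk. A minimal standalone sketch of that mapping is shown below; the helper names are illustrative only and are not the service's actual functions.

# Sketch of the access/proto decision introduced by this merge request.
# Assumes the JSON-LD resource layout visible in the diff ('type', 'name',
# 'regulates', 'locator'); helper names are hypothetical.

def access_status(graph):
    """Map the `access` resource regulating the digital object to a status."""
    for resource in graph:
        if resource.get('type') == 'access' and \
                resource.get('regulates', '').startswith('https://memobase.ch/digital/') and \
                'name' in resource:
            if resource['name'] == 'public':
                return 'public'
            if resource['name'] == 'private':
                return 'closed'
            return 'unavailable'
    return 'unavailable'

def proto_for(digital_object, access):
    """Remote files are redirected (public) or proxied (closed); local files use 'file'."""
    is_remote = 'locator' in digital_object and \
        not digital_object['locator'].startswith('https://memobase.ch/')
    if is_remote:
        return 'redirect' if access == 'public' else 'proxy'
    return 'file'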
GNU AFFERO GENERAL PUBLIC LICENSE
Version 3, 19 November 2007
Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
@@ -629,12 +629,12 @@ to attach them to the start of each source file to most effectively
state the exclusion of warranty; and each file should have at least
the "copyright" line and a pointer to where the full notice is found.
record-parser
Copyright (C) 2020 memoriav / Memobase 2020 / services
<one line to give the program's name and a brief idea of what it does.>
Copyright (C) <year> <name of author>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
@@ -643,7 +643,7 @@ the "copyright" line and a pointer to where the full notice is found.
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <https://www.gnu.org/licenses/>.
Also add information on how to contact you by electronic and paper mail.
@@ -658,4 +658,4 @@ specific requirements.
You should also get your employer (if you work as a programmer) or school,
if any, to sign a "copyright disclaimer" for the program, if necessary.
For more information on this, and how to apply and follow the GNU AGPL, see
<http://www.gnu.org/licenses/>.
<https://www.gnu.org/licenses/>.
\ No newline at end of file
@@ -19,6 +19,19 @@ spec:
containers:
- name: mediametadatatodb-container
image: cr.gitlab.switch.ch/memoriav/memobase-2020/services/postprocessing/mediametadatatodb:latest
env:
- name: KAFKA_GROUP_ID
value: "medienserverMetadataService"
- name: KAFKA_CONNECTION_RETRIES
value: "3"
- name: MARIADB_CONNECTION_RETRIES
value: "3"
- name: INPUT_TOPIC
value: "fedora-output-json-records"
- name: REPORTING_TOPIC
value: "postprocessing-reporting"
- name: LOG_LEVEL
value: "INFO"
envFrom:
- configMapRef:
name: "prod-kafka-bootstrap-servers"
# mediametadatatodb
# Copyright (C) 2020 Memoriav
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
NAMESPACE = "memobase"
# mediametadatatodb
# Copyright (C) 2020 Memoriav
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
from mediametadatatodb_app.resources.MediametadataToDB import MediametadataToDB
if __name__ == "__main__":
m = MediametadataToDB()
m.run()
numeric_level = getattr(logging, os.getenv('LOG_LEVEL').upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f'Invalid log level: {os.getenv("LOG_LEVEL")}')
logging.basicConfig(level=numeric_level)
logging.info("Starting up")
runner = MediametadataToDB()
runner.run()
logging.info("Shutting down")
# mediametadatatodb
# Copyright (C) 2020 Memoriav
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import logging
import numbers
import os
import time
import mysql.connector as mariadb
# noinspection PyPackageRequirements
from kafka import KafkaConsumer
# noinspection PyPackageRequirements
from kafka.errors import KafkaError
from kubernetes import config
from kubernetes.config.config_exception import ConfigException as K8sConfigException
from mediametadatatodb_app.resources.reporter import Reporter
def _connect_to_kafka(retries=0):
try:
consumer = KafkaConsumer(
os.environ['INPUT_TOPIC'],
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id=os.environ['KAFKA_GROUP_ID'],
consumer_timeout_ms=30000)
return consumer
except KafkaError as ex:
status = 'KafkaError: ' + str(ex)
logging.error(status)
if retries < int(os.environ['KAFKA_CONNECTION_RETRIES']):
time.sleep(30 * (retries + 1))
return _connect_to_kafka(retries + 1)
exit(1)
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
if retries < int(os.environ['KAFKA_CONNECTION_RETRIES']):
time.sleep(30 * (retries + 1))
return _connect_to_kafka(retries + 1)
exit(1)
def _connect_to_mariadb(retries=0):
password = os.environ["mediaserver"].split(':')[1].split('@tcp(')[0]
try:
mariadb_connection = mariadb.connect(user='medienserver',
password=password,
host='mb-db1.memobase.unibas.ch',
port=3306,
database='medienserver')
mariadb_connection.autocommit = False
mariadb_cursor = mariadb_connection.cursor()
return mariadb_connection, mariadb_cursor
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
if retries < int(os.environ['MARIADB_CONNECTION_RETRIES']):
time.sleep(30 * (retries + 1))
return _connect_to_mariadb(retries + 1)
exit(1)
def _try_fetch_from_json_object(record_json_data, record_values_for_db, fetch_from_obj_fun):
def _try_fetch_from_json_object(record_json_data,
record_values_for_db,
fetch_from_obj_fun,
access_status):
if 'locator' in record_json_data:
values = fetch_from_obj_fun(record_json_data)
values = fetch_from_obj_fun(record_json_data, access_status)
record_values_for_db.append(values)
elif '@id' in record_json_data:
logging.info('Record ' + record_json_data['@id'] +
@@ -20,10 +92,12 @@ def _try_fetch_from_json_object(record_json_data, record_values_for_db, fetch_fr
logging.warning('Record without @id-property detected!')
def _get_values_from_thumbnail_object(msg):
def _get_values_from_thumbnail_object(msg, _access_status):
return_values = {
'mimetype': 'image/jp2',
'type': 'image'
'type': 'image',
'access': 'public',
'proto': 'file'
}
if '@id' in msg:
return_values['sig'] = '{}-poster'.format(msg['@id'].split('/')[-2])
@@ -38,44 +112,85 @@ def _get_values_from_thumbnail_object(msg):
return return_values
def _get_values_from_digital_object(msg):
sig = uri = height = width = mimetype = type = None
fileExtension = ''
returnValues = {}
def _get_values_from_digital_object(msg, access_status):
file_extension = ''
return_values = {
'access': access_status
}
if '@id' in msg:
v1 = msg['@id']
v2 = v1.split('/')
sig = v2[-1]
returnValues['sig'] = sig
if 'hasMimeType' in msg:
mimetype = msg['hasMimeType']
returnValues['mimetype'] = mimetype
# create value for field 'type' from mimetype:
type = mimetype.split('/')[0]
if 'locator' in msg and 'https://memobase.ch/' not in msg['locator']:
uri = msg['locator']
else:
if type == 'image':
fileExtension = 'jp2'
if type == 'audio':
fileExtension = 'mp4'
if type == 'video':
fileExtension = 'mp4'
uri = os.environ['URI_BASE'] + sig + '.' + fileExtension
returnValues['uri'] = uri
return_values['sig'] = msg['@id'].split('/')[-1]
if 'height' in msg:
height = msg['height']
returnValues['height'] = height
return_values['height'] = height
if 'width' in msg:
width = msg['width']
returnValues['width'] = width
return_values['width'] = width
if 'duration' in msg:
duration = msg['duration']
returnValues['duration'] = duration
# if the uri uses a player to show content, use a special 'type':
return_values['duration'] = duration
if 'isDistributedOn' in msg:
returnValues['type'] = msg['isDistributedOn']
return returnValues
return_values['type'] = msg['isDistributedOn']
if 'hasMimeType' in msg:
if return_values['type'] == 'image':
mimetype = 'image/jp2'
else:
mimetype = msg['hasMimeType']
return_values['mimetype'] = mimetype
if _is_remote_file(msg):
return_values['uri'] = msg['locator']
if access_status == 'public':
return_values['proto'] = 'redirect'
else:
return_values['proto'] = 'proxy'
else:
return_values['proto'] = 'file'
if return_values['type'] == 'image':
file_extension = 'jp2'
if return_values['type'] == 'audio':
file_extension = 'mp4'
if return_values['type'] == 'video':
file_extension = 'mp4'
return_values['uri'] = os.environ['URI_BASE'] + return_values['sig'] + '.' + file_extension
return return_values
def _is_remote_file(msg):
return 'locator' in msg and not \
msg['locator'].startswith('https://memobase.ch/')
def _is_directly_fetchable(digital_object_resource):
return digital_object_resource['isDistributedOn'] == 'audio' or \
digital_object_resource['isDistributedOn'] == 'image' or \
digital_object_resource['isDistributedOn'] == 'video'
def _get_access_status(graph, record_id):
for resource in graph:
if 'type' in resource and resource['type'] == 'access' and \
'regulates' in resource and \
resource['regulates'].startswith('https://memobase.ch/digital/') and \
'name' in resource:
if resource['name'] == 'public':
logging.debug(f'{record_id}: Setting access for digital object to `public`')
return 'public'
elif resource['name'] == 'private':
logging.debug(f'{record_id}: Setting access for digital object to `closed`')
return 'closed'
else:
logging.info(f'Digital object of record {record_id} has access type ' +
f'`{resource["name"]}`. This makes the media resource unavailable.')
return 'unavailable'
logging.info(f'Digital object of record {record_id} has no related access information!'
+ ' Media resource is therefore unavailable')
return 'unavailable'
def _get_record_id(graph):
for resource in graph:
if '@type' in resource and resource['@type'] == \
'https://www.ica.org/standards/RiC/ontology#Record':
return resource['@id'] if '@id' in resource else None
def _create_sql_stmt(table_name, record, fields):
@@ -86,20 +201,21 @@ def _create_sql_stmt(table_name, record, fields):
else "'{}'".format(record[db_field])
for db_field in db_fields])
db_fields = ','.join(db_fields)
# noinspection SqlNoDataSourceInspection
return 'INSERT IGNORE INTO {} ({}) VALUES ({})'.format(
table_name, db_fields, db_values)
def _create_entities_entry(record, mariadbCursor):
fields = ['sig', 'uri']
sqlStmt = _create_sql_stmt('entities', record, fields)
mariadbCursor.execute(sqlStmt)
def _create_entities_entry(record, mariadb_cursor):
fields = ['sig', 'uri', 'access', 'proto']
sql_stmt = _create_sql_stmt('entities', record, fields)
mariadb_cursor.execute(sql_stmt)
def _create_metadata_entry(record, mariadbCursor):
def _create_metadata_entry(record, mariadb_cursor):
fields = ['sig', 'mimetype', 'height', 'width', 'duration', 'type']
sqlStmt = _create_sql_stmt('metadata', record, fields)
mariadbCursor.execute(sqlStmt)
sql_stmt = _create_sql_stmt('metadata', record, fields)
mariadb_cursor.execute(sql_stmt)
def _has_audio_snippet(record):
@@ -108,32 +224,38 @@ def _has_audio_snippet(record):
record['uri'].startswith('file://')
def _create_audio_snippet_entry(record, mariadbCursor):
def _has_http_locator(digital_object):
return 'locator' in digital_object and digital_object['locator'].startswith('http')
def _create_audio_snippet_entry(record, mariadb_cursor):
snippet_record = record.copy()
snippet_record['sig'] = snippet_record['sig'] + '-intro'
# //@formatter:off
snippet_record['duration'] = \
30.0 if float(snippet_record['duration']) >= 30.0 else float(snippet_record['duration'])
30.0 if float(snippet_record['duration']) >= 30.0 \
else float(snippet_record['duration'])
# //@formatter:on
snippet_record['mimetype'] = 'audio/mpeg'
snippet_record['uri'] = \
'.'.join(snippet_record['uri'].split('.')[0:-1]) + '-intro.mp3'
_create_entities_entry(snippet_record, mariadbCursor)
_create_metadata_entry(snippet_record, mariadbCursor)
_create_entities_entry(snippet_record, mariadb_cursor)
_create_metadata_entry(snippet_record, mariadb_cursor)
def _write_values_in_db(mariadbCursor, recordValuesForDB):
def _write_values_in_db(mariadb_cursor, record_values_for_db):
try:
for record in recordValuesForDB:
_create_entities_entry(record, mariadbCursor)
_create_metadata_entry(record, mariadbCursor)
for record in record_values_for_db:
_create_entities_entry(record, mariadb_cursor)
_create_metadata_entry(record, mariadb_cursor)
if _has_audio_snippet(record):
_create_audio_snippet_entry(record, mariadbCursor)
_create_audio_snippet_entry(record, mariadb_cursor)
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
class MediametadataToDB():
# Todo write/correct comment for swagger
class MediametadataToDB:
def run(self):
"""
Import media metadata to mariaDB
@@ -151,82 +273,61 @@ class MediametadataToDB():
example: there was an exception
"""
status = ''
# connect to kafka:
try:
consumer = KafkaConsumer(
'fedora-output-json-records',
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id='medienserverMetadataService14',
consumer_timeout_ms=30000
)
except KafkaError as ex:
status = 'KafkaError: ' + str(ex)
logging.error(status)
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
# connect to mariadb:
password = os.environ["mediaserver"].split(':')[1].split('@tcp(')[0]
try:
mariadbConnection = mariadb.connect(user='medienserver',
password=password,
host='mb-db1.memobase.unibas.ch',
port=3306,
database='medienserver')
mariadbConnection.autocommit = False
mariadbCursor = mariadbConnection.cursor()
mariadbCursor.execute("USE medienserver")
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
consumer = _connect_to_kafka()
mariadb_connection, mariadb_cursor = _connect_to_mariadb()
# process messages:
recordValuesForDB = []
try: # read messages from kafka
record_values_for_db = []
reporter = Reporter()
try:
while True:
readMessageCounter = 0
consumer.poll(max_records=25)
for recordsJson in consumer:
readMessageCounter = readMessageCounter + 1
recordsJsonData = recordsJson.value['@graph']
for recordJsonData in recordsJsonData:
if 'type' in recordJsonData and recordJsonData['type'] == 'digitalObject':
_try_fetch_from_json_object(recordJsonData, recordValuesForDB,
_get_values_from_digital_object)
if 'type' in recordJsonData and recordJsonData['type'] == 'thumbnail':
_try_fetch_from_json_object(recordJsonData, recordValuesForDB,
_get_values_from_thumbnail_object)
# if readMessageCounter >= 100:
# break
# to consider: we could skip this next block and rely on max_records instead
if len(recordValuesForDB) >= 25:
_write_values_in_db(mariadbCursor, recordValuesForDB)
mariadbConnection.commit()
recordValuesForDB = []
consumer.commit() # <-- uncomment this for production!
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadbCursor, recordValuesForDB)
mariadbConnection.commit()
recordValuesForDB = []
consumer.commit() # <-- uncomment this for production!
# if readMessageCounter >= 100:
# break
except KafkaError as ex:
status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
logging.error(status)
consumer.poll()
for recordNo, recordsJson in enumerate(consumer):
records_json_data = recordsJson.value['@graph']
record_id = _get_record_id(records_json_data)
logging.debug(f'Processing record {record_id}')
access_status = _get_access_status(records_json_data, record_id)
if access_status == 'public' or access_status == 'closed':
for recordJsonData in records_json_data:
if 'type' in recordJsonData and \
recordJsonData['type'] == 'digitalObject' and \
_has_http_locator(recordJsonData):
_try_fetch_from_json_object(recordJsonData, record_values_for_db,
_get_values_from_digital_object,
access_status)
if 'type' in recordJsonData and \
recordJsonData['type'] == 'thumbnail':
_try_fetch_from_json_object(recordJsonData, record_values_for_db,
_get_values_from_thumbnail_object,
access_status)
else:
logging.info(f'Ignoring record {record_id} since' +
f' access of digitalObject is {access_status}')
reporter.send_message(record_id,
"IGNORE",
"Ignoring record since access of " +
"digitalObject is unavailable")
if recordNo % 100 == 0:
_write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
for record_value in record_values_for_db:
logging.info(f'Record {record_value["sig"]} successfully indexed')
reporter.send_message(record_value['sig'], "SUCCESS",
"Indexing successful")
record_values_for_db = []
consumer.commit()
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
for record_value in record_values_for_db:
reporter.send_message(record_value['sig'], "SUCCESS", "Indexing successful")
record_values_for_db = []
consumer.commit()
except Exception as ex:
status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
logging.error(status)
return {"info": status}, 500
# FIXME: Probably needs some adaptations when the final thumbnail object is known
for record_value in record_values_for_db:
reporter.send_message(record_value['sig'], "FATAL", f"Indexing failed: {ex}")
def __init__(self):
# TODO : maybe take that to a configuration (development vs pod running in
@@ -234,10 +335,12 @@ class MediametadataToDB():
try:
# to be used when inside a kubernetes cluster
config.load_incluster_config()
except BaseException:
logging.info("Loading incluster config")
except K8sConfigException:
try:
# use .kube directory
# for local development
config.load_kube_config()
except BaseException:
logging.info("Loading kube config (for local development)")
except K8sConfigException:
logging.error("No kubernetes cluster defined")
# mediametadatatodb
# Copyright (C) 2020 Memoriav
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import logging
import os
import time
from kafka.producer import KafkaProducer