Unverified Commit f1839a23 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

retry if connection to kafka / mariadb can't be established


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent 6214a51a
......@@ -18,7 +18,14 @@ spec:
spec:
containers:
- name: mediametadatatodb-container
image: cr.gitlab.switch.ch/memoriav/memobase-2020/services/postprocessing/mediametadatatodb:latest
image: cr.gitlab.switch.ch/memoriav/memobase-2020/services/postprocessing/mediametadatatodb:access
env:
- name: KAFKA_GROUP_ID
value: "medienserverMetadataService"
- name: KAFKA_CONNECTION_RETRIES
value: "3"
- name: MARIADB_CONNECTION_RETRIES
value: "3"
envFrom:
- configMapRef:
name: "prod-kafka-bootstrap-servers"
......
from mediametadatatodb_app.resources.MediametadataToDB import MediametadataToDB
if __name__ == "__main__":
m = MediametadataToDB()
m.run()
MediametadataToDB.run()
......@@ -2,11 +2,63 @@ import json
import logging
import numbers
import os
import time
import mysql.connector as mariadb
# noinspection PyPackageRequirements
from kafka import KafkaConsumer
# noinspection PyPackageRequirements
from kafka.errors import KafkaError
from kubernetes import config
from kubernetes.config.config_exception import ConfigException as K8sConfigException
def _connect_to_kafka(retries=0):
try:
consumer = KafkaConsumer(
'fedora-output-json-records',
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id=os.environ['KAFKA_GROUP_ID'],
consumer_timeout_ms=30000)
return consumer
except KafkaError as ex:
status = 'KafkaError: ' + str(ex)
logging.error(status)
if retries < int(os.environ['KAFKA_CONNECTION_RETRIES']):
time.sleep(30 * (retries + 1))
_connect_to_kafka(retries + 1)
exit(1)
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
if retries < int(os.environ['KAFKA_CONNECTION_RETRIES']):
time.sleep(30 * (retries + 1))
_connect_to_kafka(retries + 1)
exit(1)
def _connect_to_mariadb(retries=0):
password = os.environ["mediaserver"].split(':')[1].split('@tcp(')[0]
try:
mariadb_connection = mariadb.connect(user='medienserver',
password=password,
host='mb-db1.memobase.unibas.ch',
port=3306,
database='medienserver')
mariadb_connection.autocommit = False
mariadb_cursor = mariadb_connection.cursor()
mariadb_cursor.execute("USE medienserver")
return mariadb_connection, mariadb_cursor
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
if retries < int(os.environ['MARIADB_CONNECTION_RETRIES']):
time.sleep(30 * (retries + 1))
_connect_to_kafka(retries + 1)
exit(1)
def _try_fetch_from_json_object(record_json_data,
......@@ -45,50 +97,50 @@ def _get_values_from_thumbnail_object(msg, _access_status):
def _get_values_from_digital_object(msg, access_status):
sig = obj_type = None
fileExtension = ''
returnValues = {
file_extension = ''
return_values = {
'access': access_status
}
if '@id' in msg:
v1 = msg['@id']
v2 = v1.split('/')
sig = v2[-1]
returnValues['sig'] = sig
return_values['sig'] = sig
if 'hasMimeType' in msg:
mimetype = msg['hasMimeType']
returnValues['mimetype'] = mimetype
return_values['mimetype'] = mimetype
# create value for field 'type' form mimetype:
obj_type = mimetype.split('/')[0]
if 'locator' in msg and 'https://memobase.ch/' not in msg['locator']:
uri = msg['locator']
else:
if obj_type == 'image':
fileExtension = 'jp2'
file_extension = 'jp2'
if obj_type == 'audio':
fileExtension = 'mp4'
file_extension = 'mp4'
if obj_type == 'video':
fileExtension = 'mp4'
uri = os.environ['URI_BASE'] + sig + '.' + fileExtension
returnValues['uri'] = uri
file_extension = 'mp4'
uri = os.environ['URI_BASE'] + sig + '.' + file_extension
return_values['uri'] = uri
if 'height' in msg:
height = msg['height']
returnValues['height'] = height
return_values['height'] = height
if 'width' in msg:
width = msg['width']
returnValues['width'] = width
return_values['width'] = width
if 'duration' in msg:
duration = msg['duration']
returnValues['duration'] = duration
return_values['duration'] = duration
# if uri uses a play to show content, use a special 'type':
if 'isDistributedOn' in msg:
if msg['isDistributedOn'] == 'file' and uri.startswith('http'):
if access_status == 'public':
returnValues['proto'] = 'redirect'
return_values['proto'] = 'redirect'
else:
returnValues['proto'] = 'proxy'
return_values['proto'] = 'proxy'
else:
returnValues['type'] = msg['isDistributedOn']
return returnValues
return_values['type'] = msg['isDistributedOn']
return return_values
def _get_access_status(graph):
......@@ -112,20 +164,21 @@ def _create_sql_stmt(table_name, record, fields):
else "'{}'".format(record[db_field])
for db_field in db_fields])
db_fields = ','.join(db_fields)
# noinspection SqlNoDataSourceInspection
return 'INSERT IGNORE INTO {} ({}) VALUES ({})'.format(
table_name, db_fields, db_values)
def _create_entities_entry(record, mariadbCursor):
def _create_entities_entry(record, mariadb_cursor):
fields = ['sig', 'uri', 'access', 'proto']
sqlStmt = _create_sql_stmt('entities', record, fields)
mariadbCursor.execute(sqlStmt)
sql_stmt = _create_sql_stmt('entities', record, fields)
mariadb_cursor.execute(sql_stmt)
def _create_metadata_entry(record, mariadbCursor):
def _create_metadata_entry(record, mariadb_cursor):
fields = ['sig', 'mimetype', 'height', 'width', 'duration', 'type']
sqlStmt = _create_sql_stmt('metadata', record, fields)
mariadbCursor.execute(sqlStmt)
sql_stmt = _create_sql_stmt('metadata', record, fields)
mariadb_cursor.execute(sql_stmt)
def _has_audio_snippet(record):
......@@ -134,34 +187,35 @@ def _has_audio_snippet(record):
record['uri'].startswith('file://')
def _create_audio_snippet_entry(record, mariadbCursor):
def _create_audio_snippet_entry(record, mariadb_cursor):
snippet_record = record.copy()
snippet_record['sig'] = snippet_record['sig'] + '-intro'
snippet_record['duration'] = \
30.0 if float(snippet_record['duration']) >= 30.0 \
else float(snippet_record['duration'])
else float(snippet_record['duration'])
snippet_record['mimetype'] = 'audio/mpeg'
snippet_record['uri'] = \
'.'.join(snippet_record['uri'].split('.')[0:-1]) + '-intro.mp3'
_create_entities_entry(snippet_record, mariadbCursor)
_create_metadata_entry(snippet_record, mariadbCursor)
_create_entities_entry(snippet_record, mariadb_cursor)
_create_metadata_entry(snippet_record, mariadb_cursor)
def _write_values_in_db(mariadbCursor, recordValuesForDB):
def _write_values_in_db(mariadb_cursor, record_values_for_db):
try:
for record in recordValuesForDB:
_create_entities_entry(record, mariadbCursor)
_create_metadata_entry(record, mariadbCursor)
for record in record_values_for_db:
_create_entities_entry(record, mariadb_cursor)
_create_metadata_entry(record, mariadb_cursor)
if _has_audio_snippet(record):
_create_audio_snippet_entry(record, mariadbCursor)
_create_audio_snippet_entry(record, mariadb_cursor)
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
class MediametadataToDB():
class MediametadataToDB:
# Todo write/correct comment for swagger
def run(self):
@staticmethod
def run():
"""
Import media metadata to mariaDB
This service should not return anything but run forever.
......@@ -178,71 +232,41 @@ class MediametadataToDB():
example: there was an exception
"""
# connect to kafka:
try:
consumer = KafkaConsumer(
'fedora-output-json-records',
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id='medienserverMetadataService14',
consumer_timeout_ms=30000
)
except KafkaError as ex:
status = 'KafkaError: ' + str(ex)
logging.error(status)
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
# connect to mariadb:
password = os.environ["mediaserver"].split(':')[1].split('@tcp(')[0]
try:
mariadbConnection = mariadb.connect(user='medienserver',
password=password,
host='mb-db1.memobase.unibas.ch',
port=3306,
database='medienserver')
mariadbConnection.autocommit = False
mariadbCursor = mariadbConnection.cursor()
mariadbCursor.execute("USE medienserver")
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
consumer = _connect_to_kafka()
mariadb_connection, mariadb_cursor = _connect_to_mariadb()
# process messages:
recordValuesForDB = []
record_values_for_db = []
try: # read messages from kafka
while True:
consumer.poll(max_records=25)
for recordsJson in consumer:
recordsJsonData = recordsJson.value['@graph']
access_status = _get_access_status(recordsJsonData)
records_json_data = recordsJson.value['@graph']
access_status = _get_access_status(records_json_data)
if access_status == 'public' or access_status == 'closed':
for recordJsonData in recordsJsonData:
for recordJsonData in records_json_data:
if 'type' in recordJsonData and \
recordJsonData['type'] == 'digitalObject':
_try_fetch_from_json_object(recordJsonData, recordValuesForDB,
_try_fetch_from_json_object(recordJsonData, record_values_for_db,
_get_values_from_digital_object,
access_status)
if 'type' in recordJsonData and \
recordJsonData['type'] == 'thumbnail':
_try_fetch_from_json_object(recordJsonData, recordValuesForDB,
_try_fetch_from_json_object(recordJsonData, record_values_for_db,
_get_values_from_thumbnail_object,
access_status)
# if readMessageCounter >= 100:
# break
# to consider: we could skip this next block and rely on max_records instead
if len(recordValuesForDB) >= 25:
_write_values_in_db(mariadbCursor, recordValuesForDB)
mariadbConnection.commit()
recordValuesForDB = []
if len(record_values_for_db) >= 25:
_write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
record_values_for_db = []
consumer.commit() # <-- uncomment this for production!
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadbCursor, recordValuesForDB)
mariadbConnection.commit()
recordValuesForDB = []
_write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
record_values_for_db = []
consumer.commit() # <-- uncomment this for production!
# if readMessageCounter >= 100:
# break
......@@ -264,10 +288,10 @@ class MediametadataToDB():
try:
# to be used when inside a kubernetes cluster
config.load_incluster_config()
except BaseException:
except K8sConfigException:
try:
# use .kube directory
# for local development
config.load_kube_config()
except BaseException:
except K8sConfigException:
logging.error("No kubernetes cluster defined")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment