Commit 7c7ac6e4 authored by Matthias's avatar Matthias
Browse files

inital commit

parents
FROM python:3.8
WORKDIR /iiifmanifeststodb_app
ADD iiifmanifeststodb_app /iiifmanifeststodb_app/
ADD requirements.txt /iiifmanifeststodb_app/
WORKDIR /iiifmanifeststodb_app/kubectl
WORKDIR /iiifmanifeststodb_app
RUN pip install -r requirements.txt
WORKDIR /
ADD setup.py /
RUN pip install -e .
ENTRYPOINT ["python"]
CMD ["/iiifmanifeststodb_app/main.py"]
This diff is collapsed.
# IIIFManifestsToDB
# Read form iiif-manifests from kafka and write it into mediaserver-mariaDB
from iiifmanifeststodb_app.resources.IIIFManifestsToDB import IIIFManifestsToDB
if __name__ == "__main__":
m = IIIFManifestsToDB()
m.run()
import json
import logging
import os
import mysql.connector as mariadb
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from kubernetes import config
def _create_sql_stmt(table_name, fields, values):
# TODO: either -check if 'sig' already exists, if so use update-statement
# or -try using 'insert ignore' (didn't work when i manually tested it)
db_fields = ','.join(fields)
values = ["'" + val for val in values]
db_values = ','.join([values])
return 'INSERT IGNORE INTO {} ({}) VALUES ({})'.format(
table_name, db_fields, db_values)
def _create_entities_entry(key, value, mariadbCursor):
fields = ['sig', 'uri', 'manifest']
values = ['\'' + key, '', value]
sqlStmt = _create_sql_stmt('entities_me', fields, values)
mariadbCursor.execute(sqlStmt)
def _write_values_in_db(mariadbCursor, key, value):
try:
_create_entities_entry(key, value, mariadbCursor)
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
class IIIFManifestsToDB():
def run(self):
"""
Read form iiif-manifests from kafka and write it into mediaserver-mariaDB
This service should not return anything but run forever.
---
tags:
- iiifmanifeststodb
responses:
500:
description: some error occured
schema:
properties:
error:
type: string
example: there was an exception
"""
status = ''
# connect to kafka:
try:
consumer = KafkaConsumer(
'iif-manifests',
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id='iiimanifests',
consumer_timeout_ms=30000
)
except KafkaError as ex:
status = 'KafkaError: ' + str(ex)
logging.error(status)
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
# connect to mariadb:
password = os.environ["mediaserver"].split(':')[1].split('@tcp(')[0]
try:
mariadbConnection = mariadb.connect(user='medienserver',
password=password,
host='mb-db1.memobase.unibas.ch',
port=3306,
database='medienserver')
mariadbConnection.autocommit = False
mariadbCursor = mariadbConnection.cursor()
mariadbCursor.execute("USE medienserver")
except Exception as ex:
status = 'Exception: ' + str(ex)
logging.error(status)
# process messages:
valuesForDB = {}
try: # read messages from kafka
while True:
readMessageCounter = 0
consumer.poll(max_records=25)
for manifest in consumer:
readMessageCounter = readMessageCounter + 1
valuesForDB.append()
# to consider: we could skip this next block and rely on max_records instead
if len(valuesForDB) >= 25:
_write_values_in_db(mariadbCursor, manifest.key, manifest.value)
mariadbConnection.commit()
valuesForDB = []
# consumer.commit() # <-- uncomment this for production!
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadbCursor, manifest.key, manifest.value)
mariadbConnection.commit()
valuesForDB = []
# consumer.commit() # <-- uncomment this for production!
except KafkaError as ex:
status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
logging.error(status)
except Exception as ex:
status = 'It was not possible to consume the Kafka messages.' + '\n' + str(ex)
logging.error(status)
return {"info": status}, 500
def __init__(self):
# TODO : maybe take that to a configuration (development vs pod running in
# k8s cluster)
try:
# to be used when inside a kubernetes cluster
config.load_incluster_config()
except BaseException:
try:
# use .kube directory
# for local development
config.load_kube_config()
except BaseException:
logging.error("No kubernetes cluster defined")
from setuptools import setup
setup(
name='iiifmanifeststodb_app',
packages=['iiifmanifeststodb_app'],
include_package_data=True
)
[flake8]
#ignore = W503,E501,E203
max-line-length = 100
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment