Unverified Commit e2fe92d6 authored by Sebastian Schüpbach's avatar Sebastian Schüpbach
Browse files

add a reporting mechanism


Signed-off-by: Sebastian Schüpbach's avatarSebastian Schüpbach <sebastian.schuepbach@unibas.ch>
parent 40fb9724
......@@ -26,6 +26,10 @@ spec:
value: "3"
- name: MARIADB_CONNECTION_RETRIES
value: "3"
- name: INPUT_TOPIC
value: "fedora-output-json-records"
- name: REPORTING_TOPIC
value: "postprocessing-reporting"
envFrom:
- configMapRef:
name: "prod-kafka-bootstrap-servers"
......
......@@ -26,6 +26,19 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import logging
......@@ -41,11 +54,13 @@ from kafka.errors import KafkaError
from kubernetes import config
from kubernetes.config.config_exception import ConfigException as K8sConfigException
from mediametadatatodb_app.resources.reporter import Reporter
def _connect_to_kafka(retries=0):
try:
consumer = KafkaConsumer(
'fedora-output-json-records',
os.environ['INPUT_TOPIC'],
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
auto_offset_reset='earliest',
......@@ -240,7 +255,6 @@ def _write_values_in_db(mariadb_cursor, record_values_for_db):
class MediametadataToDB:
# Todo write/correct comment for swagger
@staticmethod
def run():
"""
......@@ -264,7 +278,8 @@ class MediametadataToDB:
# process messages:
record_values_for_db = []
try: # read messages from kafka
reporter = Reporter()
try:
while True:
consumer.poll(max_records=100)
for recordsJson in consumer:
......@@ -288,6 +303,9 @@ class MediametadataToDB:
# arriving here means there are no new messages to poll from
_write_values_in_db(mariadb_cursor, record_values_for_db)
mariadb_connection.commit()
for record_value in record_values_for_db:
# TODO: Create more meaningful reports!
reporter.send_message(record_value['@id'], "SUCCESS", "Indexing successful")
record_values_for_db = []
consumer.commit()
except KafkaError as ex:
......
# mediametadatatodb
# Copyright (C) 2020 Memoriav
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import json
import logging
import os
import time
from kafka.producer import KafkaProducer
class Reporter:
    """Publish per-record processing reports to a Kafka topic.

    The producer is created on construction using the
    ``KAFKA_BOOTSTRAP_SERVERS`` environment variable. Reports are sent to
    the topic named by the ``REPORTING_TOPIC`` environment variable.
    """

    def __init__(self):
        # May be None if every connection attempt failed; send_message()
        # then logs an error instead of raising.
        self._producer = self._connect_to_kafka()

    def _connect_to_kafka(self, retries=0):
        """Create a KafkaProducer, retrying with a growing delay on failure.

        :param retries: number of attempts already made (the delay before
            the next attempt is ``30 * (retries + 1)`` seconds).
        :return: the connected ``KafkaProducer``, or ``None`` once the
            number of retries reaches ``KAFKA_CONNECTION_RETRIES``.
        """
        while True:
            try:
                return KafkaProducer(
                    bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS']
                )
            except Exception as ex:
                logging.error('Exception while connecting Kafka: {}'.format(ex))
                if retries >= int(os.environ['KAFKA_CONNECTION_RETRIES']):
                    # Retries exhausted: give up without raising so that
                    # reporting stays best-effort.
                    return None
                time.sleep(30 * (retries + 1))
                # Loop again and *return* the producer on success; the
                # former recursive call discarded its result.
                retries += 1

    @staticmethod
    def _format_timestamp(moment):
        """Format *moment* as ``YYYY-MM-DDTHH:MM:SS.mmm``.

        Milliseconds are truncated (not rounded) and zero-padded to three
        digits, so the fractional part can neither overflow to ``1000``
        nor lose its leading zeros.
        """
        millis = moment.microsecond // 1000
        return moment.strftime('%Y-%m-%dT%H:%M:%S.') + '{:03d}'.format(millis)

    def send_message(self, identifier, status, message):
        """Send one report for the record *identifier*.

        The report is a JSON object with the keys ``id``, ``status``,
        ``message``, ``step`` and ``timestamp``; the Kafka message key is
        the UTF-8 encoded identifier. Any failure is logged and swallowed
        so that reporting never interrupts the caller.

        :param identifier: record identifier (string), used as message key.
        :param status: status string of the report.
        :param message: human-readable detail of the report.
        """
        try:
            key_bytes = bytes(identifier, encoding='utf-8')
            report = {
                'id': identifier,
                'status': status,
                'message': message,
                'step': 'mediametadatatodb',
                # Local wall-clock time, as before.
                'timestamp': self._format_timestamp(datetime.datetime.now())
            }
            value_bytes = bytes(json.dumps(report), encoding='utf-8')
            self._producer.send(os.environ['REPORTING_TOPIC'], key=key_bytes, value=value_bytes)
            # Flush immediately so the report is handed to the broker
            # before the caller continues.
            self._producer.flush()
        except Exception as ex:
            logging.error('Couldn\'t send processing report!: {}'.format(ex))
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment