Commit aa9f87d1 authored by Lionel Walter's avatar Lionel Walter
Browse files

Merge branch 'MEMO-137-job-resultate-aus-kafka-lesen-TMP' into MEMO-135-136-137-272-merge

parents d1820041 46d6376b
......@@ -18,6 +18,7 @@ python3 -m venv venv
. venv/bin/activate
pip3 install -r requirements.txt
pip3 install -e .
export KAFKA_BOOTSTRAP_SERVERS=mb-ka1.memobase.unibas.ch:9092,mb-ka2.memobase.unibas.ch:9092,mb-ka3.memobase.unibas.ch:9092
cd import_api_app
mkdir charts
./pull-charts.sh
......
......@@ -25,10 +25,7 @@ def create_app(test_config=None):
api = Api(app)
try:
app.config['kafka-broker-url'] = os.environ['KAFKA_BOOTSTRAP_SERVERS']
except BaseException:
app.config['kafka-broker-url'] = "mb-ka1.memobase.unibas.ch:9092"
app.config['kafka-broker-url'] = os.environ['KAFKA_BOOTSTRAP_SERVERS']
app.config['SWAGGER'] = {
'title': 'Memobase Import API',
......@@ -66,9 +63,9 @@ def create_app(test_config=None):
api.add_resource(Job, '/v1/job/<process_id>/<job_name>')
api.add_resource(WriteJobResultToDrupal, '/v1/drupal/<job_drupal_uuid>/<report>')
# api.add_resource(ReadJobOptionsFromDrupal, '/v1/fromDrupal/<job_drupal_uuid>')
api.add_resource(JobReport, '/v1/job/<job_id>/report')
api.add_resource(KafkaTopics, '/v1/kafka/topics')
api.add_resource(JobReport, '/v1/job/<topic_name>/report')
# TODO : maybe take that to a configuration (development vs pod running in
# k8s cluster)
......
......@@ -6,21 +6,19 @@ import json
class JobReport(Resource):
# Todo write/correct comment for swagger
def post(self, job_id):
# Todo validate requests
# @swag.validate('job-parameters')
def get(self, topic_name):
"""
Get report form a job
Get report from a job
---
tags:
- Job Report
parameters:
- in: path
name: job_id
name: topic_name
required: true
description: The ID of the job. Must be lowercase
and contains only letters, numbers and -.
example: j0001
description: The name of the kafka topic for which we collect
the job summary
example: 025-text-file-validation-reporting
type: string
responses:
200:
......@@ -31,17 +29,16 @@ class JobReport(Resource):
status:
type: string
example: success/error
enum: ['success', 'error']
message:
enum: ['SUCCESS', 'FAILURE']
report:
type: string
example: 'NAME: j0001. LAST DEPLOYED: Thu May 14 16:15 2020.
NAMESPACE: memobase. STATUS: deployed.
REVISION: 1. TEST SUITE: None.'
example: 'institutionXYZ-235323B: something failed!\n
institutionXYZ-235323C: that one went totally wrong'
job_id:
type: string
example: j0001
500:
description: It was impossible to stop the job
description: Impossible to get results
schema:
$ref: '#/definitions/helm-result'
......@@ -50,47 +47,44 @@ class JobReport(Resource):
try: # read from kafka topic and generate report from it:
report = ''
topic = 'me-topic' # productive topicname = job_parameters['jobType'] + "-reporting"
consumer = KafkaConsumer(
topic,
topic_name,
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=[current_app.config['kafka-broker-url']],
bootstrap_servers=current_app.config['kafka-broker-url'],
auto_offset_reset='earliest',
enable_auto_commit=False
)
except BaseException:
except Exception:
current_app.logger.error('It was not possible to connect to the Kafka broker')
return {'error': 'Unexpected Kafka error'}, 500
try: # read job-reporting messages from kafka
topic_partition = TopicPartition(topic=topic, partition=0)
jobMessages = consumer.poll(1000, 1000)
topic_partition = TopicPartition(topic=topic_name, partition=0)
job_messages = consumer.poll(1000, 1000)
processed = succeeded = 0
if jobMessages:
for jobMessage in jobMessages[topic_partition]:
msgValue = jobMessage.value
if msgValue['status'] == 'FAILURE':
report = report + msgValue['id'] + ": " + msgValue['message'] + "\n"
if job_messages:
for jobMessage in job_messages[topic_partition]:
msg_value = jobMessage.value
if msg_value['status'] == 'FAILURE':
report = report + msg_value['id'] + ": " + msg_value['message'] + "\n"
else:
succeeded = succeeded + 1
succeeded = succeeded + 1
processed = processed + 1
report = report[:-2]
else:
report = 'no messages found'
except BaseException:
except Exception:
current_app.logger.error('It was not possible to consume the Kafka messages')
return {'error': 'Unexpected Kafka error'}, 500
if report == '':
report = 'success'
return {
'status': 'success',
'job_id': job_id,
'status': 'SUCCESS',
'report': report
}, 201
}, 200
else:
return {
'status': 'error',
'job_id': job_id,
'status': 'FAILURE',
'report': report
}, 422
}, 200
from flask_restful import Resource, current_app, reqparse
from flasgger import swag_from
from import_api_app.utility import generate_helm_name
import import_api_app.helm as helm
import os
from import_api_app.utility import generate_helm_name
class JobStart(Resource):
......
# Kubernetes ConfigMap holding runtime configuration for the Memobase import API.
# NOTE(review): indentation was reconstructed — the source rendering had lost it;
# token content (keys, values) is unchanged.
apiVersion: v1
kind: ConfigMap
metadata:
  name: import-api-configmap
  namespace: memobase
data:
  # Exposed to the application as a file named configmap.cfg.
  # The literal block below is raw file content — no YAML comments may be
  # added inside it without changing the delivered configuration.
  # NOTE(review): presumably parsed as simple key/value lines by the app
  # (keys match the Flask config keys used in the diffed Python code) — confirm.
  configmap.cfg: |
    kafka-broker-url: "mb-ka1.memobase.unibas.ch:9092"
    kafka-poll-amount: "1000"
    kafka-poll-timeout: "1000"
\ No newline at end of file
......@@ -17,7 +17,7 @@ spec:
serviceAccountName: import-api-service-account #to be able to manage other pods inside the cluster
containers:
- name: import-api-container
image: cr.gitlab.switch.ch/memoriav/memobase-2020/services/import-process/import-api:MEMO-135-read-options-from-drupal
image: cr.gitlab.switch.ch/memoriav/memobase-2020/services/import-process/import-api:latest
envFrom:
- secretRef:
name: drupal-api-credentials
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment