Commit a88bfe4c authored by Matthias

merge with current master branch

parents eed6ca80 1d08deb9
Pipeline #11433 passed with stages in 5 minutes and 45 seconds
FROM python:3.8
RUN mkdir /import_api_app
WORKDIR /import_api_app
ADD import_api_app /import_api_app/
ADD requirements.txt /import_api_app/
RUN mkdir /kubectl
WORKDIR /import_api_app/kubectl
# Install kubectl so that Helm can work inside the Docker container
RUN wget https://storage.googleapis.com/kubernetes-release/release/v1.16.2/bin/linux/amd64/kubectl && chmod +x ./kubectl
......
@@ -2,13 +2,16 @@ from flask import Flask, send_from_directory, redirect
from kubernetes import config
from flask_restful import Api
from flasgger import Swagger
from kubernetes.config import ConfigException
from import_api_app.resources.HelmStart import HelmStart
from import_api_app.resources.HelmStop import HelmStop
from import_api_app.resources.JobList import JobList
from import_api_app.resources.JobStart import JobStart
from import_api_app.resources.JobStop import JobStop
from import_api_app.resources.JobReport import JobReport
from import_api_app.resources.Job import Job
from import_api_app.resources.KafkaTopics import KafkaTopics
# from import_api_app.resources.ReadJobOptionsFromDrupal import ReadJobOptionsFromDrupal
import import_api_app.configuration
from import_api_app.resources.WriteJobResultsToDrupal import WriteJobResultToDrupal
from import_api_app.resources.ReadJobOptionsFromDrupal import ReadJobOptionsFromDrupal
@@ -20,7 +23,6 @@ import os
def create_app(test_config=None):
app = Flask(__name__)
app.config.from_object(import_api_app.configuration)
# app.config.from_envvar('YOURAPPLICATION_SETTINGS')
@@ -68,11 +70,11 @@ def create_app(test_config=None):
)
api.add_resource(JobStop, '/v1/job/<process_id>/<job_name>/stop')
# api.add_resource(
# HelmStart,
# '/v1/helm/<institution_id>/<record_set_id>/<process_id>/<job_name>/start'
# )
# api.add_resource(HelmStop, '/v1/helm/<process_id>/<job_name>/stop')
api.add_resource(
HelmStart,
'/v1/helm/<institution_id>/<record_set_id>/<process_id>/<job_name>/start'
)
api.add_resource(HelmStop, '/v1/helm/<process_id>/<job_name>/stop')
api.add_resource(Job, '/v1/job/<process_id>/<job_name>')
api.add_resource(WriteJobResultToDrupal, '/v1/drupal/<job_log_drupal_uuid>/<report>')
@@ -89,12 +91,12 @@ def create_app(test_config=None):
try:
# to be used when inside a kubernetes cluster
config.load_incluster_config()
except BaseException:
except ConfigException:
try:
# use .kube directory
# for local development
config.load_kube_config()
except BaseException:
except ConfigException:
app.logger.error("No kubernetes cluster defined")
return app
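For local development the factory can be exercised directly; a minimal sketch, assuming this file is import_api_app/__init__.py (the helper script name, host and port are illustrative, not part of this commit):

# run_local.py (hypothetical helper script)
from import_api_app import create_app

app = create_app()

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)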
from import_api_app.models.Error import ImportApiError
from flask_restful import current_app
from datetime import datetime
import requests
import json
def write_results(job_log_drupal_uuid, report):
current_app.logger.debug("uuid to write: " + job_log_drupal_uuid)
current_app.logger.debug("report to write: " + report)
headers = {
'Content-Type': 'application/vnd.api+json',
'Accept': 'application/vnd.api+json',
'X-API-Key': current_app.config['drupal-api-key']
}
data = {
"data": {
"id": job_log_drupal_uuid,
"type": "paragraph--job_log_result",
"attributes": {
"field_summary": report,
"field_end_date": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S+00:00")
}
}
}
url = current_app.config['DRUPAL_API_URL'] + \
'/paragraph/job_log_result/' + job_log_drupal_uuid
try:
response = requests.patch(
url,
headers=headers,
data=json.dumps(data)
)
except requests.exceptions.RequestException:
message = "It was not possible to write to the Drupal API via the following URL: " + url
current_app.logger.error(message)
raise ImportApiError(message)
if response.status_code == 200:
current_app.logger.info('Updated: ' + url)
return {'message': 'Updated: ' + url}
elif response.status_code == 403:
message = "Not authorized to write to: " + url
current_app.logger.error(message)
raise ImportApiError(message)
elif response.status_code == 404:
message = 'Not Found: ' + url
current_app.logger.error(message)
raise ImportApiError(message)
else:
message = "Unknown response status code from the Drupal API for URL " + url
current_app.logger.error(message)
raise ImportApiError(message)
# mark the job as no longer active and write the end date
# todo: refactor write_results and update_status into a more generic function
# (see the sketch after update_status below)
def update_status(job_drupal_uuid, job_name):
# in Drupal, job names use underscores instead of hyphens
job_name = job_name.replace("-", "_")
current_app.logger.debug("uuid to write: " + job_drupal_uuid)
headers = {
'Content-Type': 'application/vnd.api+json',
'Accept': 'application/vnd.api+json',
'X-API-Key': current_app.config['drupal-api-key']
}
data = {
"data": {
"id": job_drupal_uuid,
"type": "paragraph--job_"+job_name,
"attributes": {
"field_is_active": False
}
}
}
url = current_app.config['DRUPAL_API_URL'] + \
'/paragraph/job_'+job_name+'/' + job_drupal_uuid
try:
response = requests.patch(
url,
headers=headers,
data=json.dumps(data)
)
except requests.exceptions.RequestException:
message = "It was not possible to write to the Drupal API via the following URL: " + url
current_app.logger.error(message)
raise ImportApiError(message)
if response.status_code == 200:
current_app.logger.info('Updated: ' + url)
return {'message': 'Updated: ' + url}
elif response.status_code == 403:
message = "Not authorized to write to: " + url
current_app.logger.error(message)
raise ImportApiError(message)
elif response.status_code == 404:
message = 'Not Found: ' + url
current_app.logger.error(message)
raise ImportApiError(message)
else:
message = "Unknown response status code from the Drupal API for URL " + url
current_app.logger.error(message)
raise ImportApiError(message)
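The two functions above differ only in the paragraph bundle and the attributes they patch. A possible shape for the generic helper the todo mentions (a sketch only; the helper and parameter names are hypothetical and not part of this commit):

def patch_drupal_paragraph(bundle, drupal_uuid, attributes):
    # Hypothetical shared helper: both write_results and update_status could
    # delegate their PATCH request and error handling to it.
    headers = {
        'Content-Type': 'application/vnd.api+json',
        'Accept': 'application/vnd.api+json',
        'X-API-Key': current_app.config['drupal-api-key']
    }
    data = {
        "data": {
            "id": drupal_uuid,
            "type": "paragraph--" + bundle,
            "attributes": attributes
        }
    }
    url = current_app.config['DRUPAL_API_URL'] + '/paragraph/' + bundle + '/' + drupal_uuid
    try:
        response = requests.patch(url, headers=headers, data=json.dumps(data))
    except requests.exceptions.RequestException:
        message = "It was not possible to write to the Drupal API via the following URL: " + url
        current_app.logger.error(message)
        raise ImportApiError(message)
    if response.status_code == 200:
        return {'message': 'Updated: ' + url}
    if response.status_code == 403:
        message = "Not authorized to write to: " + url
    elif response.status_code == 404:
        message = 'Not Found: ' + url
    else:
        message = "Unknown response status code from the Drupal API for URL " + url
    current_app.logger.error(message)
    raise ImportApiError(message)

write_results would then call patch_drupal_paragraph('job_log_result', ...) and update_status would call patch_drupal_paragraph('job_' + job_name, ...).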
class ImportApiError(Exception):
def __init__(self, message):
self.message = message
from flask_restful import current_app
from import_api_app.utility import generate_helm_name
import import_api_app.helm as helm
import os
from subprocess import CalledProcessError
from import_api_app.models.Error import ImportApiError
def start(process_id, job_name, job_parameters):
helm_client = helm.Helm()
helm_name = generate_helm_name(process_id, job_name)
try:
output = helm_client.install(
chart=os.path.join(current_app.root_path, "charts", job_name),
name=helm_name,
namespace=current_app.config['NAMESPACE'],
set_values=job_parameters,
fail_on_err=False
)
message = (output.stdout + output.stderr)
message = message.replace('\n', '. ')
if output.returncode == 0:
return {
'status': 'SUCCESS',
'message': 'Job successfully started',
'job_id': helm_name
}, 201
else:
return {
'status': 'FAILURE',
'message': message,
'job_id': helm_name
}, 500
except CalledProcessError:
message = "It was not possible to run the helm install command"
current_app.logger.error(message)
raise ImportApiError(message)
def stop(process_id, job_name):
helm_client = helm.Helm()
helm_name = generate_helm_name(process_id, job_name)
try:
output = helm_client.uninstall(
name=helm_name,
namespace=current_app.config['NAMESPACE'],
fail_on_err=False
)
message = output.stdout + output.stderr
message = message.replace('\n', '. ')
if output.returncode == 0:
return {
'status': 'SUCCESS',
'message': message,
'job_id': helm_name
}, 200
else:
return {
'status': 'FAILURE',
'message': message,
'job_id': helm_name
}, 500
except CalledProcessError:
message = "It was not possible to run the helm uninstall command"
current_app.logger.error(message)
raise ImportApiError(message)
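The import_api_app.helm wrapper itself is not shown in this diff; judging only from how Helm() is called above, a plausible shape would be a thin subprocess wrapper like the sketch below (an assumption, not the actual module):

import subprocess

class Helm:
    def install(self, chart, name, namespace, set_values=None, fail_on_err=True):
        # Build a "helm install" command; each job parameter becomes a --set flag
        cmd = ['helm', 'install', name, chart, '--namespace', namespace]
        for key, value in (set_values or {}).items():
            cmd += ['--set', '{}={}'.format(key, value)]
        # capture_output provides the .stdout/.stderr/.returncode fields used by the callers;
        # check=True would raise CalledProcessError, matching the callers' except clause
        return subprocess.run(cmd, capture_output=True, text=True, check=fail_on_err)

    def uninstall(self, name, namespace, fail_on_err=True):
        cmd = ['helm', 'uninstall', name, '--namespace', namespace]
        return subprocess.run(cmd, capture_output=True, text=True, check=fail_on_err)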
from kafka import KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
from flask_restful import current_app
from kafka.errors import KafkaError
from import_api_app.models.Error import ImportApiError
import time
def list_topics():
consumer = KafkaConsumer(
bootstrap_servers=current_app.config['kafka-broker-url'],
)
try:
topics = consumer.topics()
except KafkaError:
message = "Problem connecting to Kafka cluster"
current_app.logger.error(message)
raise ImportApiError(message)
return topics
# Create the given topics. Topics that already exist are deleted first and then re-created.
def create_topics(topics):
admin_client = KafkaAdminClient(
bootstrap_servers=current_app.config['kafka-broker-url'],
client_id='import-api'
)
consumer = KafkaConsumer(
bootstrap_servers=current_app.config['kafka-broker-url'],
)
try:
current_topics = consumer.topics()
except KafkaError:
message = "Problem connecting to Kafka cluster"
current_app.logger.error(message)
raise ImportApiError(message)
topic_to_delete = []
for topic in topics:
if topic in current_topics:
topic_to_delete.append(topic)
try:
admin_client.delete_topics(topic_to_delete)
except KafkaError:
message = "Problem when removing already existing topics: " + ", ".join(topic_to_delete)
current_app.logger.error(message)
raise ImportApiError(message)
# wait briefly so the topics are really deleted before re-creating them
# todo: does a fixed delay make sense? Does deletion take longer when the topic holds many messages?
# todo: poll until the topics are actually gone instead of sleeping a fixed time
# (see the sketch after this function)
time.sleep(0.1)
new_topic_list = []
for topic in topics:
# todo use multiple partitions
new_topic_list.append(NewTopic(name=topic, num_partitions=3, replication_factor=1))
try:
admin_client.create_topics(new_topic_list)
except KafkaError:
message = "Problem when creating new topics: " + ", ".join(topics)
current_app.logger.error(message)
raise ImportApiError(message)
return {
'success': "Topics created: " + ", ".join(topics)
}
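# A possible polling replacement for the fixed sleep used in create_topics, as
# referenced by the todo above (a sketch only, not part of this commit; the
# five-second timeout is an arbitrary choice):
def wait_until_topics_deleted(consumer, topics, timeout_seconds=5):
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        # consumer.topics() returns the set of topic names currently known to the cluster
        if not set(topics) & consumer.topics():
            return True
        time.sleep(0.1)
    return False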
def delete_topics(topics):
admin_client = KafkaAdminClient(
bootstrap_servers=current_app.config['kafka-broker-url'],
client_id='import-api'
)
try:
admin_client.delete_topics(topics)
except KafkaError:
message = "Problem when deleting topics (they probably don't exist): " + ", ".join(topics)
current_app.logger.error(message)
raise ImportApiError(message)
return {
'success': "Topics deleted: " + ", ".join(topics)
}
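These helpers rely on a Flask application context so that current_app is available; a hypothetical usage sketch (the module path and topic name are illustrative and not taken from this commit):

from import_api_app import create_app
from import_api_app.models import Kafka  # assumed module path for the helpers above

app = create_app()
with app.app_context():
    print(Kafka.list_topics())
    Kafka.create_topics(['p123-text-file-validation-reporting'])  # illustrative topic name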
from flask_restful import current_app
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
from import_api_app.models.Error import ImportApiError
def get_report(topic_name):
try: # read from kafka topic and generate report from it:
consumer = KafkaConsumer(
topic_name,
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=current_app.config['kafka-broker-url'],
auto_offset_reset='earliest',
enable_auto_commit=False,
# stop iterating when no new message arrives within 2 seconds
consumer_timeout_ms=2000
# todo maybe add group id ?
# group_id='import-api-report'+topic_name
)
except KafkaError:
message = 'It was not possible to connect to the Kafka broker'
current_app.logger.error(message)
raise ImportApiError(message)
try: # read job-reporting messages from kafka
processed = succeeded = 0
report = ''
for jobMessage in consumer:
msg_value = jobMessage.value
if msg_value['status'] == 'FAILURE':
report = report + msg_value['id'] + ": " + msg_value['message'] + "\r\n"
else:
succeeded = succeeded + 1
processed = processed + 1
report = report[:-2]
if processed == 0:
report = 'No messages found'
except KafkaError:
message = 'It was not possible to consume the Kafka messages'
current_app.logger.error(message)
raise ImportApiError(message)
# todo: fix this; an UnboundLocalError apparently occurs when the topic
# contains only a single message
except UnboundLocalError:
if 'text-file-validation' in topic_name:
report = 'File correctly loaded (this needs to be checked more carefully)'
current_app.logger.debug("Report: " + report)
return {
'status': 'SUCCESS',
'report': report
}
else:
message = 'Problem generating the report'
current_app.logger.error(message)
raise ImportApiError(message)
if report == '':
report = str(processed)+' records successfully processed'
current_app.logger.debug("Report: "+report)
return {
'status': 'SUCCESS',
'report': report
}
else:
if 'text-file-validation' in topic_name:
report = 'Impossible to load the file(s) from the SFTP server'
current_app.logger.debug("Report: " + report)
return {
'status': 'SUCCESS',
'report': report
}
else:
report_header = str(processed) + ' records processed\r\n'
report_header += str(succeeded) + ' successful\r\n'
current_app.logger.debug("Report: " + report_header+report)
return {
'status': 'FAILURE',
'report': report_header+report
}
@@ -7,11 +7,17 @@ GITLAB_REGISTRY="cr.gitlab.switch.ch"
TFV_REGISTRY=cr.gitlab.switch.ch/memoriav/memobase-2020/services/import-process/text-file-validation
TDT_REGISTRY=cr.gitlab.switch.ch/memoriav/memobase-2020/services/import-process/table-data-transform
MAPPER_REGISTRY=cr.gitlab.switch.ch/memoriav/memobase-2020/services/import-process/mapper-service
MEDIA_REGISTRY=cr.gitlab.switch.ch/memoriav/memobase-2020/services/import-process/media-linker
NORM_REGISTRY=cr.gitlab.switch.ch/memoriav/memobase-2020/services/import-process/normalization-service
MEDIA_EXTRACTOR_REGISTRY=cr.gitlab.switch.ch/memoriav/memobase-2020/services/import-process/media-metadata-extractor
TFV_CHART_VERSION="0.3.5-chart"
TDT_CHART_VERSION="0.4.1-chart"
MAPPER_CHART_VERSION="0.8.0-chart"
TDT_CHART_VERSION="0.4.4-chart"
MAPPER_CHART_VERSION="0.9.3-chart"
MEDIA_CHART_VERSION="0.1.2-chart"
NORM_CHART_VERSION="0.1.3-chart"
MEDIA_EXTRACTOR_CHART_VERSION="0.0.2-chart"
helm chart pull ${TFV_REGISTRY}:${TFV_CHART_VERSION}
helm chart export ${TFV_REGISTRY}:${TFV_CHART_VERSION} -d charts/
@@ -20,4 +26,13 @@ helm chart pull ${TDT_REGISTRY}:${TDT_CHART_VERSION}
helm chart export ${TDT_REGISTRY}:${TDT_CHART_VERSION} -d charts/
helm chart pull ${MAPPER_REGISTRY}:${MAPPER_CHART_VERSION}
helm chart export ${MAPPER_REGISTRY}:${MAPPER_CHART_VERSION} -d charts/
\ No newline at end of file
helm chart export ${MAPPER_REGISTRY}:${MAPPER_CHART_VERSION} -d charts/
helm chart pull ${MEDIA_REGISTRY}:${MEDIA_CHART_VERSION}
helm chart export ${MEDIA_REGISTRY}:${MEDIA_CHART_VERSION} -d charts/
helm chart pull ${NORM_REGISTRY}:${NORM_CHART_VERSION}
helm chart export ${NORM_REGISTRY}:${NORM_CHART_VERSION} -d charts/
helm chart pull ${MEDIA_EXTRACTOR_REGISTRY}:${MEDIA_EXTRACTOR_CHART_VERSION}
helm chart export ${MEDIA_EXTRACTOR_REGISTRY}:${MEDIA_EXTRACTOR_CHART_VERSION} -d charts/
\ No newline at end of file
from flask_restful import Resource, current_app, reqparse
from flask_restful import Resource, reqparse
from flasgger import swag_from
from import_api_app.utility import generate_helm_name
import import_api_app.helm as helm
import os
from import_api_app.models.Error import ImportApiError
from import_api_app.models.Helm import start
class HelmStart(Resource):
# Todo validate requests
# @swag.validate('job-parameters')
@swag_from('HelmStart.yml')
def post(self, institution_id, record_set_id, process_id, job_name):
@@ -20,31 +17,7 @@ class HelmStart(Resource):
job_parameters['recordSetId'] = record_set_id
job_parameters['processId'] = process_id
helm_client = helm.Helm()
helm_name = generate_helm_name(process_id, job_name)
try:
output = helm_client.install(
chart=os.path.join(current_app.root_path, "charts", job_name),
name=helm_name,
namespace=current_app.config['NAMESPACE'],
set_values=job_parameters,
fail_on_err=False
)
message = (output.stdout + output.stderr)
message = message.replace('\n', '. ')
if output.returncode == 0:
return {
'status': 'SUCCESS',
'message': message,
'job_id': helm_name
}, 201
else:
return {
'status': 'FAILURE',
'message': message,
'job_id': helm_name
}, 500
except BaseException:
current_app.logger.error("It was not possible to run the helm install command")
return {'error': 'Unexpected Helm error'}, 500
return start(process_id, job_name, job_parameters)
except ImportApiError as e:
return {'error': e.message}, 500
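For reference, the re-enabled Helm start endpoint could be exercised roughly as follows (a sketch; the host, path identifiers and body keys are placeholders, and the real request body schema is defined in HelmStart.yml):

import requests

# All values below are illustrative only
url = ('http://localhost:5000/v1/helm/'
       'institution-1/record-set-1/process-1/text-file-validation/start')
response = requests.post(url, json={'appDirectory': 'example'})  # body keys are hypothetical
print(response.status_code, response.json())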
@@ -37,7 +37,7 @@ parameters:
type: string
name: job_name
example: text-file-validation
enum: ['text-file-validation', 'table-data-transform', 'mapper-service']
enum: ['text-file-validation', 'table-data-transform', 'mapper-service', 'media-linker', 'normalization-service', 'media-metadata-extractor']
required: true
responses:
......
from flask_restful import Resource, current_app
import import_api_app.helm as helm
from flask_restful import Resource
from flasgger import swag_from
from import_api_app.utility import generate_helm_name
from import_api_app.models.Error import ImportApiError
from import_api_app.models.Helm import stop
class HelmStop(Resource):
@@ -9,30 +10,7 @@ class HelmStop(Resource):
# @swag.validate('job-parameters')
@swag_from('HelmStop.yml')
def delete(self, process_id, job_name):
helm_client = helm.Helm()
helm_name = generate_helm_name(process_id, job_name)
try:
output = helm_client.uninstall(
name=helm_name,
namespace=current_app.config['NAMESPACE'],
fail_on_err=False
)
message = output.stdout + output.stderr
message = message.replace('\n', '. ')
if output.returncode == 0:
return {
'status': 'SUCCESS',
'message': message,
'job_id': helm_name
}, 200
else:
return {
'status': 'FAILURE',
'message': message,
'job_id': helm_name
}, 500
except BaseException:
current_app.logger.error("It was not possible to run the helm install command")
return stop(process_id, job_name)
except ImportApiError:
return {'error': 'Unexpected Helm error'}, 500
@@ -13,7 +13,7 @@ parameters:
type: string
name: job_name
example: text-file-validation
enum: ['text-file-validation', 'table-data-transform', 'mapper-service']
enum: ['text-file-validation', 'table-data-transform', 'mapper-service', 'media-linker', 'normalization-service', 'media-metadata-extractor']
required: true
responses:
......
@@ -23,7 +23,8 @@ class Job(Resource):
type: string
name: job_name
example: text-file-validation
enum: ['text-file-validation', 'table-data-transform', 'mapper-service']
enum: ['text-file-validation', 'table-data-transform', 'mapper-service', 'media-linker',
'normalization-service', 'media-metadata-extractor']
required: true
responses:
200:
......
@@ -27,6 +27,7 @@ class JobList(Resource):
# label_selector="jobType=import-job,institutionId=123"
# )
# todo add try except here
# only list jobs whose jobType label is set to import-job
jobs = v1.list_namespaced_job(
namespace=current_app.config['NAMESPACE'],
......
from flask_restful import Resource, current_app
from kafka import KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
import json
from flask_restful import Resource
from import_api_app.models.Error import ImportApiError
from import_api_app.models.Report import get_report
class JobReport(Resource):
# Todo write/correct comment for swagger
def get(self, topic_name):
"""
@@ -43,47 +42,7 @@ class JobReport(Resource):
example: Unexpected Kafka error
"""
try: # read from kafka topic and generate report from it:
report = ''
consumer = KafkaConsumer(
topic_name,
value_deserializer=lambda m: json.loads(m.decode('utf8')),