Commit 22949915 authored by Jonas Waeber's avatar Jonas Waeber
Browse files

Update readme

Update start scripts.
parent 300bacda
# Kubernetes ConfigMap template.
# The ${...} placeholders (name, namespace, file_name, content) are
# substituted at deploy time before the manifest is applied.
apiVersion: v1
kind: ConfigMap
metadata:
name: ${name}
namespace: ${namespace}
data:
${file_name}: |
${content}
\ No newline at end of file
from subprocess import CompletedProcess
import subprocess
from typing import Union, Dict
class HelmChart:
    """Thin wrapper around the ``helm`` CLI for a single chart release.

    Arguments:
        name {str} -- Base name of the release; a process id is prefixed to it
            at install/uninstall time.
        chart {str} -- Release name or path of a helm chart.

    Keyword Arguments:
        namespace {str} -- Namespace to install the release into
            (default: {None}, i.e. helm's current/default namespace)
        fail_on_error {bool} -- Whether to fail with an exception if the
            installation fails (default: {False})
        wait {bool} -- Whether to wait for all pods to be ready (default: {True})
    """

    def __init__(self, name: str, chart: str, namespace: str = None,
                 fail_on_error: bool = False, wait: bool = True):
        self.name = name
        self.chart = chart
        self.namespace = namespace
        self.fail_on_error = fail_on_error
        self.wait = wait

    def install(self, process_id: str, values: Union[Dict, str, None]) -> CompletedProcess:
        """Install the helm chart on the cluster.

        Arguments:
            process_id {str} -- The process id the name of the helm chart install is prefixed with.
            values {dict,str,None} -- Path to a custom values.yaml file or a dictionary containing
                key-value-pairs that are used to overwrite the default values of the chart.

        Returns:
            CompletedProcess -- CompletedProcess-object returned by subprocess
                containing details about the result and output of the
                executed command.
        """
        helm_cmd = ["install", f'{process_id}-{self.name}', self.chart]
        if values:
            if isinstance(values, str):
                # A string is treated as the path to a values file.
                helm_cmd.extend(("-f", values))
            elif isinstance(values, dict):
                # A dict is flattened into a single --set key=value,... option.
                opt_list = [f"{k}={v}" for k, v in values.items()]
                helm_cmd.extend(("--set", ",".join(opt_list)))
        if self.namespace:
            helm_cmd.extend(("--namespace", self.namespace))
        if self.wait:
            helm_cmd.append("--wait")
        return self._exec_command(helm_cmd, self.fail_on_error)

    def uninstall(self, process_id: str) -> CompletedProcess:
        """Uninstall the chart from the cluster.

        Arguments:
            process_id {str} -- The process id the name of the helm chart install was prefixed with.

        Returns:
            CompletedProcess -- CompletedProcess-object returned by subprocess
                containing details about the result and output of the
                executed command.
        """
        helm_cmd = ["uninstall", f'{process_id}-{self.name}']
        if self.namespace:
            helm_cmd.extend(("--namespace", self.namespace))
        return self._exec_command(helm_cmd, self.fail_on_error)

    @staticmethod
    def _exec_command(cmd: list, fail_on_error: bool = False) -> CompletedProcess:
        """Run ``helm`` with the given arguments, capturing stdout and stderr.

        When ``fail_on_error`` is True, a non-zero exit status raises
        subprocess.CalledProcessError; otherwise the caller inspects returncode.
        """
        base_cmd = [
            "helm",
        ]
        # for debug: print(' '.join(base_cmd + cmd))
        return subprocess.run(
            base_cmd + cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=fail_on_error
        )
def _chart(service: str) -> 'HelmChart':
    """Build a HelmChart whose chart directory under ./charts matches the service name."""
    return HelmChart(service, f'./charts/{service}')


text_file_validation = _chart('text-file-validation')
table_data_transform = _chart('table-data-transform')
xml_data_transform = _chart('xml-data-transform')
mapper_service = _chart('mapper-service')
media_linker = _chart('media-linker')
media_metadata_extractor = _chart('media-metadata-extractor')
normalization_service = _chart('normalization-service')
fedora_ingest_service = _chart('fedora-ingest-service')

# All charts of the import workflow, in pipeline order.
all_charts = [
    text_file_validation,
    table_data_transform,
    xml_data_transform,
    mapper_service,
    media_linker,
    media_metadata_extractor,
    normalization_service,
    fedora_ingest_service
]
\ No newline at end of file
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError, UnknownTopicOrPartitionError
import logging
from typing import List
class KafkaTopics:
    """Creates and deletes the Kafka topics belonging to one import process.

    For every job two topics are registered (input and per-job reporting),
    plus one shared reporting topic for the whole process.

    Arguments:
        process_id {str} -- Id used as prefix for every topic name.
        jobs {List[str]} -- Names of the jobs (pipeline steps) to create topics for.
        **kwargs -- Passed verbatim to ``KafkaAdminClient``
            (e.g. bootstrap_servers); see kafka-python documentation.
    """

    def __init__(self, process_id: str, jobs: List[str], **kwargs):
        self.admin_client = KafkaAdminClient(**kwargs)
        self.process_id = process_id
        reporting = 'reporting'
        self.topics = list()
        for job in jobs:
            self.topics.append(f'{process_id}-{job}')
            self.topics.append(f'{process_id}-{job}-{reporting}')
        # One shared reporting topic for the whole process.
        self.topics.append(f"{process_id}-{reporting}")
        # NOTE(review): replication_factor=1 is only safe on single-broker
        # (development) clusters — confirm for production deployments.
        self.topic_list = [
            NewTopic(name=name, num_partitions=3, replication_factor=1)
            for name in self.topics
        ]

    def create(self):
        """Create all topics; logs an error if they already exist."""
        try:
            self.admin_client.create_topics(new_topics=self.topic_list, validate_only=False)
            logging.info(f"Created the following topics: {self.topics}.")
        except TopicAlreadyExistsError:
            # Fixed grammar of the log message ("already exists" -> "already exist").
            logging.error("Topics already exist!")

    def delete(self):
        """Delete all topics; logs an error if they were already removed."""
        try:
            self.admin_client.delete_topics(topics=self.topics)
            logging.info(f"Deleted the following topics: {self.topics}.")
        except UnknownTopicOrPartitionError:
            # No placeholders in the message, so no f-string needed.
            logging.error("Topics not found. Were they already deleted?")
import subprocess
from subprocess import CompletedProcess
class ConfigMap:
    """Thin wrapper around ``kubectl`` for creating/deleting one config map.

    Arguments (via **kwargs):
        name {str} -- Base name of the config map; a process id is prefixed
            at create/delete time.
        path {str} -- File (or directory) whose content populates the config map.
    """

    def __init__(self, **kwargs):
        self.name = kwargs['name']
        self.path = kwargs['path']

    def create(self, process_id: str) -> CompletedProcess:
        """Create the config map ``{process_id}-{name}`` from ``path``.

        Returns the CompletedProcess of the kubectl invocation; errors are
        reported via its returncode/stderr instead of raising.
        """
        cmd = ["create", "configmap", f'{process_id}-{self.name}', "--from-file", self.path]
        return self._exec_command(cmd, False)

    def delete(self, process_id: str) -> CompletedProcess:
        """Delete the config map ``{process_id}-{name}``.

        Returns the CompletedProcess of the kubectl invocation; errors are
        reported via its returncode/stderr instead of raising.
        """
        cmd = ["delete", "configmap", f'{process_id}-{self.name}']
        return self._exec_command(cmd, False)

    @staticmethod
    def _exec_command(cmd: list, fail_on_error: bool = False) -> CompletedProcess:
        """Run ``kubectl`` with the given arguments, capturing stdout and stderr.

        Parameter name and default aligned with HelmChart._exec_command
        (``fail_on_error``, default False); both internal callers pass
        False explicitly, so behavior is unchanged.
        """
        base_cmd = [
            "kubectl",
        ]
        # for debug: print(' '.join(base_cmd + cmd))
        return subprocess.run(
            base_cmd + cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=fail_on_error
        )
from api.kafka import KafkaTopics
from api.helm import all_charts
from api.kubernetes import ConfigMap
import json
import logging
class ImportProcess:
    """Orchestrates one import process: Kafka topics, config maps and helm charts.

    Arguments (all via **kwargs, normally the parsed setup.json):
        global {dict} -- Values shared by every chart; must contain 'processId'.
        config-maps {list} -- Config map definitions (each with 'name' and 'path').
        steps {list} -- Names of the pipeline steps whose charts should be deployed.
    """

    def __init__(self, **kwargs):
        self.process_id = kwargs['global']['processId']
        self.config_maps = [ConfigMap(**cm) for cm in kwargs['config-maps']]
        with open('./configurations/kafka.json', mode='r') as fp:
            self.topics = KafkaTopics(self.process_id, kwargs['steps'], **json.load(fp))
        # BUG FIX: the original kept a lazy ``filter`` object here. start()
        # consumed it, so a later remove() iterated an exhausted iterator and
        # uninstalled nothing. Materialize the selection as a list instead.
        self.charts = [chart for chart in all_charts if chart.name in kwargs['steps']]
        self.values = kwargs

    def start(self):
        """Create topics, deploy config maps and install all selected charts.

        Raises:
            SystemExit -- On the first failed kubectl/helm invocation
                (exit status 1), after logging its stderr.
        """
        self.topics.create()
        for config_map in self.config_maps:
            completed_process = config_map.create(self.process_id)
            if completed_process.returncode:
                logging.error(completed_process.stderr.decode('utf-8'))
                raise SystemExit(1)
            logging.info(f"Deployed config map {self.process_id}-{config_map.name}.")
        for chart in self.charts:
            # Chart-specific values override the shared global values.
            values = dict(self.values['global'])
            values.update(self.values.get(chart.name, {}))
            completed_process = chart.install(self.process_id, values)
            if completed_process.returncode:
                logging.error(completed_process.stderr.decode('utf-8'))
                raise SystemExit(1)
            logging.info(f"Installed chart {chart.name}.")

    def remove(self):
        """Uninstall charts, delete config maps and drop the topics.

        Best-effort: failures are logged but do not abort the teardown.
        """
        for chart in self.charts:
            completed_process = chart.uninstall(self.process_id)
            if completed_process.returncode:
                logging.error(completed_process.stderr.decode('utf-8'))
            else:
                logging.info(f"Uninstalled chart {chart.name}.")
        for config_map in self.config_maps:
            completed_process = config_map.delete(self.process_id)
            if completed_process.returncode:
                logging.error(completed_process.stderr.decode('utf-8'))
            else:
                logging.info(f"Deleted config map {self.process_id}-{config_map.name}.")
        self.topics.delete()
......@@ -14,8 +14,6 @@ class WorkflowRunner:
def run(self):
drupal_uuid = str(uuid.uuid4())
session_id = str(uuid.uuid4())
url_mapping = '{}/FetchMappingFile/{}/{}'.format(self.api_endpoint, self.record_set_id, session_id)
res_mapping = requests.get(url_mapping)
data = {
'job-parameters': {
'drupalJobUuid': drupal_uuid,
......@@ -30,4 +28,7 @@ class WorkflowRunner:
}
url_workflow = '{}/importprocess/{}/{}/start'.format(self.api_endpoint, self.institution_id, self.record_set_id)
res_workflow = requests.post(url=url_workflow, json=data, headers={'Accept': 'application/json'})
return ((url_mapping, res_mapping.status_code), (url_workflow, res_workflow.status_code, res_workflow.json()))
if res_workflow.ok:
return url_workflow, res_workflow.status_code, res_workflow.json()
else:
return url_workflow, res_workflow.status_code, res_workflow.text
\ No newline at end of file
import json
import logging
import sys
import os
import time
import sys
from api.run import WorkflowRunner
API_ENDPOINT = 'https://import.memobase.k8s.unibas.ch/v1'
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
......@@ -14,14 +12,8 @@ logging.basicConfig(stream=sys.stdout, level=logging.INFO)
if __name__ == '__main__':
for directory in os.listdir('configurations/'):
if os.path.isdir("configurations/" + directory):
try:
with open(f'configurations/{directory}/setup.json', 'r') as fp:
workflow = WorkflowRunner(API_ENDPOINT, **json.load(fp))
res = workflow.run()
print('Endpoint: {}\nStatus: {}'.format(
res[0][0], res[0][1]))
print('Endpoint: {}\nStatus: {}\nResponse: {}'.format(
res[1][0], res[1][1], res[1][2]))
time.sleep(15)
except json.decoder.JSONDecodeError:
print(f'ERROR in {directory}!')
with open(f'configurations/{directory}/setup.json', 'r') as fp:
workflow = WorkflowRunner(API_ENDPOINT, **json.load(fp))
res = workflow.run()
print('Endpoint: {}\nStatus: {}\nResponse: {}'.format(
res[0], res[1], res[2]))
......@@ -9,9 +9,7 @@ if __name__ == '__main__':
API_ENDPOINT = 'https://import.memobase.k8s.unibas.ch/v1'
with open(f'configurations/{sys.argv[1]}/setup.json', 'r') as fp:
workflow = WorkflowRunner(API_ENDPOINT,**json.load(fp))
workflow = WorkflowRunner(API_ENDPOINT, **json.load(fp))
res = workflow.run()
print('Endpoint: {}\nStatus: {}'.format(
res[0][0], res[0][1]))
print('Endpoint: {}\nStatus: {}\nResponse: {}'.format(
res[1][0], res[1][1], res[1][2]))
res[0], res[1], res[2]))
......@@ -4,6 +4,19 @@ a set of files which are added to a specific configmap.
Each bash script can be used to replace the existing configmap with a new one.
**IMPORTANT**: After updating a config map that is loaded directly as an environment variable,
all deployments that use it must be restarted.
This command will restart all deployments in the memobase namespace.
```bash
kubectl -n memobase rollout restart deploy
```
To re-deploy a single deployment use:
```bash
kubectl rollout restart deploy {deployment-name}
```
### Update Configurations
These configurations here are the master copies and should always be changed with care and
version controlled.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment