# kubernetes.py
import subprocess
from subprocess import CompletedProcess
3
4
5
6
7
8
9
10


class ConfigMap:
    """A Kubernetes ConfigMap created and deleted through the ``kubectl`` CLI.

    The ConfigMap name used on the cluster is ``<process_id>-<name>``; the
    ``process_id`` is supplied per call to :meth:`create` / :meth:`delete`.

    Attributes:
        name: Suffix of the ConfigMap name.
        path: Value handed to ``kubectl create configmap --from-file``.
    """

    def __init__(self, **kwargs):
        """Store the ``name`` and ``path`` keyword arguments.

        Raises:
            KeyError: If ``name`` or ``path`` is not supplied.
        """
        self.name = kwargs['name']
        self.path = kwargs['path']

    def _resource_name(self, process_id: str) -> str:
        """Return the ConfigMap name for *process_id*."""
        return f'{process_id}-{self.name}'

    def create(self, process_id: str) -> CompletedProcess:
        """Run ``kubectl create configmap <process_id>-<name> --from-file <path>``.

        The command is run with ``check=False``, so a non-zero exit status
        does not raise; inspect ``returncode`` / ``stderr`` on the result.

        Returns:
            The ``CompletedProcess`` of the kubectl invocation.
        """
        cmd = [
            "create",
            "configmap",
            self._resource_name(process_id),
            "--from-file",
            self.path,
        ]
        return self._exec_command(cmd, False)

    def delete(self, process_id: str) -> CompletedProcess:
        """Run ``kubectl delete configmap <process_id>-<name>``.

        The command is run with ``check=False``, so a non-zero exit status
        does not raise; inspect ``returncode`` / ``stderr`` on the result.

        Returns:
            The ``CompletedProcess`` of the kubectl invocation.
        """
        cmd = ["delete", "configmap", self._resource_name(process_id)]
        return self._exec_command(cmd, False)

    @staticmethod
    def _exec_command(cmd, fail_on_err=True) -> CompletedProcess:
        """Run ``kubectl`` with the argument list *cmd*, capturing its output.

        Args:
            cmd: Arguments appended after the ``kubectl`` executable name.
            fail_on_err: Forwarded as ``check`` to ``subprocess.run``; when
                true, a non-zero exit status raises ``CalledProcessError``.

        Returns:
            The ``CompletedProcess``; ``stdout`` and ``stderr`` are captured
            as bytes because no text mode is requested.
        """
        base_cmd = [
            "kubectl",
        ]
        # Debugging aid: ' '.join(base_cmd + cmd) gives the full command line.
        return subprocess.run(
            base_cmd + cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=fail_on_err
        )