标签:client net ini yam nod self exce test core
k8s python api二次封装
pip install pprint   kubernetes
import urllib3
from pprint import pprint
from kubernetes import client
from os import path
import yaml
class K8sApi(object):
    def __init__(self):
        # self.config = config.kube_config.load_kube_config()
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.configuration = client.Configuration()
        self.configuration.host = "https://192.168.100.111:6443"
        self.configuration.api_key[‘authorization‘] = ‘Bearer token‘
        self.configuration.verify_ssl = False
        self.k8s_apps_v1 = client.AppsV1Api(client.ApiClient(self.configuration))
        self.Api_Instance = client.CoreV1Api(client.ApiClient(self.configuration))
        self.Api_Instance_Extensions = client.ExtensionsV1beta1Api(client.ApiClient(self.configuration))
    ####################################################################################################################
    def list_deployment(self, namespace="default"):
        api_response = self.k8s_apps_v1.list_namespaced_deployment(namespace)
        return api_response
    def read_deployment(self, name="nginx-deployment", namespace="default"):
        api_response = self.k8s_apps_v1.read_namespaced_deployment(name, namespace)
        return api_response
    def create_deployment(self, file="deploy-nginx.yaml"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.create_namespaced_deployment(
                body=dep, namespace="default")
            return resp
    def replace_deployment(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.replace_namespaced_deployment(name, namespace,
                                                                  body=dep)
            return resp
    def delete_deployment(self, name="nginx-deployment", namespace="default"):
        api_response = self.k8s_apps_v1.delete_namespaced_deployment(name, namespace)
        return api_response
    ####################################################################################################################
    def list_node(self):
        api_response = self.Api_Instance.list_node()
        data = {}
        for i in api_response.items:
            data[i.metadata.name] = {"name": i.metadata.name,
                                     "status": i.status.conditions[-1].type if i.status.conditions[
                                                                                   -1].status == "True" else "NotReady",
                                     "ip": i.status.addresses[0].address,
                                     "kubelet_version": i.status.node_info.kubelet_version,
                                     "os_image": i.status.node_info.os_image,
                                     }
        return data
    def list_pod(self):
        api_response = self.Api_Instance.list_pod_for_all_namespaces()
        data = {}
        for i in api_response.items:
            data[i.metadata.name] = {"ip": i.status.pod_ip, "namespace": i.metadata.namespace}
        return data
    def read_pod(self, name="nginx-pod", namespace="default"):
        api_response = self.Api_Instance.read_namespaced_pod(name, namespace)
        return api_response
    def create_pod(self, file="pod-nginx.yaml", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance.create_namespaced_pod(namespace, body=dep)
            return api_response
    def replace_pod(self, name="nginx-pod", namespace="default"):
        dep = self.read_pod(name)
        api_response = self.Api_Instance.replace_namespaced_pod(name, namespace, body=dep)
        return api_response
    def delete_pod(self, name="nginx-pod", namespace="default"):
        api_response = self.Api_Instance.delete_namespaced_pod(name, namespace)
        return api_response
    ####################################################################################################################
    def list_service(self):
        api_response = self.Api_Instance.list_service_for_all_namespaces()
        return api_response
    def read_service(self, name="", namespace="default"):
        api_response = self.Api_Instance.read_namespaced_service(name, namespace)
        return api_response
    def create_service(self, file="service-nginx.yaml", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance.create_namespaced_service(namespace, body=dep)
            return api_response
    def replace_service(self, name="hequan", namespace="default"):
        dep = self.read_pod(name)
        api_response = self.Api_Instance.replace_namespaced_service(name, namespace, body=dep)
        return api_response
    def delete_service(self, name="hequan", namespace="default"):
        api_response = self.Api_Instance.delete_namespaced_service(name, namespace)
        return api_response
    ####################################################################################################################
    def list_ingress(self):
        api_response = self.Api_Instance_Extensions.list_ingress_for_all_namespaces()
        return api_response
    def read_ingress(self, name="", namespace="default"):
        api_response = self.Api_Instance_Extensions.read_namespaced_ingress(name, namespace)
        return api_response
    def create_ingress(self, file="ingress-nginx.yaml", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance_Extensions.create_namespaced_ingress(namespace, body=dep)
            return api_response
    def replace_ingress(self, name="", file="ingress-nginx.yaml", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            api_response = self.Api_Instance_Extensions.replace_namespaced_ingress(name=name, namespace=namespace,
                                                                                   body=dep)
            return api_response
    def delete_ingress(self, name="hequan", namespace="default"):
        api_response = self.Api_Instance_Extensions.delete_namespaced_ingress(name, namespace)
        return api_response
    #####################################################################################################################
    def list_stateful(self):
        api_response = self.k8s_apps_v1.list_stateful_set_for_all_namespaces()
        return api_response
    def read_stateful(self, name="nginx-deployment", namespace="default"):
        api_response = self.k8s_apps_v1.read_namespaced_stateful_set(name, namespace)
        return api_response
    def create_stateful(self, file="deploy-nginx.yaml"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.create_namespaced_stateful_set(
                body=dep, namespace="default")
            return resp
    def replace_stateful(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        with open(path.join(path.dirname(__file__), file)) as f:
            dep = yaml.safe_load(f)
            resp = self.k8s_apps_v1.replace_namespaced_stateful_set(name, namespace,
                                                                    body=dep)
            return resp
    def delete_stateful(self, name="nginx-deployment", namespace="default"):
        api_response = self.k8s_apps_v1.delete_namespaced_stateful_set(name, namespace)
        return api_response
    ####################################################################################################################
if __name__ == ‘__main__‘:
    def test():
        obj = K8sApi()
        ret = obj.list_deployment()
        pprint(ret)
    test()
标签:client net ini yam nod self exce test core
原文地址:https://blog.51cto.com/hequan/2438485