feat: use server-side apply
diff --git a/atmosphere/flows.py b/atmosphere/flows.py
index 623dff0..c938ccf 100644
--- a/atmosphere/flows.py
+++ b/atmosphere/flows.py
@@ -1,7 +1,8 @@
from taskflow.patterns import graph_flow
from atmosphere.config import CONF
-from atmosphere.tasks import flux, kubernetes, openstack_helm
+from atmosphere.tasks.composite import openstack_helm
+from atmosphere.tasks.kubernetes import flux, v1
NAMESPACE_CERT_MANAGER = "cert-manager"
NAMESPACE_KUBE_SYSTEM = "kube-system"
@@ -27,36 +28,40 @@
"master": {"nodeSelector": CONTROL_PLANE_NODE_SELECTOR}
}
+PERCONA_XTRADB_OPERATOR_VALUES = {
+ "nodeSelector": CONTROL_PLANE_NODE_SELECTOR,
+}
+
def get_deployment_flow():
flow = graph_flow.Flow("deploy").add(
# kube-system
- kubernetes.CreateOrUpdateNamespaceTask(name=NAMESPACE_KUBE_SYSTEM),
- flux.CreateOrUpdateHelmRepositoryTask(
+ v1.ApplyNamespaceTask(name=NAMESPACE_KUBE_SYSTEM),
+ flux.ApplyHelmRepositoryTask(
namespace=NAMESPACE_KUBE_SYSTEM,
name=HELM_REPOSITORY_CEPH,
url="https://ceph.github.io/csi-charts",
),
# cert-manager
- kubernetes.CreateOrUpdateNamespaceTask(name=NAMESPACE_CERT_MANAGER),
- flux.CreateOrUpdateHelmRepositoryTask(
+ v1.ApplyNamespaceTask(name=NAMESPACE_CERT_MANAGER),
+ flux.ApplyHelmRepositoryTask(
namespace=NAMESPACE_CERT_MANAGER,
name=HELM_REPOSITORY_JETSTACK,
url="https://charts.jetstack.io",
),
# monitoring
- kubernetes.CreateOrUpdateNamespaceTask(name=NAMESPACE_MONITORING),
- flux.CreateOrUpdateHelmRepositoryTask(
+ v1.ApplyNamespaceTask(name=NAMESPACE_MONITORING),
+ flux.ApplyHelmRepositoryTask(
namespace=NAMESPACE_MONITORING,
name=HELM_REPOSITORY_PROMETHEUS_COMMUINTY,
url="https://prometheus-community.github.io/helm-charts",
),
- flux.CreateOrUpdateHelmRepositoryTask(
+ flux.ApplyHelmRepositoryTask(
namespace=NAMESPACE_MONITORING,
name=HELM_REPOSITORY_NODE_FEATURE_DISCOVERY,
url="https://kubernetes-sigs.github.io/node-feature-discovery/charts",
),
- flux.CreateOrUpdateHelmReleaseTask(
+ flux.ApplyHelmReleaseTask(
namespace=NAMESPACE_MONITORING,
name="node-feature-discovery",
repository=HELM_REPOSITORY_NODE_FEATURE_DISCOVERY,
@@ -65,33 +70,41 @@
values=NODE_FEATURE_DISCOVERY_VALUES,
),
# openstack
- kubernetes.CreateOrUpdateNamespaceTask(name=NAMESPACE_OPENSTACK),
- flux.CreateOrUpdateHelmRepositoryTask(
+ v1.ApplyNamespaceTask(name=NAMESPACE_OPENSTACK),
+ flux.ApplyHelmRepositoryTask(
namespace=NAMESPACE_OPENSTACK,
name=HELM_REPOSITORY_BITNAMI,
url="https://charts.bitnami.com/bitnami",
),
- flux.CreateOrUpdateHelmRepositoryTask(
+ flux.ApplyHelmRepositoryTask(
namespace=NAMESPACE_OPENSTACK,
name=HELM_REPOSITORY_PERCONA,
url="https://percona.github.io/percona-helm-charts/",
),
- flux.CreateOrUpdateHelmRepositoryTask(
+ flux.ApplyHelmReleaseTask(
+ namespace=NAMESPACE_OPENSTACK,
+ name="pxc-operator",
+ repository=HELM_REPOSITORY_PERCONA,
+ chart="pxc-operator",
+ version="1.10.0",
+ values=PERCONA_XTRADB_OPERATOR_VALUES,
+ ),
+ flux.ApplyHelmRepositoryTask(
namespace=NAMESPACE_OPENSTACK,
name=HELM_REPOSITORY_INGRESS_NGINX,
url="https://kubernetes.github.io/ingress-nginx",
),
- flux.CreateOrUpdateHelmRepositoryTask(
+ flux.ApplyHelmRepositoryTask(
namespace=NAMESPACE_OPENSTACK,
name=HELM_REPOSITORY_OPENSTACK_HELM_INFRA,
url="https://tarballs.opendev.org/openstack/openstack-helm-infra/",
),
- flux.CreateOrUpdateHelmRepositoryTask(
+ flux.ApplyHelmRepositoryTask(
namespace=NAMESPACE_OPENSTACK,
name=HELM_REPOSITORY_COREDNS,
url="https://coredns.github.io/helm",
),
- flux.CreateOrUpdateHelmRepositoryTask(
+ flux.ApplyHelmRepositoryTask(
namespace=NAMESPACE_OPENSTACK,
name=HELM_REPOSITORY_OPENSTACK_HELM,
url="https://tarballs.opendev.org/openstack/openstack-helm/",
@@ -100,16 +113,16 @@
if CONF.memcached.enabled:
flow.add(
- openstack_helm.CreateOrUpdateReleaseSecretTask(
+ openstack_helm.ApplyReleaseSecretTask(
namespace=NAMESPACE_OPENSTACK, chart="memcached"
),
- openstack_helm.CreateOrUpdateHelmReleaseTask(
+ openstack_helm.ApplyHelmReleaseTask(
namespace=NAMESPACE_OPENSTACK,
repository=HELM_REPOSITORY_OPENSTACK_HELM_INFRA,
name="memcached",
version="0.1.12",
),
- kubernetes.CreateOrUpdateServiceTask(
+ v1.ApplyServiceTask(
namespace=NAMESPACE_OPENSTACK,
name="memcached-metrics",
labels={
diff --git a/atmosphere/tasks/composite/__init__.py b/atmosphere/tasks/composite/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/atmosphere/tasks/composite/__init__.py
diff --git a/atmosphere/tasks/openstack_helm.py b/atmosphere/tasks/composite/openstack_helm.py
similarity index 81%
rename from atmosphere/tasks/openstack_helm.py
rename to atmosphere/tasks/composite/openstack_helm.py
index afb92a6..b8a0332 100644
--- a/atmosphere/tasks/openstack_helm.py
+++ b/atmosphere/tasks/composite/openstack_helm.py
@@ -1,8 +1,8 @@
from atmosphere.models.openstack_helm import values
-from atmosphere.tasks import flux, kubernetes
+from atmosphere.tasks.kubernetes import flux, v1
-class CreateOrUpdateReleaseSecretTask(kubernetes.CreateOrUpdateSecretTask):
+class ApplyReleaseSecretTask(v1.ApplySecretTask):
def __init__(self, namespace: str, chart: str, *args, **kwargs):
super().__init__(
namespace,
@@ -13,7 +13,7 @@
)
-class CreateOrUpdateHelmReleaseTask(flux.CreateOrUpdateHelmReleaseTask):
+class ApplyHelmReleaseTask(flux.ApplyHelmReleaseTask):
def __init__(
self,
namespace: str,
diff --git a/atmosphere/tasks/kubernetes.py b/atmosphere/tasks/kubernetes.py
deleted file mode 100644
index 348c347..0000000
--- a/atmosphere/tasks/kubernetes.py
+++ /dev/null
@@ -1,184 +0,0 @@
-import re
-
-import pykube
-from taskflow import task
-
-from atmosphere import clients, logger
-
-CAMEL_CASE_PATTERN = re.compile(r"(?<!^)(?=[A-Z])")
-LOG = logger.get_logger()
-
-
-class CreateOrUpdateKubernetesObjectTask(task.Task):
- def __init__(
- self, kind: pykube.objects.APIObject, namespace: str, name: str, *args, **kwargs
- ):
- self._obj_kind = kind
- self._obj_namespace = namespace
- self._obj_name = name
-
- kwargs["name"] = CAMEL_CASE_PATTERN.sub("-", kind.__name__).lower()
- if namespace:
- kwargs["name"] += f"-{namespace}"
- kwargs["name"] += f"-{name}"
-
- if namespace:
- # kwargs.setdefault("requires", [])
- # kwargs["requires"] += [f"namespace-{namespace}"]
- kwargs.setdefault("rebind", {})
- kwargs["rebind"]["namespace"] = f"namespace-{namespace}"
-
- kwargs.setdefault("provides", set())
- kwargs["provides"] = kwargs["provides"].union(set([kwargs["name"]]))
-
- super().__init__(*args, **kwargs)
-
- @property
- def api(self):
- return clients.get_pykube_api()
-
- @property
- def logger(self):
- log = LOG.bind(
- kind=self._obj_kind.__name__,
- name=self._obj_name,
- )
- if self._obj_namespace:
- log = log.bind(namespace=self._obj_namespace)
- return log
-
- def generate_object(self, *args, **kwargs):
- raise NotImplementedError
-
- def update_object(self, resource: pykube.objects.APIObject, *args, **kwargs):
- raise NotImplementedError
-
- def ensure_object(self, resource: pykube.objects.APIObject, *args, **kwargs):
- self.logger.debug("Ensuring resource")
-
- if not resource.exists():
- self.logger.debug("Resource does not exist, creating")
- resource.create()
- else:
- resource.reload()
- self.update_object(resource, *args, **kwargs)
- resource.update()
-
- self.logger.info("Ensured resource")
-
- return {
- self.name: resource,
- }
-
- def execute(self, *args, **kwargs):
- resource = self.generate_object(*args, **kwargs)
- return self.ensure_object(resource, *args, **kwargs)
-
-
-class CreateOrUpdateNamespaceTask(CreateOrUpdateKubernetesObjectTask):
- def __init__(self, name: str, *args, **kwargs):
- super().__init__(
- pykube.Namespace,
- None,
- name,
- requires=set(["name"]),
- inject={"name": name},
- *args,
- **kwargs,
- )
-
- def generate_object(self, name, *args, **kwargs):
- return pykube.Namespace(
- self.api,
- {
- "apiVersion": "v1",
- "kind": "Namespace",
- "metadata": {"name": name},
- },
- )
-
- def update_object(self, resource: pykube.objects.APIObject, *args, **kwargs):
- pass
-
-
-class CreateOrUpdateServiceTask(CreateOrUpdateKubernetesObjectTask):
- def __init__(self, namespace: str, name: str, labels: dict, spec: dict):
- super().__init__(
- pykube.Service,
- namespace,
- name,
- requires=set(["namespace", "name", "labels", "spec"]),
- inject={"name": name, "labels": labels, "spec": spec},
- )
-
- def generate_object(
- self,
- namespace: pykube.Namespace,
- name: str,
- labels: dict,
- spec: dict,
- *args,
- **kwargs,
- ) -> pykube.Service:
- return pykube.Service(
- self.api,
- {
- "apiVersion": "v1",
- "kind": "Service",
- "metadata": {
- "name": name,
- "namespace": namespace.name,
- "labels": labels,
- },
- "spec": spec,
- },
- )
-
- def update_object(
- self,
- resource: pykube.Service,
- labels: dict,
- spec: dict,
- *args,
- **kwargs,
- ):
- resource.obj["metadata"]["labels"] = labels
- resource.obj["spec"] = spec
-
-
-class CreateOrUpdateSecretTask(CreateOrUpdateKubernetesObjectTask):
- def __init__(self, namespace: str, name: str, data: str, *args, **kwargs):
- super().__init__(
- pykube.Secret,
- namespace,
- name,
- requires=set(["namespace", "name", "data"]),
- inject={"name": name, "data": data},
- *args,
- **kwargs,
- )
-
- def generate_object(
- self, namespace: pykube.Namespace, name: str, data: dict, *args, **kwargs
- ):
- return pykube.Secret(
- self.api,
- {
- "apiVersion": "v1",
- "kind": "Secret",
- "metadata": {
- "name": name,
- "namespace": namespace.name,
- },
- "data": data,
- },
- )
-
- def update_object(
- self,
- resource: pykube.objects.APIObject,
- data: dict,
- *args,
- **kwargs,
- ):
- resource.obj["data"] = data
diff --git a/atmosphere/tasks/kubernetes/__init__.py b/atmosphere/tasks/kubernetes/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/atmosphere/tasks/kubernetes/__init__.py
diff --git a/atmosphere/tasks/kubernetes/base.py b/atmosphere/tasks/kubernetes/base.py
new file mode 100644
index 0000000..f4dc47a
--- /dev/null
+++ b/atmosphere/tasks/kubernetes/base.py
@@ -0,0 +1,78 @@
+import json
+import re
+
+import pykube
+from taskflow import task
+
+from atmosphere import clients, logger
+
+CAMEL_CASE_PATTERN = re.compile(r"(?<!^)(?=[A-Z])")
+LOG = logger.get_logger()
+
+
+class ApplyKubernetesObjectTask(task.Task):
+ def __init__(
+ self, kind: pykube.objects.APIObject, namespace: str, name: str, *args, **kwargs
+ ):
+ self._obj_kind = kind
+ self._obj_namespace = namespace
+ self._obj_name = name
+
+ kwargs["name"] = CAMEL_CASE_PATTERN.sub("-", kind.__name__).lower()
+ if namespace:
+ kwargs["name"] += f"-{namespace}"
+ kwargs["name"] += f"-{name}"
+
+ if namespace:
+ # kwargs.setdefault("requires", [])
+ # kwargs["requires"] += [f"namespace-{namespace}"]
+ kwargs.setdefault("rebind", {})
+ kwargs["rebind"]["namespace"] = f"namespace-{namespace}"
+
+ kwargs.setdefault("provides", set())
+ kwargs["provides"] = kwargs["provides"].union(set([kwargs["name"]]))
+
+ super().__init__(*args, **kwargs)
+
+ @property
+ def api(self):
+ return clients.get_pykube_api()
+
+ @property
+ def logger(self):
+ log = LOG.bind(
+ kind=self._obj_kind.__name__,
+ name=self._obj_name,
+ )
+ if self._obj_namespace:
+ log = log.bind(namespace=self._obj_namespace)
+ return log
+
+ def generate_object(self, *args, **kwargs) -> pykube.objects.APIObject:
+ raise NotImplementedError
+
+ def execute(self, *args, **kwargs):
+ self.logger.debug("Ensuring resource")
+
+ resource = self.generate_object(*args, **kwargs)
+ resp = resource.api.patch(
+ **resource.api_kwargs(
+ headers={
+ "Content-Type": "application/apply-patch+yaml",
+ },
+ params={
+ "fieldManager": "atmosphere-operator",
+ "force": True,
+ },
+ data=json.dumps(resource.obj),
+ )
+ )
+
+ resource.api.raise_for_status(resp)
+ resource.set_obj(resp.json())
+
+ self.logger.info("Ensured resource")
+
+ return {
+ self.name: resource,
+ }
diff --git a/atmosphere/tasks/flux.py b/atmosphere/tasks/kubernetes/flux.py
similarity index 91%
rename from atmosphere/tasks/flux.py
rename to atmosphere/tasks/kubernetes/flux.py
index 1e4941f..91ff59d 100644
--- a/atmosphere/tasks/flux.py
+++ b/atmosphere/tasks/kubernetes/flux.py
@@ -1,7 +1,7 @@
import pykube
from atmosphere import logger
-from atmosphere.tasks import kubernetes
+from atmosphere.tasks.kubernetes import base
LOG = logger.get_logger()
@@ -12,7 +12,7 @@
kind = "HelmRepository"
-class CreateOrUpdateHelmRepositoryTask(kubernetes.CreateOrUpdateKubernetesObjectTask):
+class ApplyHelmRepositoryTask(base.ApplyKubernetesObjectTask):
def __init__(self, namespace: str, name: str, url: str, *args, **kwargs):
super().__init__(
HelmRepository,
@@ -43,11 +43,6 @@
},
)
- def update_object(
- self, resource: pykube.objects.APIObject, url: str, *args, **kwargs
- ):
- resource.obj["spec"]["url"] = url
-
class HelmRelease(pykube.objects.NamespacedAPIObject):
version = "helm.toolkit.fluxcd.io/v2beta1"
@@ -55,7 +50,7 @@
kind = "HelmRelease"
-class CreateOrUpdateHelmReleaseTask(kubernetes.CreateOrUpdateKubernetesObjectTask):
+class ApplyHelmReleaseTask(base.ApplyKubernetesObjectTask):
def __init__(
self,
namespace: str,
diff --git a/atmosphere/tasks/kubernetes/v1.py b/atmosphere/tasks/kubernetes/v1.py
new file mode 100644
index 0000000..d183d0c
--- /dev/null
+++ b/atmosphere/tasks/kubernetes/v1.py
@@ -0,0 +1,92 @@
+import pykube
+
+from atmosphere import logger
+from atmosphere.tasks.kubernetes import base
+
+LOG = logger.get_logger()
+
+
+class ApplyNamespaceTask(base.ApplyKubernetesObjectTask):
+ def __init__(self, name: str, *args, **kwargs):
+ super().__init__(
+ pykube.Namespace,
+ None,
+ name,
+ requires=set(["name"]),
+ inject={"name": name},
+ *args,
+ **kwargs,
+ )
+
+ def generate_object(self, name, *args, **kwargs):
+ return pykube.Namespace(
+ self.api,
+ {
+ "apiVersion": "v1",
+ "kind": "Namespace",
+ "metadata": {"name": name},
+ },
+ )
+
+
+class ApplyServiceTask(base.ApplyKubernetesObjectTask):
+ def __init__(self, namespace: str, name: str, labels: dict, spec: dict):
+ super().__init__(
+ pykube.Service,
+ namespace,
+ name,
+ requires=set(["namespace", "name", "labels", "spec"]),
+ inject={"name": name, "labels": labels, "spec": spec},
+ )
+
+ def generate_object(
+ self,
+ namespace: pykube.Namespace,
+ name: str,
+ labels: dict,
+ spec: dict,
+ *args,
+ **kwargs,
+ ) -> pykube.Service:
+ return pykube.Service(
+ self.api,
+ {
+ "apiVersion": "v1",
+ "kind": "Service",
+ "metadata": {
+ "name": name,
+ "namespace": namespace.name,
+ "labels": labels,
+ },
+ "spec": spec,
+ },
+ )
+
+
+class ApplySecretTask(base.ApplyKubernetesObjectTask):
+ def __init__(self, namespace: str, name: str, data: str, *args, **kwargs):
+ super().__init__(
+ pykube.Secret,
+ namespace,
+ name,
+ requires=set(["namespace", "name", "data"]),
+ inject={"name": name, "data": data},
+ *args,
+ **kwargs,
+ )
+
+ def generate_object(
+ self, namespace: pykube.Namespace, name: str, data: dict, *args, **kwargs
+ ):
+ return pykube.Secret(
+ self.api,
+ {
+ "apiVersion": "v1",
+ "kind": "Secret",
+ "metadata": {
+ "name": name,
+ "namespace": namespace.name,
+ },
+ "data": data,
+ },
+ )