test: fix e2e tests
diff --git a/.gitignore b/.gitignore
index c56d48e..8ee9b25 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,4 @@
 __pycache__
 .pytest*
 .coverage
+.kube
diff --git a/atmosphere/cmd/operator.py b/atmosphere/cmd/operator.py
index 0cde95d..2725a92 100644
--- a/atmosphere/cmd/operator.py
+++ b/atmosphere/cmd/operator.py
@@ -20,14 +20,14 @@
         for c in config._root_config:
             group = conf.get(c.name)
             setattr(CONF, c.name, group)
-        engine = taskflow.engines.load(flows.DEPLOY)
+        engine = taskflow.engines.load(flows.get_deployment_flow())
         engine.run()
 
 
 def main():
     LOG.info("Starting Atmosphere operator")
 
-    engine = taskflow.engines.load(flows.DEPLOY)
+    engine = taskflow.engines.load(flows.get_deployment_flow())
     engine.run()
     LOG.info("Atmosphere operator successfully started")
 
diff --git a/atmosphere/flows.py b/atmosphere/flows.py
index 6c424eb..f933700 100644
--- a/atmosphere/flows.py
+++ b/atmosphere/flows.py
@@ -90,6 +90,3 @@
         )
 
     return flow
-
-
-DEPLOY = get_deployment_flow()
diff --git a/atmosphere/tasks/flux.py b/atmosphere/tasks/flux.py
index ec6fdc0..e9b7f52 100644
--- a/atmosphere/tasks/flux.py
+++ b/atmosphere/tasks/flux.py
@@ -24,7 +24,9 @@
             **kwargs
         )
 
-    def generate_object(self, namespace, name, url, *args, **kwargs):
+    def generate_object(
+        self, namespace: pykube.Namespace, name: str, url: str, *args, **kwargs
+    ):
         return HelmRepository(
             self.api,
             {
@@ -40,3 +42,8 @@
                 },
             },
         )
+
+    def update_object(
+        self, resource: pykube.objects.APIObject, url: str, *args, **kwargs
+    ):
+        resource.obj["spec"]["url"] = url
diff --git a/atmosphere/tasks/kubernetes.py b/atmosphere/tasks/kubernetes.py
index a5adbe4..8e790ec 100644
--- a/atmosphere/tasks/kubernetes.py
+++ b/atmosphere/tasks/kubernetes.py
@@ -50,13 +50,18 @@
     def generate_object(self, *args, **kwargs):
         raise NotImplementedError
 
-    def ensure_object(self, resource, *args, **kwargs):
+    def update_object(self, resource: pykube.objects.APIObject, *args, **kwargs):
+        raise NotImplementedError
+
+    def ensure_object(self, resource: pykube.objects.APIObject, *args, **kwargs):
         self.logger.debug("Ensuring resource")
 
         if not resource.exists():
             self.logger.debug("Resource does not exist, creating")
             resource.create()
         else:
+            resource.reload()
+            self.update_object(resource, *args, **kwargs)
             resource.update()
 
         self.logger.info("Ensured resource")
@@ -67,7 +72,7 @@
 
     def execute(self, *args, **kwargs):
         resource = self.generate_object(*args, **kwargs)
-        return self.ensure_object(resource)
+        return self.ensure_object(resource, *args, **kwargs)
 
 
 class CreateOrUpdateNamespaceTask(CreateOrUpdateKubernetesObjectTask):
@@ -92,6 +97,9 @@
             },
         )
 
+    def update_object(self, resource: pykube.objects.APIObject, *args, **kwargs):
+        pass
+
 
 class CreateOrUpdateSecretTask(CreateOrUpdateKubernetesObjectTask):
     def __init__(self, namespace: str, name: str, data: str, *args, **kwargs):
@@ -105,7 +113,9 @@
             **kwargs,
         )
 
-    def generate_object(self, namespace, name, data, *args, **kwargs):
+    def generate_object(
+        self, namespace: pykube.Namespace, name: str, data: dict, *args, **kwargs
+    ):
         return pykube.Secret(
             self.api,
             {
@@ -118,3 +128,12 @@
                 "data": data,
             },
         )
+
+    def update_object(
+        self,
+        resource: pykube.objects.APIObject,
+        data: dict,
+        *args,
+        **kwargs,
+    ):
+        resource.obj["data"] = data
diff --git a/atmosphere/tests/conftest.py b/atmosphere/tests/conftest.py
index 26818b1..a678d84 100644
--- a/atmosphere/tests/conftest.py
+++ b/atmosphere/tests/conftest.py
@@ -1,4 +1,8 @@
+import pathlib
+import subprocess
+
 import pytest
+import requests
 
 
 @pytest.fixture
@@ -6,3 +10,40 @@
     mocked_api = mocker.MagicMock()
     mocker.patch("atmosphere.clients.get_pykube_api", return_value=mocked_api)
     return mocked_api
+
+
+@pytest.fixture(scope="session")
+def flux_cluster(kind_cluster):
+    path = pathlib.Path(".pytest-flux")
+    path.mkdir(parents=True, exist_ok=True)
+
+    install_path = path / "install.sh"
+    flux_path = path / "flux"
+
+    if not install_path.exists():
+        url = "https://fluxcd.io/install.sh"
+        tmp_file = install_path.with_suffix(".tmp")
+        with requests.get(url, stream=True) as r:
+            r.raise_for_status()
+            with tmp_file.open("wb") as fd:
+                for chunk in r.iter_content(chunk_size=8192):
+                    if chunk:
+                        fd.write(chunk)
+        tmp_file.chmod(0o755)
+        tmp_file.rename(install_path)
+
+    if not flux_path.exists():
+        subprocess.check_output(
+            ["bash", install_path, path],
+            encoding="utf-8",
+        )
+
+    assert flux_path.exists() is True
+
+    subprocess.check_output(
+        [flux_path, "install"],
+        env={"KUBECONFIG": str(kind_cluster.kubeconfig_path)},
+        encoding="utf-8",
+    )
+
+    return kind_cluster
diff --git a/atmosphere/tests/e2e/test_operator.py b/atmosphere/tests/e2e/test_operator.py
index 1c5e728..a014df4 100644
--- a/atmosphere/tests/e2e/test_operator.py
+++ b/atmosphere/tests/e2e/test_operator.py
@@ -18,9 +18,9 @@
     return image
 
 
-def test_e2e_for_operator(tmp_path, kind_cluster, docker_image):
-    kind_cluster.load_docker_image(docker_image)
-    kind_cluster.kubectl(
+def test_e2e_for_operator(tmp_path, flux_cluster, docker_image):
+    flux_cluster.load_docker_image(docker_image)
+    flux_cluster.kubectl(
         "label", "node", "pytest-kind-control-plane", "openstack-control-plane=enabled"
     )
 
@@ -45,7 +45,7 @@
     file = tmp_path / "namespace.yml"
     template = env.get_template("namespace.yml")
     file.write_text(template.render(**args))
-    kind_cluster.kubectl("apply", "-f", file)
+    flux_cluster.kubectl("apply", "-f", file)
 
     for manifest in glob.glob("roles/atmosphere/templates/*.yml"):
         filename = posixpath.basename(manifest)
@@ -54,13 +54,13 @@
         file = tmp_path / filename
         file.write_text(template.render(**args))
 
-        kind_cluster.kubectl("apply", "-f", file)
+        flux_cluster.kubectl("apply", "-f", file)
 
-    kind_cluster.kubectl(
+    flux_cluster.kubectl(
         "-n", "openstack", "rollout", "status", "deployment/atmosphere-operator"
     )
 
-    for pod in pykube.Pod.objects(kind_cluster.api, namespace="openstack").filter(
+    for pod in pykube.Pod.objects(flux_cluster.api, namespace="openstack").filter(
         selector="application=atmosphere"
     ):
         for attempt in Retrying(
@@ -71,6 +71,6 @@
 
     for secret_name in ["atmosphere-config", "atmosphere-memcached"]:
         secret = pykube.Secret.objects(
-            kind_cluster.api, namespace="openstack"
+            flux_cluster.api, namespace="openstack"
         ).get_by_name(secret_name)
         assert secret.exists()
diff --git a/atmosphere/tests/integration/test_deploy.py b/atmosphere/tests/integration/test_deploy.py
index 5088b86..8b9c5fa 100644
--- a/atmosphere/tests/integration/test_deploy.py
+++ b/atmosphere/tests/integration/test_deploy.py
@@ -6,20 +6,20 @@
 from atmosphere.config import CONF
 
 
-def test_kubernetes_version(kind_cluster):
-    assert kind_cluster.api.version == ("1", "25")
+def test_kubernetes_version(flux_cluster):
+    assert flux_cluster.api.version == ("1", "25")
 
 
-def test_deployment(mocker, kind_cluster):
-    mocker.patch("atmosphere.clients.get_pykube_api", return_value=kind_cluster.api)
+def test_deployment(mocker, flux_cluster):
+    mocker.patch("atmosphere.clients.get_pykube_api", return_value=flux_cluster.api)
 
-    kind_cluster.kubectl("create", "namespace", "openstack")
+    flux_cluster.kubectl("create", "namespace", "openstack")
 
-    engine = taskflow.engines.load(flows.DEPLOY)
+    engine = taskflow.engines.load(flows.get_deployment_flow())
     engine.run()
 
     initial_memcache_secret = pykube.Secret.objects(
-        kind_cluster.api, namespace="openstack"
+        flux_cluster.api, namespace="openstack"
     ).get_by_name("atmosphere-memcached")
     assert initial_memcache_secret.exists()
 
@@ -31,11 +31,11 @@
             ],
         },
     ):
-        engine = taskflow.engines.load(flows.DEPLOY)
+        engine = taskflow.engines.load(flows.get_deployment_flow())
         engine.run()
 
     updated_memcache_secret = pykube.Secret.objects(
-        kind_cluster.api, namespace="openstack"
+        flux_cluster.api, namespace="openstack"
     ).get_by_name("atmosphere-memcached")
     assert updated_memcache_secret.exists()