Merge remote-tracking branch 'origin/main' into update-openstacksdk
diff --git a/etc/staffeln/staffeln.conf b/etc/staffeln/staffeln.conf
index 5b8210c..19a214a 100644
--- a/etc/staffeln/staffeln.conf
+++ b/etc/staffeln/staffeln.conf
@@ -1,6 +1,12 @@
 [conductor]
-backup_peroid = 20
-workers = 1
+backup_workers = 1
+rotation_workers = 1
+backup_service_period = 20
+retention_service_period = 20
+# 1y2mon10d5h30min10s
+backup_cycle_timout = 5min
+retention_time = 2w3d
+backup_metadata_key="__automated_backup"
 
 [database]
 backend = sqlalchemy
@@ -14,9 +20,14 @@
 # retry_interval = 10
 
 [api]
-; host = 0.0.0.0
-; port = 8808
+# host = 0.0.0.0
+# port = 8808
 # enabled_ssl = false
 # ca_file = <None>
 # ssl_cert_file = <None>
 # ssl_key_file = <None>
+
+[notification]
+# receiver = receiver@gmail.com
+# sender_email = sender@vexxhost.com
+# smtp_server_domain = localhost
\ No newline at end of file
diff --git a/etc/systemd/staffeln-api.service b/etc/systemd/staffeln-api.service
new file mode 100644
index 0000000..45005ec
--- /dev/null
+++ b/etc/systemd/staffeln-api.service
@@ -0,0 +1,33 @@
+[Unit]
+Description = staffeln api service
+After = cinder-backup.service
+After = syslog.target
+
+[Service]
+Type = simple
+User = cgm
+Group = cgm
+ExecStart = /home/cgm/cgm/bin/staffeln-api
+ExecReload = /bin/kill -HUP $MAINPID
+# Give a reasonable amount of time for the server to start up/shut down
+TimeoutSec = 120
+Restart = on-failure
+RestartSec = 2
+# This creates a specific slice which all services will operate from
+#  The accounting options give us the ability to see resource usage through
+#  the `systemd-cgtop` command.
+Slice = staffeln.slice
+# Set Accounting
+CPUAccounting = True
+BlockIOAccounting = True
+MemoryAccounting = True
+TasksAccounting = True
+# Set Sandboxing
+PrivateTmp = False
+PrivateDevices = False
+PrivateNetwork = False
+PrivateUsers = False
+Environment = PATH=/home/cgm/cgm/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
+
+[Install]
+WantedBy = multi-user.target
diff --git a/etc/systemd/staffeln-conductor.service b/etc/systemd/staffeln-conductor.service
new file mode 100644
index 0000000..3bd78d6
--- /dev/null
+++ b/etc/systemd/staffeln-conductor.service
@@ -0,0 +1,34 @@
+[Unit]
+Description = staffeln conductor service
+After = cinder-backup.service
+After = syslog.target
+
+[Service]
+Type = simple
+User = cgm
+Group = cgm
+ExecStart = /home/cgm/cgm/bin/staffeln-conductor
+EnvironmentFile=/home/cgm/openrc
+ExecReload = /bin/kill -HUP $MAINPID
+# Give a reasonable amount of time for the server to start up/shut down
+TimeoutSec = 120
+Restart = on-failure
+RestartSec = 2
+# This creates a specific slice which all services will operate from
+#  The accounting options give us the ability to see resource usage through
+#  the `systemd-cgtop` command.
+Slice = staffeln.slice
+# Set Accounting
+CPUAccounting = True
+BlockIOAccounting = True
+MemoryAccounting = True
+TasksAccounting = True
+# Set Sandboxing
+PrivateTmp = False
+PrivateDevices = False
+PrivateNetwork = False
+PrivateUsers = False
+Environment = PATH=/home/cgm/cgm/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
+
+[Install]
+WantedBy = multi-user.target
diff --git a/staffeln/api/app.py b/staffeln/api/app.py
index 80c1bb2..9552746 100755
--- a/staffeln/api/app.py
+++ b/staffeln/api/app.py
@@ -3,7 +3,6 @@
 from flask import request

 from staffeln import objects

 from staffeln.common import context

-from staffeln.common import openstack

 from oslo_log import log

 

 

@@ -15,18 +14,13 @@
 

 @app.route("/v1/backup", methods=["POST"])

 def backup_id():

-    openstacksdk = openstack.OpenstackSDK()

-    retention_user_id = openstacksdk.get_user_id()

     

-    if not "user_id" in request.args or not "backup_id" in request.args:

+    if "backup_id" not in request.args:

         # Return error if the backup_id argument is not provided.

         return Response(

-            "Error: backup_id or user_id is missing.", status=403, mimetype="text/plain"

+            "Error: backup_id is missing.", status=403, mimetype="text/plain"

         )

 

-    if retention_user_id == request.args["user_id"]:

-        return Response("True", status=200, mimetype="text/plain")

-

     # Retrive the backup object from backup_data table with matching backup_id.

     backup = objects.Volume.get_backup_by_backup_id(ctx, request.args["backup_id"])

     # backup_info is None when there is no entry of the backup id in backup_table.

diff --git a/staffeln/cmd/conductor.py b/staffeln/cmd/conductor.py
index 26cd159..2550d88 100755
--- a/staffeln/cmd/conductor.py
+++ b/staffeln/cmd/conductor.py
@@ -16,7 +16,7 @@
     sm = cotyledon.ServiceManager()

     sm.add(manager.BackupManager,

            workers=CONF.conductor.backup_workers, args=(CONF,))

-    # sm.add(manager.RotationManager,

-    #        workers=CONF.conductor.rotation_workers, args=(CONF,))

+    sm.add(manager.RotationManager,

+           workers=CONF.conductor.rotation_workers, args=(CONF,))

     oslo_config_glue.setup(sm, CONF)

     sm.run()

diff --git a/staffeln/common/email.py b/staffeln/common/email.py
index 86ebdd2..d88a96d 100644
--- a/staffeln/common/email.py
+++ b/staffeln/common/email.py
@@ -4,7 +4,7 @@
 from email.mime.text import MIMEText

 from email.mime.multipart import MIMEMultipart

 

-__DRY_RUN__ = True

+__DRY_RUN__ = False

 

 

 def send(

diff --git a/staffeln/common/openstack.py b/staffeln/common/openstack.py
index 298a9f0..7aaf1a0 100644
--- a/staffeln/common/openstack.py
+++ b/staffeln/common/openstack.py
@@ -1,6 +1,10 @@
 from openstack import exceptions

 from openstack import proxy

+from oslo_log import log

 from staffeln.common import auth

+from staffeln.i18n import _

+

+LOG = log.getLogger(__name__)

 

 

 class OpenstackSDK():

@@ -11,13 +15,16 @@
 

 

     def set_project(self, project):

+        LOG.debug(_("Connect as project %s" % project.get('name')))

         project_id = project.get('id')

 

-        if project_id in self.conn_list:

-            self.conn = self.conn_list[project_id]

-        else:

+        if project_id not in self.conn_list:

+            LOG.debug(_("Initiate connection for project %s" % project.get('name')))

             conn = self.conn.connect_as_project(project)

-            self.conn = conn

+            self.conn_list[project_id] = conn

+        LOG.debug(_("Connect as project %s" % project.get('name')))

+        self.conn = self.conn_list[project_id]

+

 

     # user

     def get_user_id(self):

@@ -70,7 +77,7 @@
         )

 

 

-    def delete_backup(self, uuid, project_id=None, force=True):

+    def delete_backup(self, uuid, project_id=None, force=False):

         # Note(Alex): v3 is not supporting force delete?

         # conn.block_storage.delete_backup(

         #     project_id=project_id, backup_id=uuid,

diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index 2d917a9..bbfd7c7 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -5,6 +5,7 @@
 from staffeln.conductor import result
 from openstack.exceptions import ResourceNotFound as OpenstackResourceNotFound
 from openstack.exceptions import SDKException as OpenstackSDKException
+from openstack.exceptions import HttpException as OpenstackHttpException
 from oslo_log import log
 from staffeln.common import context
 from staffeln import objects
@@ -23,10 +24,17 @@
 )
 
 
-def check_vm_backup_metadata(metadata):
-    if not CONF.conductor.backup_metadata_key in metadata:
-        return False
-    return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
+def retry_auth(func):
+    """Decorator to reconnect openstack and avoid token rotation"""
+    def wrapper(self, *args, **kwargs):
+        try:
+            return func(self, *args, **kwargs)
+        except OpenstackHttpException as ex:
+            if ex.status_code == 403:
+                LOG.warn(_("Token has been expired or rotated!"))
+                self.refresh_openstacksdk()
+                return func(self, *args, **kwargs)
+    return wrapper
 
 
 class Backup(object):
@@ -35,12 +43,18 @@
     def __init__(self):
         self.ctx = context.make_context()
         self.result = result.BackupResult()
-        self.openstacksdk = openstack.OpenstackSDK()
+        self.refresh_openstacksdk()
         self.project_list = {}
 
+    def refresh_openstacksdk(self):
+        self.openstacksdk = openstack.OpenstackSDK()
+
     def publish_backup_result(self):
         self.result.publish()
 
+    def refresh_backup_result(self):
+        self.result.initialize()
+
     def get_backups(self, filters=None):
         return objects.Volume.list(self.ctx, filters=filters)
 
@@ -68,11 +82,13 @@
 
     # Backup the volumes attached to which has a specific metadata
     def filter_by_server_metadata(self, metadata):
+        if CONF.conductor.backup_metadata_key is not None:
+            if not CONF.conductor.backup_metadata_key in metadata:
+                return False
 
-        if not CONF.conductor.backup_metadata_key in metadata:
-            return False
-
-        return metadata[CONF.conductor.backup_metadata_key].lower() == constants.BACKUP_ENABLED_KEY
+            return metadata[CONF.conductor.backup_metadata_key].lower() == constants.BACKUP_ENABLED_KEY
+        else:
+            return True
 
     # Backup the volumes in in-use and available status
     def filter_by_volume_status(self, volume_id, project_id):
@@ -89,7 +105,6 @@
         except OpenstackResourceNotFound:
             return False
 
-
     #  delete all backups forcily regardless of the status
     def hard_cancel_backup_task(self, task):
         try:
@@ -150,8 +165,13 @@
     #  delete all backups forcily regardless of the status
     def hard_remove_volume_backup(self, backup_object):
         try:
+            project_id = backup_object.project_id
+            if project_id not in self.project_list:
+                backup_object.delete_backup()
+
+            self.openstacksdk.set_project(self.project_list[project_id])
             backup = self.openstacksdk.get_backup(uuid=backup_object.backup_id,
-                                             project_id=backup_object.project_id)
+                                                  project_id=project_id)
             if backup == None:
                 LOG.info(_("Backup %s is not existing in Openstack."
                            "Or cinder-backup is not existing in the cloud." % backup_object.backup_id))
@@ -170,6 +190,12 @@
             # remove from the backup table
             backup_object.delete_backup()
 
+    def update_project_list(self):
+        projects = self.openstacksdk.get_projects()
+        for project in projects:
+            self.project_list[project.id] = project
+
+    @retry_auth
     def check_instance_volumes(self):
         """Get the list of all the volumes from the project using openstacksdk
         Function first list all the servers in the project and get the volumes
@@ -227,10 +253,14 @@
                 LOG.info(_("Backup for volume %s creating in project %s"
                            % (queue.volume_id, project_id)))
                 # NOTE(Alex): no need to wait because we have a cycle time out
-                if project_id not in self.project_list: self.process_non_existing_backup(queue)
+                if project_id not in self.project_list:
+                    LOG.info(_("Project ID %s is not existing in project list"
+                               % project_id))
+                    self.process_non_existing_backup(queue)
+                    return
                 self.openstacksdk.set_project(self.project_list[project_id])
                 volume_backup = self.openstacksdk.create_backup(volume_id=queue.volume_id,
-                                                           project_id=project_id)
+                                                                project_id=project_id)
                 queue.backup_id = volume_backup.id
                 queue.backup_status = constants.BACKUP_WIP
                 queue.save()
@@ -244,6 +274,17 @@
                     queue.backup_id = parsed["id"]
                 queue.backup_status = constants.BACKUP_WIP
                 queue.save()
+            # Added extra exception as OpenstackSDKException does not handle the keystone unauthorized issue.
+            except Exception as error:
+                reason = _("Backup creation for the volume %s failled. %s"
+                           % (queue.volume_id, str(error)))
+                LOG.error(reason)
+                self.result.add_failed_backup(project_id, queue.volume_id, reason)
+                parsed = parse.parse("Error in creating volume backup {id}", str(error))
+                if parsed is not None:
+                    queue.backup_id = parsed["id"]
+                queue.backup_status = constants.BACKUP_WIP
+                queue.save()
         else:
             pass
             # TODO(Alex): remove this task from the task list
@@ -251,7 +292,6 @@
             #  Reserve for now because it is related to the WIP backup genenrators which
             #  are not finished in the current cycle
 
-
     # backup gen was not created
     def process_pre_failed_backup(self, task):
         # 1.notify via email
@@ -268,7 +308,7 @@
         self.result.add_failed_backup(task.project_id, task.volume_id, reason)
         LOG.error(reason)
         # 2. delete backup generator
-        self.openstacksdk.delete_backup(uuid=task.backup_id)
+        self.openstacksdk.delete_backup(uuid=task.backup_id, force=True)
         # 3. remove failed task from the task queue
         task.delete_queue()
 
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index e77ef8e..5388df5 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -28,13 +28,7 @@
 

     def run(self):

         LOG.info("%s run" % self.name)

-        periodic_callables = [

-            (self.backup_engine, (), {}),

-        ]

-        periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")

-        periodic_thread = threading.Thread(target=periodic_worker.start)

-        periodic_thread.daemon = True

-        periodic_thread.start()

+        self.backup_engine(CONF.conductor.backup_service_period)

 

     def terminate(self):

         LOG.info("%s terminate" % self.name)

@@ -103,22 +97,32 @@
     # Refresh the task queue

     def _update_task_queue(self):

         LOG.info(_("Updating backup task queue..."))

+        self.controller.refresh_openstacksdk()

+        self.controller.refresh_backup_result()

         current_tasks = self.controller.get_queues()

         self.controller.create_queue(current_tasks)

 

     def _report_backup_result(self):

         self.controller.publish_backup_result()

 

-    @periodics.periodic(spacing=CONF.conductor.backup_service_period, run_immediately=True)

-    def backup_engine(self):

+    def backup_engine(self, backup_service_period):

         LOG.info("backing... %s" % str(time.time()))

         LOG.info("%s periodics" % self.name)

 

-        self._update_task_queue()

-        self._process_todo_tasks()

-        self._process_wip_tasks()

-        self._report_backup_result()

+        @periodics.periodic(spacing=backup_service_period, run_immediately=True)

+        def backup_tasks():

+            self._update_task_queue()

+            self._process_todo_tasks()

+            self._process_wip_tasks()

+            self._report_backup_result()

 

+        periodic_callables = [

+            (backup_tasks, (), {}),

+        ]

+        periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")

+        periodic_thread = threading.Thread(target=periodic_worker.start)

+        periodic_thread.daemon = True

+        periodic_thread.start()

 

 class RotationManager(cotyledon.Service):

     name = "Staffeln conductor rotation controller"

@@ -132,14 +136,7 @@
 

     def run(self):

         LOG.info("%s run" % self.name)

-

-        periodic_callables = [

-            (self.rotation_engine, (), {}),

-        ]

-        periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")

-        periodic_thread = threading.Thread(target=periodic_worker.start)

-        periodic_thread.daemon = True

-        periodic_thread.start()

+        self.rotation_engine(CONF.conductor.retention_service_period)

 

     def terminate(self):

         LOG.info("%s terminate" % self.name)

@@ -159,14 +156,25 @@
         for retention_backup in self.backup_list:

             self.controller.hard_remove_volume_backup(retention_backup)

 

-    @periodics.periodic(spacing=CONF.conductor.retention_service_period, run_immediately=True)

-    def rotation_engine(self):

+    def rotation_engine(self, retention_service_period):

         LOG.info("%s rotation_engine" % self.name)

-        # 1. get the list of backups to remove based on the retention time

-        if not self.get_backup_list(): return

 

-        # 2. remove the backups

-        self.remove_backups()

+        @periodics.periodic(spacing=retention_service_period, run_immediately=True)

+        def rotation_tasks():

+            self.controller.refresh_openstacksdk()

+            # 1. get the list of backups to remove based on the retention time

+            if not self.get_backup_list(): return

+            # 2. get project list

+            self.controller.update_project_list()

+            # 3. remove the backups

+            self.remove_backups()

+        periodic_callables = [

+            (rotation_tasks, (), {}),

+        ]

+        periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")

+        periodic_thread = threading.Thread(target=periodic_worker.start)

+        periodic_thread.daemon = True

+        periodic_thread.start()

 

     # get the threshold time str

     def get_threshold_strtime(self):

diff --git a/staffeln/conductor/result.py b/staffeln/conductor/result.py
index a76d99e..b16bcc6 100644
--- a/staffeln/conductor/result.py
+++ b/staffeln/conductor/result.py
@@ -18,6 +18,9 @@
 class BackupResult(object):

 

     def __init__(self):

+        pass

+

+    def initialize(self):

         self.content = ""

         self.project_list = []

         self.success_backup_list = {}

@@ -53,6 +56,7 @@
     def send_result_email(self):

         subject = "Backup result"

         try:

+            if len(CONF.notification.receiver) == 0: return

             email.send(

                 src_email=CONF.notification.sender_email,

                 src_pwd=CONF.notification.sender_pwd,

diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index d5e3861..69f7fe2 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -30,7 +30,6 @@
     ),
     cfg.StrOpt(
         "backup_metadata_key",
-        default="__automated_backup",
         help=_("The key string of metadata the VM, which requres back up, has"),
     ),
 ]