Add remove volume backup logic
diff --git a/staffeln/cmd/conductor.py b/staffeln/cmd/conductor.py
index d3d0412..26cd159 100755
--- a/staffeln/cmd/conductor.py
+++ b/staffeln/cmd/conductor.py
@@ -7,7 +7,6 @@
 from staffeln.conductor import manager

 import staffeln.conf

 

-

 CONF = staffeln.conf.CONF

 

 

@@ -15,7 +14,8 @@
     service.prepare_service()

 

     sm = cotyledon.ServiceManager()

-    sm.add(manager.BackupManager, workers=CONF.conductor.backup_workers, args=(CONF,))

+    sm.add(manager.BackupManager,

+           workers=CONF.conductor.backup_workers, args=(CONF,))

     # sm.add(manager.RotationManager,

     #        workers=CONF.conductor.rotation_workers, args=(CONF,))

     oslo_config_glue.setup(sm, CONF)

diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index d537d48..6de73f6 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -2,4 +2,6 @@
 BACKUP_WIP=1

 BACKUP_PLANNED=0

 

-BACKUP_ENABLED_KEY = 'true'
\ No newline at end of file
+BACKUP_ENABLED_KEY = 'true'

+

+DEFAULT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
\ No newline at end of file
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index e934063..ada7faa 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -1,214 +1,227 @@
-import staffeln.conf

-import collections

-from staffeln.common import constants

-

-from openstack import exceptions

-from oslo_log import log

-from staffeln.common import auth

-from staffeln.common import context

-from staffeln import objects

-from staffeln.i18n import _

-

-CONF = staffeln.conf.CONF

-LOG = log.getLogger(__name__)

-

-BackupMapping = collections.namedtuple(

-    "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]

-)

-

-QueueMapping = collections.namedtuple(

-    "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]

-)

-

-conn = auth.create_connection()

-

-

-def check_vm_backup_metadata(metadata):

-    if not CONF.conductor.backup_metadata_key in metadata:

-        return False

-    return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]

-

-

-def get_projects_list():

-    projects = conn.list_projects()

-    return projects

-

-

-class Backup(object):

-    """Implmentations of the queue with the sql."""

-

-    def __init__(self):

-        self.ctx = context.make_context()

-        self.discovered_backup_map = None

-        self.queue_mapping = dict()

-        self.volume_mapping = dict()

-

-    def get_backups(self, filters=None):

-        return objects.Volume.list(self.ctx, filters=filters)

-

-    def get_queues(self, filters=None):

-        """Get the list of volume queue columns from the queue_data table"""

-        queues = objects.Queue.list(self.ctx, filters=filters)

-        return queues

-

-    def create_queue(self):

-        """Create the queue of all the volumes for backup"""

-        queue_list = self.check_instance_volumes()

-        for queue in queue_list:

-            self._volume_queue(queue)

-

-    # Backup the volumes attached to which has a specific metadata

-    def filter_server(self, metadata):

-

-        if not CONF.conductor.backup_metadata_key in metadata:

-            return False

-

-        return metadata[CONF.conductor.backup_metadata_key].lower() == constants.BACKUP_ENABLED_KEY

-

-    # Backup the volumes in in-use and available status

-    def filter_volume(self, volume_id):

-        volume = conn.get_volume_by_id(volume_id)

-        res = volume['status'] in ("available", "in-use")

-        if not res:

-            LOG.info(_("Volume %s is not backed because it is in %s status" % (volume_id, volume['status'])))

-

-    def check_backup_status(self, backup_object):

-        try:

-            backup = conn.get_volume_backup(backup_object.backup_id)

-            # TODO(Alex): "deleting", "restoring", "error_restoring", "error"

-            res = backup["status"] in ("available")

-            if not res:

-                LOG.info(_("Rotation for the backup %s is skipped "

-                           "because it is in %s status")%(backup_object.backup_id, backup["status"]))

-            return res

-        except exceptions.ResourceNotFound:

-            LOG.info(_("Backup % is not existing." % backup_object.backup_id))

-            # remove from the backup table

-            backup_object.delete_backup()

-

-            return False

-

-

-    def check_instance_volumes(self):

-        """Get the list of all the volumes from the project using openstacksdk

-        Function first list all the servers in the project and get the volumes

-        that are attached to the instance.

-        """

-        queues_map = []

-        projects = get_projects_list()

-        for project in projects:

-            servers = conn.compute.servers(

-                details=True, all_projects=True, project_id=project.id

-            )

-            for server in servers:

-                if not self.filter_server(server.metadata): continue

-                for volume in server.attached_volumes:

-                    if not self.filter_volume(volume["id"]): continue

-                    queues_map.append(

-                        QueueMapping(

-                            volume_id=volume["id"],

-                            backup_id="NULL",

-                            instance_id=server.id,

-                            backup_status=constants.BACKUP_PLANNED,

-                        )

-                    )

-        return queues_map

-

-    def _volume_queue(self, task):

-        """Saves the queue data to the database."""

-

-        # TODO(Alex): Need to escalate discussion

-        # When create the task list, need to check the WIP backup generators

-        # which are created in the past backup cycle.

-        # Then skip to create new tasks for the volumes whose backup is WIP

-        volume_queue = objects.Queue(self.ctx)

-        volume_queue.backup_id = task.backup_id

-        volume_queue.volume_id = task.volume_id

-        volume_queue.instance_id = task.instance_id

-        volume_queue.backup_status = task.backup_status

-        volume_queue.create()

-

-    def volume_backup_initiate(self, queue):

-        """Initiate the backup of the volume

-        :params: queue: Provide the map of the volume that needs

-                  backup.

-        This function will call the backupup api and change the

-        backup_status and backup_id in the queue table.

-        """

-        backup_id = queue.backup_id

-        if backup_id == "NULL":

-            try:

-                volume_backup = conn.block_storage.create_backup(

-                    volume_id=queue.volume_id, force=True

-                )

-                queue.backup_id = volume_backup.id

-                queue.backup_status = constants.BACKUP_WIP

-                queue.save()

-            except exceptions as error:

-                print(error)

-        else:

-            pass

-            # TODO(Alex): remove this task from the task list

-            #  Backup planned task cannot have backup_id in the same cycle

-            #  Reserve for now because it is related to the WIP backup genenrators which

-            #  are not finished in the current cycle

-

-    def process_failed_backup(self, task):

-        # 1. TODO(Alex): notify via email

-        LOG.error("Backup of the volume %s failed." % task.volume_id)

-        # 2. TODO(Alex): remove failed backup instance from the openstack

-        #     then set the volume status in-use

-        # 3. remove failed task from the task queue

-        task.delete_queue()

-

-    def process_available_backup(self, task):

-        LOG.info("Backup of the volume %s is successful." % task.volume_id)

-        # 1. save success backup in the backup table

-        self._volume_backup(

-            BackupMapping(

-                volume_id=task.volume_id,

-                backup_id=task.backup_id,

-                instance_id=task.instance_id,

-                backup_completed=1,

-            )

-        )

-        # 2. remove from the task list

-        task.delete_queue()

-        # 3. TODO(Alex): notify via email

-

-    def process_using_backup(self, task):

-        # remove from the task list

-        task.delete_queue()

-

-    def check_volume_backup_status(self, queue):

-        """Checks the backup status of the volume

-        :params: queue: Provide the map of the volume that needs backup

-                 status checked.

-        Call the backups api to see if the backup is successful.

-        """

-        for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):

-            if backup_gen.id == queue.backup_id:

-                if backup_gen.status == "error":

-                    self.process_failed_backup(queue)

-                elif backup_gen.status == "available":

-                    self.process_available_backup(queue)

-                elif backup_gen.status == "creating":

-                    # TODO(Alex): Need to escalate discussion

-                    # How to proceed WIP bakcup generators?

-                    # To make things worse, the last backup generator is in progress till

-                    # the new backup cycle

-                    LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)

-                else:  # "deleting", "restoring", "error_restoring" status

-                    self.process_using_backup(queue)

-

-    def _volume_backup(self, task):

-        # matching_backups = [

-        #     g for g in self.available_backups if g.backup_id == task.backup_id

-        # ]

-        # if not matching_backups:

-        volume_backup = objects.Volume(self.ctx)

-        volume_backup.backup_id = task.backup_id

-        volume_backup.volume_id = task.volume_id

-        volume_backup.instance_id = task.instance_id

-        volume_backup.backup_completed = task.backup_completed

-        volume_backup.create()

+import staffeln.conf
+import collections
+from staffeln.common import constants
+
+from openstack import exceptions
+from oslo_log import log
+from staffeln.common import auth
+from staffeln.common import context
+from staffeln import objects
+from staffeln.i18n import _
+
+CONF = staffeln.conf.CONF
+LOG = log.getLogger(__name__)
+
+BackupMapping = collections.namedtuple(
+    "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]
+)
+
+QueueMapping = collections.namedtuple(
+    "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]
+)
+
+conn = auth.create_connection()
+
+
+def check_vm_backup_metadata(metadata):
+    if not CONF.conductor.backup_metadata_key in metadata:
+        return False
+    return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
+
+
+def get_projects_list():
+    projects = conn.list_projects()
+    return projects
+
+
+class Backup(object):
+    """Implmentations of the queue with the sql."""
+
+    def __init__(self):
+        self.ctx = context.make_context()
+        self.discovered_backup_map = None
+        self.queue_mapping = dict()
+        self.volume_mapping = dict()
+
+    def get_backups(self, filters=None):
+        return objects.Volume.list(self.ctx, filters=filters)
+
+    def get_queues(self, filters=None):
+        """Get the list of volume queue columns from the queue_data table"""
+        queues = objects.Queue.list(self.ctx, filters=filters)
+        return queues
+
+    def create_queue(self):
+        """Create the queue of all the volumes for backup"""
+        queue_list = self.check_instance_volumes()
+        for queue in queue_list:
+            self._volume_queue(queue)
+
+    # Backup the volumes attached to which has a specific metadata
+    def filter_server(self, metadata):
+
+        if not CONF.conductor.backup_metadata_key in metadata:
+            return False
+
+        return metadata[CONF.conductor.backup_metadata_key].lower() == constants.BACKUP_ENABLED_KEY
+
+    # Backup the volumes in in-use and available status
+    def filter_volume(self, volume_id):
+        try:
+            volume = conn.get_volume_by_id(volume_id)
+            if volume == None: return False
+            res = volume['status'] in ("available", "in-use")
+            if not res:
+                LOG.info(_("Volume %s is not backed because it is in %s status" % (volume_id, volume['status'])))
+            return res
+
+        except exceptions.ResourceNotFound:
+            return False
+
+    def remove_volume_backup(self, backup_object):
+        try:
+            backup = conn.get_volume_backup(backup_object.backup_id)
+            if backup == None: return False
+            if backup["status"] in ("available"):
+                conn.delete_volume_backup(backup_object.backup_id)
+                backup_object.delete_backup()
+            elif backup["status"] in ("error", "error_restoring"):
+                # TODO(Alex): need to discuss
+                #  now if backup is in error status, then retention service
+                #  does not remove it from openstack but removes it from the
+                #  backup table so user can delete it on Horizon.
+                backup_object.delete_backup()
+            else:  # "deleting", "restoring"
+                LOG.info(_("Rotation for the backup %s is skipped in this cycle "
+                           "because it is in %s status") % (backup_object.backup_id, backup["status"]))
+
+        except exceptions.ResourceNotFound:
+            LOG.info(_("Backup %s is not existing in Openstack."
+                       "Or cinder-backup is not existing in the cloud." % backup_object.backup_id))
+            # remove from the backup table
+            backup_object.delete_backup()
+            return False
+
+    def check_instance_volumes(self):
+        """Get the list of all the volumes from the project using openstacksdk
+        Function first list all the servers in the project and get the volumes
+        that are attached to the instance.
+        """
+        queues_map = []
+        projects = get_projects_list()
+        for project in projects:
+            servers = conn.compute.servers(
+                details=True, all_projects=True, project_id=project.id
+            )
+            for server in servers:
+                if not self.filter_server(server.metadata): continue
+                for volume in server.attached_volumes:
+                    if not self.filter_volume(volume["id"]): continue
+                    queues_map.append(
+                        QueueMapping(
+                            volume_id=volume["id"],
+                            backup_id="NULL",
+                            instance_id=server.id,
+                            backup_status=constants.BACKUP_PLANNED,
+                        )
+                    )
+        return queues_map
+
+    def _volume_queue(self, task):
+        """Saves the queue data to the database."""
+
+        # TODO(Alex): Need to escalate discussion
+        # When create the task list, need to check the WIP backup generators
+        # which are created in the past backup cycle.
+        # Then skip to create new tasks for the volumes whose backup is WIP
+        volume_queue = objects.Queue(self.ctx)
+        volume_queue.backup_id = task.backup_id
+        volume_queue.volume_id = task.volume_id
+        volume_queue.instance_id = task.instance_id
+        volume_queue.backup_status = task.backup_status
+        volume_queue.create()
+
+    def create_volume_backup(self, queue):
+        """Initiate the backup of the volume
+        :params: queue: Provide the map of the volume that needs
+                  backup.
+        This function will call the backupup api and change the
+        backup_status and backup_id in the queue table.
+        """
+        backup_id = queue.backup_id
+        if backup_id == "NULL":
+            try:
+                volume_backup = conn.block_storage.create_backup(
+                    volume_id=queue.volume_id, force=True
+                )
+                queue.backup_id = volume_backup.id
+                queue.backup_status = constants.BACKUP_WIP
+                queue.save()
+            except exceptions as error:
+                print(error)
+        else:
+            pass
+            # TODO(Alex): remove this task from the task list
+            #  Backup planned task cannot have backup_id in the same cycle
+            #  Reserve for now because it is related to the WIP backup genenrators which
+            #  are not finished in the current cycle
+
+    def process_failed_backup(self, task):
+        # 1. TODO(Alex): notify via email
+        LOG.error("Backup of the volume %s failed." % task.volume_id)
+        # 2. TODO(Alex): remove failed backup instance from the openstack
+        #     then set the volume status in-use
+        # 3. remove failed task from the task queue
+        task.delete_queue()
+
+    def process_available_backup(self, task):
+        LOG.info("Backup of the volume %s is successful." % task.volume_id)
+        # 1. save success backup in the backup table
+        self._volume_backup(
+            BackupMapping(
+                volume_id=task.volume_id,
+                backup_id=task.backup_id,
+                instance_id=task.instance_id,
+                backup_completed=1,
+            )
+        )
+        # 2. remove from the task list
+        task.delete_queue()
+        # 3. TODO(Alex): notify via email
+
+    def process_using_backup(self, task):
+        # remove from the task list
+        task.delete_queue()
+
+    def check_volume_backup_status(self, queue):
+        """Checks the backup status of the volume
+        :params: queue: Provide the map of the volume that needs backup
+                 status checked.
+        Call the backups api to see if the backup is successful.
+        """
+        for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
+            if backup_gen.id == queue.backup_id:
+                if backup_gen.status == "error":
+                    self.process_failed_backup(queue)
+                elif backup_gen.status == "available":
+                    self.process_available_backup(queue)
+                elif backup_gen.status == "creating":
+                    # TODO(Alex): Need to escalate discussion
+                    # How to proceed WIP bakcup generators?
+                    # To make things worse, the last backup generator is in progress till
+                    # the new backup cycle
+                    LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
+                else:  # "deleting", "restoring", "error_restoring" status
+                    self.process_using_backup(queue)
+
+    def _volume_backup(self, task):
+        # matching_backups = [
+        #     g for g in self.available_backups if g.backup_id == task.backup_id
+        # ]
+        # if not matching_backups:
+        volume_backup = objects.Volume(self.ctx)
+        volume_backup.backup_id = task.backup_id
+        volume_backup.volume_id = task.volume_id
+        volume_backup.instance_id = task.instance_id
+        volume_backup.backup_completed = task.backup_completed
+        volume_backup.create()
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 2f3d39b..4493001 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -74,7 +74,7 @@
         )
         if len(queues_to_start) != 0:
             for queue in queues_to_start:
-                backup.Backup().volume_backup_initiate(queue)
+                backup.Backup().create_volume_backup(queue)
 
     # Refresh the task queue
     # TODO(Alex): need to escalate discussion
@@ -126,29 +126,26 @@
     def reload(self):
         LOG.info("%s reload" % self.name)
 
+    def get_backup_list(self):
+        threshold_strtime = self.get_threshold_strtime()
+        if threshold_strtime == None: return False
+        self.backup_list = backup.Backup().get_backups(filters={"created_at__lt": threshold_strtime})
+        return True
+
+    def remove_backups(self):
+        print(self.backup_list)
+        for retention_backup in self.backup_list:
+            # 1. check the backup status and delete only available backups
+            backup.Backup().remove_volume_backup(retention_backup)
+
     @periodics.periodic(spacing=CONF.conductor.retention_service_period, run_immediately=True)
     def rotation_engine(self):
         LOG.info("%s rotation_engine" % self.name)
-        # TODO(Alex): No limitation for retention?
-        #  Even though, the backup count is over the limit, the retention service will
-        #  continue working?
-        #     filter based on backup status
-        #     filter based on backup creation time
-        # 2. delete the backups
-
-
         # 1. get the list of backups to remove based on the retention time
-        threshold_strtime = self.get_threshold_strtime()
-        if threshold_strtime == None: return
-        backups = backup.Backup().get_backups(filters={"created_at__lt", threshold_strtime})
+        if not self.get_backup_list(): return
 
         # 2. remove the backups
-        for retention_backup in backups:
-            # 1. check the backup status and delete only available backups
-            if not backup.Backup().check_backup_status(retention_backup): continue
-            # 2. remove backup
-
-            pass
+        self.remove_backups()
 
     # get the threshold time str
     def get_threshold_strtime(self):
@@ -156,13 +153,12 @@
         if time_delta_dict == None: return None
 
         res = xtime.timeago(
-            years= time_delta_dict["years"],
-            months= time_delta_dict["months"],
-            weeks= time_delta_dict["weeks"],
-            days= time_delta_dict["days"],
+            years=time_delta_dict["years"],
+            months=time_delta_dict["months"],
+            weeks=time_delta_dict["weeks"],
+            days=time_delta_dict["days"],
         )
         if res == None: LOG.info(_("Retention time format is invalid. "
                                    "Follow <YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."))
 
-        return datetime.datetime.strptime(res, constants.DEFAULT_TIME_FORMAT)
-
+        return res.strftime(constants.DEFAULT_TIME_FORMAT)
diff --git a/staffeln/conf/api.py b/staffeln/conf/api.py
index 846f959..e405d6a 100755
--- a/staffeln/conf/api.py
+++ b/staffeln/conf/api.py
@@ -1,7 +1,6 @@
 from oslo_config import cfg
 from staffeln.i18n import _
 
-
 api_group = cfg.OptGroup(
     "api",
     title="API options",
diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index e63895f..4e2aad9 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -1,7 +1,6 @@
 from oslo_config import cfg
 from staffeln.i18n import _
 
-
 conductor_group = cfg.OptGroup(
     "conductor",
     title="Conductor Options",
@@ -13,7 +12,7 @@
         "backup_workers",
         default=1,
         help=_("The maximum number of backup processes to "
-        "fork and run. Default to number of CPUs on the host."),
+               "fork and run. Default to number of CPUs on the host."),
     ),
     cfg.IntOpt(
         "backup_service_period",
@@ -38,24 +37,25 @@
         "rotation_workers",
         default=1,
         help=_("The maximum number of rotation processes to "
-        "fork and run. Default to number of CPUs on the host."),
+               "fork and run. Default to number of CPUs on the host."),
     ),
     cfg.IntOpt(
         "retention_service_period",
         default=20,
         min=10,
-        help=_("The time of rotation period, the unit is one day."),
+        help=_("The period of the retention service, the unit is one second."),
     ),
     cfg.IntOpt(
         "rotation_workers",
         default=1,
         help=_("The maximum number of rotation processes to "
-        "fork and run. Default to number of CPUs on the host."),
+               "fork and run. Default to number of CPUs on the host."),
     ),
     cfg.StrOpt(
         "retention_time",
-        default="2021-05-05 14:56:00",
-        help=_("The time of rotation period, the unit is one day."),
+        default="2w3d",
+        help=_("The time of retention period, the for mat is "
+               "<YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."),
     ),
 ]
 
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index 83bb484..6ba5183 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -109,7 +109,7 @@
         if filters is None:
             filters = {}
 
-        plain_fields = ["volume_id", "backup_id", "backup_completed", "instance_id"]
+        plain_fields = ["volume_id", "backup_id", "backup_completed", "instance_id", "created_at"]
 
         return self._add_filters(
             query=query,
@@ -144,6 +144,7 @@
 
         for raw_fieldname, value in filters.items():
             fieldname, operator_ = self.__decompose_filter(raw_fieldname)
+
             if fieldname in plain_fields:
                 query = self.__add_simple_filter(
                     query, model, fieldname, value, operator_
@@ -161,7 +162,6 @@
         ):
             if not isinstance(value, datetime.datetime):
                 value = timeutils.parse_isotime(value)
-
         return query.filter(self.valid_operators[operator_](field, value))
 
     def __decompose_filter(self, raw_fieldname):
@@ -306,3 +306,9 @@
             return self._soft_delete(models.Queue_data, id)
         except:
             LOG.error("Queue Not found.")
+
+    def soft_delete_backup(self, id):
+        try:
+            return self._soft_delete(models.Backup_data, id)
+        except:
+            LOG.error("Backup Not found.")
\ No newline at end of file
diff --git a/staffeln/objects/volume.py b/staffeln/objects/volume.py
index 3933bdf..be87b61 100644
--- a/staffeln/objects/volume.py
+++ b/staffeln/objects/volume.py
@@ -27,7 +27,7 @@
         """
         db_backups = cls.dbapi.get_backup_list(context, filters=filters)
 
-        return [cls._from_db_object(cls(), obj) for obj in db_backups]
+        return [cls._from_db_object(cls(context), obj) for obj in db_backups]
 
     @base.remotable
     def create(self):
@@ -59,3 +59,8 @@
         """
         current = self.get_by_uuid(backup_id=self.backup_id)
         self.obj_refresh(current)
+
+    @base.remotable
+    def delete_backup(self):
+        """Soft Delete the :class:`Queue_data` from the DB"""
+        db_obj = self.dbapi.soft_delete_backup(self.id)