Merge pull request #12 from vexxhost/update-backup-service

Set timeout for backup cycle to take care of backup results
diff --git a/requirements.txt b/requirements.txt
index 45ae2d5..9c3aad4 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,6 +10,7 @@
 oslo.db>=5.0.0
 oslo.config>=8.1.0
 oslo.log>=4.4.0 # Apache-2.0
+oslo_versionedobjects
 openstacksdk>0.28.0
 pymysql
 
diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index d0ec6fe..91b2a95 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -3,3 +3,7 @@
 BACKUP_PLANNED=0
 
 BACKUP_ENABLED_KEY = 'true'
+BACKUP_RESULT_CHECK_INTERVAL = 60 # second
+
+# default config values
+DEFAULT_BACKUP_CYCLE_TIMEOUT="5min"
\ No newline at end of file
diff --git a/staffeln/common/time.py b/staffeln/common/time.py
index 3059085..02e40e1 100644
--- a/staffeln/common/time.py
+++ b/staffeln/common/time.py
@@ -5,7 +5,7 @@
 DEFAULT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
 
 regex = re.compile(
-    r'((?P<years>\d+?)y)?((?P<months>\d+?)m)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?'
+    r'((?P<years>\d+?)y)?((?P<months>\d+?)mon)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)min)?((?P<seconds>\d+?)s)?'
 )
 
 
@@ -31,16 +31,20 @@
     except:
         return None
 
+def get_current_time():
+    return datetime.now()
 
 def get_current_strtime():
     now = datetime.now()
     return now.strftime(DEFAULT_TIME_FORMAT)
 
 
-def timeago(years, months, weeks, days, from_date=None):
+def timeago(years=0, months=0, weeks=0, days=0, hours=0, minutes=0, seconds=0, from_date=None):
     if from_date is None:
         from_date = datetime.now()
-    return from_date - relativedelta(years=years, months=months, weeks=weeks, days=days)
+    return from_date - relativedelta(years=years, months=months,
+                                     weeks=weeks, days=days, hours=hours,
+                                     minutes=minutes, seconds=seconds)
 
 ## yearsago using Standard library
 # def yearsago(years, from_date=None):
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index f358c8a..47a4fc0 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -1,3 +1,4 @@
+import parse
 import staffeln.conf
 import collections
 from staffeln.common import constants
@@ -14,11 +15,11 @@
 LOG = log.getLogger(__name__)
 
 BackupMapping = collections.namedtuple(
-    "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]
+    "BackupMapping", ["volume_id", "backup_id", "project_id", "instance_id", "backup_completed"]
 )
 
 QueueMapping = collections.namedtuple(
-    "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]
+    "QueueMapping", ["volume_id", "backup_id", "project_id", "instance_id", "backup_status"]
 )
 
 conn = auth.create_connection()
@@ -77,6 +78,7 @@
     # Backup the volumes in in-use and available status
     def filter_volume(self, volume_id):
         try:
+            # volume = conn.block_storage.get_volume(volume_id)
             volume = conn.get_volume_by_id(volume_id)
             if volume == None: return False
             res = volume['status'] in ("available", "in-use")
@@ -87,12 +89,47 @@
         except OpenstackResourceNotFound:
             return False
 
-    def remove_volume_backup(self, backup_object):
+    #  delete all backups forcibly regardless of the status
+    def hard_cancel_volume_backup(self, task):
         try:
+            LOG.info(_("Cancel backup %s" % task.backup_id))
+            # backup = conn.block_storage.get_backup(
+            #     project_id=task.project_id, backup_id=task.backup_id,
+            # )
+            backup = conn.get_volume_backup(task.backup_id)
+            if backup == None: return task.delete_queue()
+
+            # TODO(Alex): v3 does not support force delete?
+            # conn.block_storage.delete_backup(
+            #     project_id=task.project_id, backup_id=task.backup_id,
+            # )
+            conn.delete_volume_backup(task.backup_id, force=True)
+            # TODO(Alex): After delete the backup generator, need to set the volume status again
+            task.delete_queue()
+
+        except OpenstackResourceNotFound:
+            task.delete_queue()
+
+        except OpenstackSDKException as e:
+            LOG.info(_("Backup %s deletion failed."
+                       "%s" % (task.backup_id, str(e))))
+            # TODO(Alex): When backup timeout and cancel failed
+            # 1. notify
+            # 2. set the volume status as in-use
+            # remove from the queue table
+            task.delete_queue()
+
+    #  delete only available backups
+    def soft_remove_volume_backup(self, backup_object):
+        try:
+
+            # backup = conn.block_storage.get_backup(
+            #     project_id=backup_object.project_id, backup_id=backup_object.backup_id,
+            # )
             backup = conn.get_volume_backup(backup_object.backup_id)
-            if backup == None: return False
+            if backup == None: return backup_object.delete_backup()
             if backup["status"] in ("available"):
-                conn.delete_volume_backup(backup_object.backup_id)
+                conn.delete_volume_backup(backup_object.backup_id, force=True)
                 backup_object.delete_backup()
             elif backup["status"] in ("error", "error_restoring"):
                 # TODO(Alex): need to discuss
@@ -111,6 +148,40 @@
             backup_object.delete_backup()
             return False
 
+        except OpenstackSDKException as e:
+            LOG.info(_("Backup %s deletion failed."
+                       "%s" % (backup_object.backup_id, str(e))))
+            # TODO(Alex): Add it into the notification queue
+            # remove from the backup table
+            backup_object.delete_backup()
+            return False
+
+    #  delete all backups forcibly regardless of the status
+    def hard_remove_volume_backup(self, backup_object):
+        try:
+
+            # backup = conn.block_storage.get_backup(
+            #     project_id=backup_object.project_id, backup_id=backup_object.backup_id,
+            # )
+            backup = conn.get_volume_backup(backup_object.backup_id)
+            if backup == None: return backup_object.delete_backup()
+
+            conn.delete_volume_backup(backup_object.backup_id, force=True)
+            backup_object.delete_backup()
+
+        except OpenstackResourceNotFound:
+            LOG.info(_("Backup %s is not existing in Openstack."
+                       "Or cinder-backup is not existing in the cloud." % backup_object.backup_id))
+            # remove from the backup table
+            backup_object.delete_backup()
+
+        except OpenstackSDKException as e:
+            LOG.info(_("Backup %s deletion failed."
+                       "%s" % (backup_object.backup_id, str(e))))
+            # TODO(Alex): Add it into the notification queue
+            # remove from the backup table
+            backup_object.delete_backup()
+
     def check_instance_volumes(self):
         """Get the list of all the volumes from the project using openstacksdk
         Function first list all the servers in the project and get the volumes
@@ -128,6 +199,7 @@
                     if not self.filter_volume(volume["id"]): continue
                     queues_map.append(
                         QueueMapping(
+                            project_id=project.id,
                             volume_id=volume["id"],
                             backup_id="NULL",
                             instance_id=server.id,
@@ -160,8 +232,13 @@
         backup_id = queue.backup_id
         if backup_id == "NULL":
             try:
+                LOG.info(_("Backup for volume %s creating" % queue.volume_id))
+                # volume_backup = conn.block_storage.create_backup(
+                #     volume_id=queue.volume_id, force=True, project_id=queue.project_id,
+                # )
+                # NOTE(Alex): no need to wait because we have a cycle time out
                 volume_backup = conn.create_volume_backup(
-                    volume_id=queue.volume_id, force=True
+                    volume_id=queue.volume_id, force=True, wait=False,
                 )
                 queue.backup_id = volume_backup.id
                 queue.backup_status = constants.BACKUP_WIP
@@ -169,6 +246,11 @@
             except OpenstackSDKException as error:
                 LOG.info(_("Backup creation for the volume %s failled. %s"
                            % (queue.volume_id, str(error))))
+                parsed = parse.parse("Error in creating volume backup {id}", str(error))
+                if parsed == None: return
+                queue.backup_id = parsed["id"]
+                queue.backup_status = constants.BACKUP_WIP
+                queue.save()
         else:
             pass
             # TODO(Alex): remove this task from the task list
@@ -181,15 +263,20 @@
         LOG.error("Backup of the volume %s failed." % task.volume_id)
         # 2. TODO(Alex): remove failed backup instance from the openstack
         #     then set the volume status in-use
+        self.hard_cancel_volume_backup(task)
         # 3. remove failed task from the task queue
         task.delete_queue()
 
+    def process_non_existing_backup(self, task):
+        task.delete_queue()
+
     def process_available_backup(self, task):
         LOG.info("Backup of the volume %s is successful." % task.volume_id)
         # 1. save success backup in the backup table
         self._volume_backup(
             BackupMapping(
                 volume_id=task.volume_id,
+                project_id=task.project_id,
                 backup_id=task.backup_id,
                 instance_id=task.instance_id,
                 backup_completed=1,
@@ -211,10 +298,13 @@
         """
         # for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
         try:
+            # backup_gen = conn.block_storage.get_backup(
+            #     project_id=queue.project_id, backup_id=queue.backup_id,
+            # )
             backup_gen = conn.get_volume_backup(queue.backup_id)
             if backup_gen == None:
                 # TODO(Alex): need to check when it is none
-                LOG.info(_("Backup status of %s is returning none."%(queue.backup_id)))
+                LOG.info(_("[Beta] Backup status of %s is returning none." % (queue.backup_id)))
                 return
             if backup_gen.status == "error":
                 self.process_failed_backup(queue)
@@ -229,7 +319,7 @@
             else:  # "deleting", "restoring", "error_restoring" status
                 self.process_using_backup(queue)
         except OpenstackResourceNotFound as e:
-            self.process_failed_backup(queue)
+            self.process_non_existing_backup(queue)
 
     def _volume_backup(self, task):
         # matching_backups = [
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 6898348..bfda77d 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -1,5 +1,4 @@
 import cotyledon

-import datetime

 from futurist import periodics

 from oslo_log import log

 import staffeln.conf

@@ -59,16 +58,51 @@
         return False

 

     # Manage active backup generators

-    # TODO(Alex): need to discuss

-    #  Need to wait until all backups are finished?

-    #  That is required to make the backup report

     def _process_wip_tasks(self):

         LOG.info(_("Processing WIP backup generators..."))

-        queues_started = backup.Backup().get_queues(

-            filters={"backup_status": constants.BACKUP_WIP}

+        # TODO(Alex): Replace this infinite loop with finite time

+        self.cycle_start_time = xtime.get_current_time()

+

+        # loop - take care of backup result while timeout

+        while(1):

+            queues_started = backup.Backup().get_queues(

+                filters={"backup_status": constants.BACKUP_WIP}

+            )

+            if len(queues_started) == 0:

+                LOG.info(_("task queue empty"))

+                break

+            if not self._backup_cycle_timeout():# time in

+                LOG.info(_("cycle timein"))

+                for queue in queues_started: backup.Backup().check_volume_backup_status(queue)

+            else: # time out

+                LOG.info(_("cycle timeout"))

+                for queue in queues_started: backup.Backup().hard_cancel_volume_backup(queue)

+                break

+            time.sleep(constants.BACKUP_RESULT_CHECK_INTERVAL)

+

+    # if the backup cycle has timed out, return True

+    def _backup_cycle_timeout(self):

+        time_delta_dict = xtime.parse_timedelta_string(CONF.conductor.backup_cycle_timout)

+

+        if time_delta_dict == None:

+            LOG.info(_("Recycle timeout format is invalid. "

+                       "Follow <YEARS>y<MONTHS>m<WEEKS>w<DAYS>d<HOURS>h<MINUTES>min<SECONDS>s."))

+            time_delta_dict = xtime.parse_timedelta_string(constants.DEFAULT_BACKUP_CYCLE_TIMEOUT)

+        rto = xtime.timeago(

+            years=time_delta_dict["years"],

+            months=time_delta_dict["months"],

+            weeks=time_delta_dict["weeks"],

+            days=time_delta_dict["days"],

+            hours=time_delta_dict["hours"],

+            minutes=time_delta_dict["minutes"],

+            seconds=time_delta_dict["seconds"],

         )

-        if len(queues_started) != 0:

-            for queue in queues_started: backup.Backup().check_volume_backup_status(queue)

+        # print(rto.strftime(xtime.DEFAULT_TIME_FORMAT))

+        # print(self.cycle_start_time)

+        # print(self.cycle_start_time - rto)

+        if rto >= self.cycle_start_time:

+            return True

+        return False

 

     # Create backup generators

     def _process_todo_tasks(self):

@@ -92,7 +126,6 @@
         self.failed_backup_list = []

         notify.SendBackupResultEmail(self.success_backup_list, self.failed_backup_list)

 

-

     @periodics.periodic(spacing=CONF.conductor.backup_service_period, run_immediately=True)

     def backup_engine(self):

         LOG.info("backing... %s" % str(time.time()))

@@ -101,12 +134,10 @@
         if self._check_quota(): return

         # NOTE(Alex): If _process_wip_tasks() waits tiil no WIP tasks

         # exist, no need to repeat this function before and after queue update.

-        self._process_wip_tasks()

         self._update_task_queue()

         self._process_todo_tasks()

         self._process_wip_tasks()

-        self._report_backup_result()

-

+        # self._report_backup_result()

 

 

 class RotationManager(cotyledon.Service):

@@ -145,8 +176,7 @@
     def remove_backups(self):

         print(self.backup_list)

         for retention_backup in self.backup_list:

-            # 1. check the backup status and delete only available backups

-            backup.Backup().remove_volume_backup(retention_backup)

+            backup.Backup().hard_remove_volume_backup(retention_backup)

 

     @periodics.periodic(spacing=CONF.conductor.retention_service_period, run_immediately=True)

     def rotation_engine(self):

@@ -160,15 +190,18 @@
     # get the threshold time str

     def get_threshold_strtime(self):

         time_delta_dict = xtime.parse_timedelta_string(CONF.conductor.retention_time)

-        if time_delta_dict == None: return None

+        if time_delta_dict == None:

+            LOG.info(_("Retention time format is invalid. "

+                       "Follow <YEARS>y<MONTHS>m<WEEKS>w<DAYS>d<HOURS>h<MINUTES>min<SECONDS>s."))

+            return None

 

         res = xtime.timeago(

             years=time_delta_dict["years"],

             months=time_delta_dict["months"],

             weeks=time_delta_dict["weeks"],

             days=time_delta_dict["days"],

+            hours=time_delta_dict["hours"],

+            minutes=time_delta_dict["minutes"],

+            seconds=time_delta_dict["seconds"],

         )

-        if res == None: LOG.info(_("Retention time format is invalid. "

-                                   "Follow <YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."))

-

         return res.strftime(xtime.DEFAULT_TIME_FORMAT)

diff --git a/staffeln/conductor/notify.py b/staffeln/conductor/notify.py
index d80da4e..2f77aa9 100644
--- a/staffeln/conductor/notify.py
+++ b/staffeln/conductor/notify.py
@@ -54,4 +54,3 @@
         LOG.info(_("Backup result email sent"))

     except Exception as e:

         LOG.error(_("Backup result email send failed. Please check email configuration. %s" % (str(e))))

-

diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index 4e2aad9..7c9c643 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -1,4 +1,5 @@
 from oslo_config import cfg
+from staffeln.common import constants
 from staffeln.i18n import _
 
 conductor_group = cfg.OptGroup(
@@ -16,11 +17,18 @@
     ),
     cfg.IntOpt(
         "backup_service_period",
-        default=60,
+        default=30,
         min=10,
         help=_("The time of bakup period, the unit is one minute."),
     ),
     cfg.StrOpt(
+        "backup_cycle_timout",
+        regex=r'((?P<years>\d+?)y)?((?P<months>\d+?)mon)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)min)?((?P<seconds>\d+?)s)?',
+        default=constants.DEFAULT_BACKUP_CYCLE_TIMEOUT,
+        help=_("The duration for which the backup cycle waits for backup results."
+               "<YEARS>y<MONTHS>mon<WEEKS>w<DAYS>d<HOURS>h<MINUTES>min<SECONDS>s."),
+    ),
+    cfg.StrOpt(
         "backup_metadata_key",
         default="__automated_backup",
         help=_("The key string of metadata the VM, which requres back up, has"),
@@ -53,9 +61,10 @@
     ),
     cfg.StrOpt(
         "retention_time",
+        regex=r'((?P<years>\d+?)y)?((?P<months>\d+?)mon)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)min)?((?P<seconds>\d+?)s)?',
         default="2w3d",
         help=_("The time of retention period, the for mat is "
-               "<YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."),
+               "<YEARS>y<MONTHS>mon<WEEKS>w<DAYS>d<HOURS>h<MINUTES>min<SECONDS>s."),
     ),
 ]
 
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index 4b4fc9f..0148c3d 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -110,7 +110,14 @@
             filters = {}
 
 
-        plain_fields = ["volume_id", "backup_id", "backup_completed", "instance_id", "created_at"]
+        plain_fields = [
+            "volume_id",
+            "backup_id",
+            "project_id",
+            "backup_completed",
+            "instance_id",
+            "created_at"
+        ]
 
         return self._add_filters(
             query=query,
@@ -126,6 +133,7 @@
 
         plain_fields = [
             "backup_id",
+            "project_id",
             "volume_id",
             "instance_id",
             "backup_status",
diff --git a/staffeln/db/sqlalchemy/models.py b/staffeln/db/sqlalchemy/models.py
index 715d747..93dae9f 100644
--- a/staffeln/db/sqlalchemy/models.py
+++ b/staffeln/db/sqlalchemy/models.py
@@ -62,6 +62,7 @@
     )
     id = Column(Integer, primary_key=True, autoincrement=True)
     backup_id = Column(String(100))
+    project_id = Column(String(100))
     volume_id = Column(String(100))
     instance_id = Column(String(100))
     backup_completed = Column(Integer())
@@ -74,6 +75,7 @@
     __table_args__ = table_args()
     id = Column(Integer, primary_key=True, autoincrement=True)
     backup_id = Column(String(100))
+    project_id = Column(String(100))
     volume_id = Column(String(100))
     backup_status = Column(Integer())
     instance_id = Column(String(100))
diff --git a/staffeln/objects/queue.py b/staffeln/objects/queue.py
index ac8c9e0..0137d4e 100644
--- a/staffeln/objects/queue.py
+++ b/staffeln/objects/queue.py
@@ -15,6 +15,7 @@
     fields = {
         "id": sfeild.IntegerField(),
         "backup_id": sfeild.StringField(),
+        "project_id": sfeild.UUIDField(),
         "volume_id": sfeild.UUIDField(),
         "instance_id": sfeild.StringField(),
         "backup_status": sfeild.IntegerField(),
diff --git a/staffeln/objects/volume.py b/staffeln/objects/volume.py
index 6655b7b..ce300af 100644
--- a/staffeln/objects/volume.py
+++ b/staffeln/objects/volume.py
@@ -15,6 +15,7 @@
         "id": sfeild.IntegerField(),
         "backup_id": sfeild.StringField(),
         "instance_id": sfeild.StringField(),
+        "project_id": sfeild.UUIDField(),
         "volume_id": sfeild.UUIDField(),
         "backup_completed": sfeild.IntegerField(),
     }