Add per-instance backup retention

Add a new instance metadata key whose value specifies the backup
retention period as a time string.

Backups will be cleaned up based on the value of the retention metadata
key in the instance metadata. If it is not set, we fall back to using
`retention_time` from the config.
diff --git a/etc/staffeln/staffeln.conf b/etc/staffeln/staffeln.conf
index 19a214a..13cfacf 100644
--- a/etc/staffeln/staffeln.conf
+++ b/etc/staffeln/staffeln.conf
@@ -7,6 +7,7 @@
 backup_cycle_timout = 5min
 retention_time = 2w3d
 backup_metadata_key="__automated_backup"
+retention_metadata_key="__staffeln_retention"
 
 [database]
 backend = sqlalchemy
@@ -30,4 +31,4 @@
 [notification]
 # receiver = reciever@gmail.com
 # sender_email = sender@vexxhost.com
-# smtp_server_domain = localhost
\ No newline at end of file
+# smtp_server_domain = localhost
diff --git a/staffeln/common/openstack.py b/staffeln/common/openstack.py
index 71bafc9..b6329a2 100644
--- a/staffeln/common/openstack.py
+++ b/staffeln/common/openstack.py
@@ -38,10 +38,13 @@
     def get_projects(self):

         return self.conn.list_projects()

 

-    def get_servers(self, project_id, all_projects=True, details=True):

-        return self.conn.compute.servers(

-            details=details, all_projects=all_projects, project_id=project_id

-        )

+    def get_servers(self, project_id=None, all_projects=True, details=True):

+        if project_id is not None:

+            return self.conn.compute.servers(

+                details=details, all_projects=all_projects, project_id=project_id

+            )

+        else:

+            return self.conn.compute.servers(details=details, all_projects=all_projects)

 

     def get_volume(self, uuid, project_id):

         return self.conn.get_volume_by_id(uuid)

diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index 930ae0b..21ef724 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -23,6 +23,7 @@
         "instance_id",
         "backup_completed",
         "incremental",
+        "created_at",
     ],
 )
 
@@ -363,6 +364,27 @@
                     )
         return queues_map
 
+    def collect_instance_retention_map(self):
+        """Retrieves instance backup retention map"""
+
+        retention_map = {}
+        # No customized retention.
+        if not CONF.conductor.retention_metadata_key:
+            return retention_map
+        self.refresh_openstacksdk()
+
+        try:
+            servers = self.openstacksdk.get_servers(all_projects=True)
+        except OpenstackHttpException as ex:
+            LOG.warn(_("Failed to list servers for all projects."))
+
+        for server in servers:
+            if CONF.conductor.retention_metadata_key in server.metadata:
+                retention_map[server.id] = server.metadata[
+                    CONF.conductor.retention_metadata_key
+                ].lower()
+        return retention_map
+
     def _volume_queue(self, task):
         """
         Commits one backup task to queue table
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index bbd5bcb..ed324a1 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -1,3 +1,4 @@
+from datetime import datetime, timezone

 import threading

 import time

 

@@ -164,6 +165,7 @@
         super(RotationManager, self).__init__(worker_id)

         self._shutdown = threading.Event()

         self.conf = conf

+        self.lock_mgt = lock.LockManager()

         self.controller = backup.Backup()

         LOG.info("%s init" % self.name)

 

@@ -178,18 +180,12 @@
     def reload(self):

         LOG.info("%s reload" % self.name)

 

-    def get_backup_list(self):

-        threshold_strtime = self.get_threshold_strtime()

-        if threshold_strtime is None:

-            return False

-        self.backup_list = self.controller.get_backups(

-            filters={"created_at__lt": threshold_strtime}

-        )

-        return True

+    def get_backup_list(self, filters=None):

+        return self.controller.get_backups(filters=filters)

 

-    def remove_backups(self):

-        print(self.backup_list)

-        for retention_backup in self.backup_list:

+    def remove_backups(self, retention_backups):

+        LOG.info(_("Backups to be removed: %s" % retention_backups))

+        for retention_backup in retention_backups:

             self.controller.hard_remove_volume_backup(retention_backup)

 

     def rotation_engine(self, retention_service_period):

@@ -197,14 +193,46 @@
 

         @periodics.periodic(spacing=retention_service_period, run_immediately=True)

         def rotation_tasks():

-            self.controller.refresh_openstacksdk()

-            # 1. get the list of backups to remove based on the retention time

-            if not self.get_backup_list():

-                return

-            # 2. get project list

-            self.controller.update_project_list()

-            # 3. remove the backups

-            self.remove_backups()

+            try:

+                # TODO(rlin): change to use decorator for this

+                # Make sure only one retention at a time

+                with self.lock_mgt.coordinator.get_lock("retention"):

+                    self.controller.refresh_openstacksdk()

+                    # get the threshold time

+                    threshold_strtime = self.get_time_from_str(CONF.conductor.retention_time)

+                    instance_retention_map = self.controller.collect_instance_retention_map()

+                    retention_backups = []

+                    # 1. get init list of backups to remove based on the retention time

+                    if not instance_retention_map:

+                        retention_backups = self.get_backup_list(

+                            filters={

+                                "created_at__lt": threshold_strtime.strftime(

+                                    xtime.DEFAULT_TIME_FORMAT

+                                )

+                            }

+                        )

+                    else:

+                        full_backup_list = self.get_backup_list()

+                        for backup in full_backup_list:

+                            if backup.instance_id in instance_retention_map:

+                                retention_time = self.get_time_from_str(

+                                    instance_retention_map[backup.instance_id]

+                                )

+                                backup_age = datetime.now(timezone.utc) - backup.created_at

+                                if backup_age > retention_time:

+                                    # Backup remain longer than retention, need to purge it.

+                                    retention_backups.append(backup)

+                            elif threshold_strtime < backup.created_at:

+                                retention_backups.append(backup)

+

+                    if not retention_backups:

+                        return

+                    # 2. get project list

+                    self.controller.update_project_list()

+                    # 3. remove the backups

+                    self.remove_backups(retention_backups)

+            except coordination.LockAcquireFailed:

+                LOG.debug("Failed to lock for retention")

 

         periodic_callables = [

             (rotation_tasks, (), {}),

@@ -216,9 +244,9 @@
         periodic_thread.daemon = True

         periodic_thread.start()

 

-    # get the threshold time str

-    def get_threshold_strtime(self):

-        time_delta_dict = xtime.parse_timedelta_string(CONF.conductor.retention_time)

+    # get time

+    def get_time_from_str(self, time_str, to_str=False):

+        time_delta_dict = xtime.parse_timedelta_string(time_str)

         if time_delta_dict is None:

             LOG.info(

                 _(

@@ -237,4 +265,4 @@
             minutes=time_delta_dict["minutes"],

             seconds=time_delta_dict["seconds"],

         )

-        return res.strftime(xtime.DEFAULT_TIME_FORMAT)

+        return res.strftime(xtime.DEFAULT_TIME_FORMAT) if to_str else res

diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index 6e5811d..5ab8e4b 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -39,6 +39,14 @@
         "backup_metadata_key",
         help=_("The key string of metadata the VM, which requres back up, has"),
     ),
+    cfg.StrOpt(
+        "retention_metadata_key",
+        regex=(
+            r"((?P<years>\d+?)y)?((?P<months>\d+?)mon)?((?P<weeks>\d+?)w)?"
+            r"((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)min)?((?P<seconds>\d+?)s)?"
+        ),
+        help=_("The key string of metadata the VM, which use as backup retention period."),
+    ),
     cfg.IntOpt(
         "full_backup_depth",
         default=2,
diff --git a/staffeln/objects/volume.py b/staffeln/objects/volume.py
index 1f0bdb6..6b0e676 100644
--- a/staffeln/objects/volume.py
+++ b/staffeln/objects/volume.py
@@ -1,3 +1,4 @@
+from oslo_versionedobjects import fields as ovoo_fields
 from staffeln.db import api as db_api
 from staffeln.objects import base
 from staffeln.objects import fields as sfeild
@@ -19,6 +20,7 @@
         "volume_id": sfeild.UUIDField(),
         "backup_completed": sfeild.IntegerField(),
         "incremental": sfeild.BooleanField(),
+        "created_at": ovoo_fields.DateTimeField(),
     }
 
     @base.remotable_classmethod