Merge pull request #10 from vexxhost/add-retention-service
[WIP]: Add retention service
diff --git a/staffeln/cmd/conductor.py b/staffeln/cmd/conductor.py
index d3d0412..26cd159 100755
--- a/staffeln/cmd/conductor.py
+++ b/staffeln/cmd/conductor.py
@@ -7,7 +7,6 @@
from staffeln.conductor import manager
import staffeln.conf
-
CONF = staffeln.conf.CONF
@@ -15,7 +14,8 @@
service.prepare_service()
sm = cotyledon.ServiceManager()
- sm.add(manager.BackupManager, workers=CONF.conductor.backup_workers, args=(CONF,))
+ sm.add(manager.BackupManager,
+ workers=CONF.conductor.backup_workers, args=(CONF,))
# sm.add(manager.RotationManager,
# workers=CONF.conductor.rotation_workers, args=(CONF,))
oslo_config_glue.setup(sm, CONF)
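
Note: the rotation/retention service is still registered nowhere; the lines above keep it commented out. Once the service is ready, it would presumably be enabled by restoring that registration (a sketch, assuming manager.RotationManager is the service class that carries rotation_engine):

    sm.add(manager.RotationManager,
           workers=CONF.conductor.rotation_workers, args=(CONF,))
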
diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index d537d48..6de73f6 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -2,4 +2,6 @@
BACKUP_WIP=1
BACKUP_PLANNED=0
-BACKUP_ENABLED_KEY = 'true'
\ No newline at end of file
+BACKUP_ENABLED_KEY = 'true'
+
+DEFAULT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
\ No newline at end of file
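
For reference, a minimal sketch (not part of the change) of how the new DEFAULT_TIME_FORMAT is used to render the retention threshold as a timestamp string:

    from datetime import datetime
    from staffeln.common import constants

    # yields e.g. "2021-04-08 13:45:00", the format used for the
    # created_at__lt retention filter value
    stamp = datetime.now().strftime(constants.DEFAULT_TIME_FORMAT)
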
diff --git a/staffeln/common/time.py b/staffeln/common/time.py
new file mode 100644
index 0000000..6ebdee7
--- /dev/null
+++ b/staffeln/common/time.py
@@ -0,0 +1,48 @@
+import re
+from datetime import datetime
+from dateutil.relativedelta import relativedelta
+
+regex = re.compile(
+ r'((?P<years>\d+?)y)?((?P<months>\d+?)m)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?'
+)
+
+
+# parse_timedelta_string parses a timedelta string into a time dict
+# input:  <string> 1y2m3w5d - all values must be integers
+# output: <dict> {"years": 1, "months": 2, "weeks": 3, "days": 5}
+def parse_timedelta_string(time_str):
+    empty_flag = True
+    try:
+        parts = regex.match(time_str)
+        if not parts:
+            return None
+        parts = parts.groupdict()
+        time_params = {}
+        for key in parts:
+            if parts[key]:
+                time_params[key] = int(parts[key])
+                empty_flag = False
+            else:
+                time_params[key] = 0
+        if empty_flag:
+            return None
+        return time_params
+    except Exception:
+        # non-string or otherwise unparsable input
+        return None
+
+
+def timeago(years, months, weeks, days, from_date=None):
+ if from_date is None:
+ from_date = datetime.now()
+ return from_date - relativedelta(years=years, months=months, weeks=weeks, days=days)
+
+## yearsago using Standard library
+# def yearsago(years, from_date=None):
+# if from_date is None:
+# from_date = datetime.now()
+# try:
+# return from_date.replace(year=from_date.year - years)
+# except ValueError:
+# # Must be 2/29!
+# assert from_date.month == 2 and from_date.day == 29 # can be removed
+# return from_date.replace(month=2, day=28,
+# year=from_date.year-years)
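
A minimal usage sketch for the two helpers above (illustration only); the input matches the default retention_time of "2w3d" introduced later in this change:

    from staffeln.common import time as xtime

    delta = xtime.parse_timedelta_string("2w3d")
    # delta == {"years": 0, "months": 0, "weeks": 2, "days": 3}

    threshold = xtime.timeago(
        years=delta["years"],
        months=delta["months"],
        weeks=delta["weeks"],
        days=delta["days"],
    )
    # threshold is a datetime 17 days in the past; backups created before
    # it become candidates for removal by the retention service.
    print(threshold.strftime("%Y-%m-%d %H:%M:%S"))
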
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index f9cde7b..ada7faa 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -1,196 +1,227 @@
-import staffeln.conf
-import collections
-from staffeln.common import constants
-
-from openstack import exceptions
-from oslo_log import log
-from staffeln.common import auth
-from staffeln.common import context
-from staffeln import objects
-from staffeln.i18n import _
-
-CONF = staffeln.conf.CONF
-LOG = log.getLogger(__name__)
-
-BackupMapping = collections.namedtuple(
- "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]
-)
-
-QueueMapping = collections.namedtuple(
- "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]
-)
-
-conn = auth.create_connection()
-
-
-def check_vm_backup_metadata(metadata):
- if not CONF.conductor.backup_metadata_key in metadata:
- return False
- return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
-
-def get_projects_list():
- projects = conn.list_projects()
- return projects
-
-
-class Backup(object):
- """Implmentations of the queue with the sql."""
-
- def __init__(self):
- self.ctx = context.make_context()
- self.discovered_backup_map = None
- self.queue_mapping = dict()
- self.volume_mapping = dict()
-
- def get_backups(self):
- return objects.Volume.list(self.ctx)
-
- def get_queues(self, filters=None):
- """Get the list of volume queue columns from the queue_data table"""
- queues = objects.Queue.list(self.ctx, filters=filters)
- return queues
-
- def create_queue(self):
- """Create the queue of all the volumes for backup"""
- queue_list = self.check_instance_volumes()
- for queue in queue_list:
- self._volume_queue(queue)
-
- # Backup the volumes attached to which has a specific metadata
- def filter_server(self, metadata):
-
- if not CONF.conductor.backup_metadata_key in metadata:
- return False
-
- return metadata[CONF.conductor.backup_metadata_key].lower() == constants.BACKUP_ENABLED_KEY
-
- # Backup the volumes in in-use and available status
- def filter_volume(self, volume_id):
- volume = conn.get_volume_by_id(volume_id)
- res = volume['status'] in ("available", "in-use")
- if not res:
- LOG.info(_("Volume %s is not backed because it is in %s status" % (volume_id, volume['status'])))
-
- def check_instance_volumes(self):
- """Get the list of all the volumes from the project using openstacksdk
- Function first list all the servers in the project and get the volumes
- that are attached to the instance.
- """
- queues_map = []
- projects = get_projects_list()
- for project in projects:
- servers = conn.compute.servers(
- details=True, all_projects=True, project_id=project.id
- )
- for server in servers:
- if not self.filter_server(server.metadata): continue
- for volume in server.attached_volumes:
- if not self.filter_volume(volume["id"]): continue
- queues_map.append(
- QueueMapping(
- volume_id=volume["id"],
- backup_id="NULL",
- instance_id=server.id,
- backup_status=constants.BACKUP_PLANNED,
- )
- )
- return queues_map
-
- def _volume_queue(self, task):
- """Saves the queue data to the database."""
-
- # TODO(Alex): Need to escalate discussion
- # When create the task list, need to check the WIP backup generators
- # which are created in the past backup cycle.
- # Then skip to create new tasks for the volumes whose backup is WIP
- volume_queue = objects.Queue(self.ctx)
- volume_queue.backup_id = task.backup_id
- volume_queue.volume_id = task.volume_id
- volume_queue.instance_id = task.instance_id
- volume_queue.backup_status = task.backup_status
- volume_queue.create()
-
- def volume_backup_initiate(self, queue):
- """Initiate the backup of the volume
- :params: queue: Provide the map of the volume that needs
- backup.
- This function will call the backupup api and change the
- backup_status and backup_id in the queue table.
- """
- backup_id = queue.backup_id
- if backup_id == "NULL":
- try:
- volume_backup = conn.block_storage.create_backup(
- volume_id=queue.volume_id, force=True
- )
- queue.backup_id = volume_backup.id
- queue.backup_status = constants.BACKUP_WIP
- queue.save()
- except exceptions as error:
- print(error)
- else:
- pass
- # TODO(Alex): remove this task from the task list
- # Backup planned task cannot have backup_id in the same cycle
- # Reserve for now because it is related to the WIP backup genenrators which
- # are not finished in the current cycle
-
- def process_failed_backup(self, task):
- # 1. TODO(Alex): notify via email
- LOG.error("Backup of the volume %s failed." % task.volume_id)
- # 2. TODO(Alex): remove failed backup instance from the openstack
- # then set the volume status in-use
- # 3. remove failed task from the task queue
- task.delete_queue()
-
- def process_available_backup(self, task):
- LOG.info("Backup of the volume %s is successful." % task.volume_id)
- # 1. save success backup in the backup table
- self._volume_backup(
- BackupMapping(
- volume_id=task.volume_id,
- backup_id=task.backup_id,
- instance_id=task.instance_id,
- backup_completed=1,
- )
- )
- # 2. remove from the task list
- task.delete_queue()
- # 3. TODO(Alex): notify via email
-
- def process_using_backup(self, task):
- # remove from the task list
- task.delete_queue()
-
- def check_volume_backup_status(self, queue):
- """Checks the backup status of the volume
- :params: queue: Provide the map of the volume that needs backup
- status checked.
- Call the backups api to see if the backup is successful.
- """
- for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
- if backup_gen.id == queue.backup_id:
- if backup_gen.status == "error":
- self.process_failed_backup(queue)
- elif backup_gen.status == "available":
- self.process_available_backup(queue)
- elif backup_gen.status == "creating":
- # TODO(Alex): Need to escalate discussion
- # How to proceed WIP bakcup generators?
- # To make things worse, the last backup generator is in progress till
- # the new backup cycle
- LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
- else: # "deleting", "restoring", "error_restoring" status
- self.process_using_backup(queue)
-
- def _volume_backup(self, task):
- # matching_backups = [
- # g for g in self.available_backups if g.backup_id == task.backup_id
- # ]
- # if not matching_backups:
- volume_backup = objects.Volume(self.ctx)
- volume_backup.backup_id = task.backup_id
- volume_backup.volume_id = task.volume_id
- volume_backup.instance_id = task.instance_id
- volume_backup.backup_completed = task.backup_completed
- volume_backup.create()
+import staffeln.conf
+import collections
+from staffeln.common import constants
+
+from openstack import exceptions
+from oslo_log import log
+from staffeln.common import auth
+from staffeln.common import context
+from staffeln import objects
+from staffeln.i18n import _
+
+CONF = staffeln.conf.CONF
+LOG = log.getLogger(__name__)
+
+BackupMapping = collections.namedtuple(
+ "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]
+)
+
+QueueMapping = collections.namedtuple(
+ "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]
+)
+
+conn = auth.create_connection()
+
+
+def check_vm_backup_metadata(metadata):
+    if CONF.conductor.backup_metadata_key not in metadata:
+ return False
+ return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
+
+
+def get_projects_list():
+ projects = conn.list_projects()
+ return projects
+
+
+class Backup(object):
+ """Implmentations of the queue with the sql."""
+
+ def __init__(self):
+ self.ctx = context.make_context()
+ self.discovered_backup_map = None
+ self.queue_mapping = dict()
+ self.volume_mapping = dict()
+
+ def get_backups(self, filters=None):
+ return objects.Volume.list(self.ctx, filters=filters)
+
+ def get_queues(self, filters=None):
+ """Get the list of volume queue columns from the queue_data table"""
+ queues = objects.Queue.list(self.ctx, filters=filters)
+ return queues
+
+ def create_queue(self):
+ """Create the queue of all the volumes for backup"""
+ queue_list = self.check_instance_volumes()
+ for queue in queue_list:
+ self._volume_queue(queue)
+
+    # Back up only the volumes attached to servers that have the backup
+    # metadata key enabled
+    def filter_server(self, metadata):
+        if CONF.conductor.backup_metadata_key not in metadata:
+            return False
+
+        return metadata[CONF.conductor.backup_metadata_key].lower() == constants.BACKUP_ENABLED_KEY
+
+    # Back up only the volumes in "in-use" or "available" status
+    def filter_volume(self, volume_id):
+        try:
+            volume = conn.get_volume_by_id(volume_id)
+            if volume is None:
+                return False
+            res = volume['status'] in ("available", "in-use")
+            if not res:
+                LOG.info(_("Volume %s is not backed up because it is in %s status"
+                           % (volume_id, volume['status'])))
+            return res
+
+        except exceptions.ResourceNotFound:
+            return False
+
+    def remove_volume_backup(self, backup_object):
+        try:
+            backup = conn.get_volume_backup(backup_object.backup_id)
+            if backup is None:
+                return False
+            if backup["status"] == "available":
+                conn.delete_volume_backup(backup_object.backup_id)
+                backup_object.delete_backup()
+            elif backup["status"] in ("error", "error_restoring"):
+                # TODO(Alex): need to discuss
+                # For now, if the backup is in error status, the retention
+                # service does not remove it from OpenStack but removes it
+                # from the backup table so the user can delete it on Horizon.
+                backup_object.delete_backup()
+            else:  # "deleting", "restoring"
+                LOG.info(_("Rotation for the backup %s is skipped in this cycle "
+                           "because it is in %s status") % (backup_object.backup_id, backup["status"]))
+
+        except exceptions.ResourceNotFound:
+            LOG.info(_("Backup %s does not exist in OpenStack, "
+                       "or the cinder-backup service is not available in the cloud."
+                       % backup_object.backup_id))
+            # remove from the backup table
+            backup_object.delete_backup()
+            return False
+
+ def check_instance_volumes(self):
+ """Get the list of all the volumes from the project using openstacksdk
+ Function first list all the servers in the project and get the volumes
+ that are attached to the instance.
+ """
+ queues_map = []
+ projects = get_projects_list()
+ for project in projects:
+ servers = conn.compute.servers(
+ details=True, all_projects=True, project_id=project.id
+ )
+ for server in servers:
+                if not self.filter_server(server.metadata):
+                    continue
+ for volume in server.attached_volumes:
+                    if not self.filter_volume(volume["id"]):
+                        continue
+ queues_map.append(
+ QueueMapping(
+ volume_id=volume["id"],
+ backup_id="NULL",
+ instance_id=server.id,
+ backup_status=constants.BACKUP_PLANNED,
+ )
+ )
+ return queues_map
+
+ def _volume_queue(self, task):
+ """Saves the queue data to the database."""
+
+ # TODO(Alex): Need to escalate discussion
+ # When create the task list, need to check the WIP backup generators
+ # which are created in the past backup cycle.
+ # Then skip to create new tasks for the volumes whose backup is WIP
+ volume_queue = objects.Queue(self.ctx)
+ volume_queue.backup_id = task.backup_id
+ volume_queue.volume_id = task.volume_id
+ volume_queue.instance_id = task.instance_id
+ volume_queue.backup_status = task.backup_status
+ volume_queue.create()
+
+ def create_volume_backup(self, queue):
+ """Initiate the backup of the volume
+ :params: queue: Provide the map of the volume that needs
+ backup.
+ This function will call the backupup api and change the
+ backup_status and backup_id in the queue table.
+ """
+ backup_id = queue.backup_id
+ if backup_id == "NULL":
+ try:
+ volume_backup = conn.block_storage.create_backup(
+ volume_id=queue.volume_id, force=True
+ )
+ queue.backup_id = volume_backup.id
+ queue.backup_status = constants.BACKUP_WIP
+ queue.save()
+            except exceptions.SDKException as error:
+                LOG.error(error)
+ else:
+ pass
+ # TODO(Alex): remove this task from the task list
+ # Backup planned task cannot have backup_id in the same cycle
+            #             Reserved for now because it is related to the WIP backup generators which
+            #             are not finished in the current cycle
+
+ def process_failed_backup(self, task):
+ # 1. TODO(Alex): notify via email
+ LOG.error("Backup of the volume %s failed." % task.volume_id)
+ # 2. TODO(Alex): remove failed backup instance from the openstack
+ # then set the volume status in-use
+ # 3. remove failed task from the task queue
+ task.delete_queue()
+
+ def process_available_backup(self, task):
+ LOG.info("Backup of the volume %s is successful." % task.volume_id)
+ # 1. save success backup in the backup table
+ self._volume_backup(
+ BackupMapping(
+ volume_id=task.volume_id,
+ backup_id=task.backup_id,
+ instance_id=task.instance_id,
+ backup_completed=1,
+ )
+ )
+ # 2. remove from the task list
+ task.delete_queue()
+ # 3. TODO(Alex): notify via email
+
+ def process_using_backup(self, task):
+ # remove from the task list
+ task.delete_queue()
+
+ def check_volume_backup_status(self, queue):
+ """Checks the backup status of the volume
+ :params: queue: Provide the map of the volume that needs backup
+ status checked.
+ Call the backups api to see if the backup is successful.
+ """
+ for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
+ if backup_gen.id == queue.backup_id:
+ if backup_gen.status == "error":
+ self.process_failed_backup(queue)
+ elif backup_gen.status == "available":
+ self.process_available_backup(queue)
+ elif backup_gen.status == "creating":
+ # TODO(Alex): Need to escalate discussion
+                    # How to proceed with WIP backup generators?
+                    # To make things worse, the last backup generator may still be
+                    # in progress when the new backup cycle starts
+ LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
+ else: # "deleting", "restoring", "error_restoring" status
+ self.process_using_backup(queue)
+
+ def _volume_backup(self, task):
+ # matching_backups = [
+ # g for g in self.available_backups if g.backup_id == task.backup_id
+ # ]
+ # if not matching_backups:
+ volume_backup = objects.Volume(self.ctx)
+ volume_backup.backup_id = task.backup_id
+ volume_backup.volume_id = task.volume_id
+ volume_backup.instance_id = task.instance_id
+ volume_backup.backup_completed = task.backup_completed
+ volume_backup.create()
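
A rough sketch of how the retention path above is meant to be driven (illustration only; the threshold string is a hypothetical value of the kind produced by the conductor manager below):

    from staffeln.conductor import backup

    controller = backup.Backup()
    backup_objects = controller.get_backups(
        filters={"created_at__lt": "2021-03-22 10:00:00"}
    )
    for backup_object in backup_objects:
        # deletes "available" backups, drops "error" ones from the backup
        # table only, and skips "deleting"/"restoring" ones for this cycle
        controller.remove_volume_backup(backup_object)
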
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 47a3ec3..4493001 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -1,4 +1,5 @@
import cotyledon
+import datetime
from futurist import periodics
from oslo_log import log
import staffeln.conf
@@ -8,6 +9,7 @@
from staffeln.common import constants
from staffeln.conductor import backup
from staffeln.common import context
+from staffeln.common import time as xtime
from staffeln.i18n import _
LOG = log.getLogger(__name__)
@@ -72,7 +74,7 @@
)
if len(queues_to_start) != 0:
for queue in queues_to_start:
- backup.Backup().volume_backup_initiate(queue)
+ backup.Backup().create_volume_backup(queue)
# Refresh the task queue
# TODO(Alex): need to escalate discussion
@@ -86,7 +88,7 @@
LOG.info(_("The last backup cycle is not finished yet."
"So the new backup cycle is skipped."))
- @periodics.periodic(spacing=CONF.conductor.backup_period, run_immediately=True)
+ @periodics.periodic(spacing=CONF.conductor.backup_service_period, run_immediately=True)
def backup_engine(self):
LOG.info("backing... %s" % str(time.time()))
LOG.info("%s periodics" % self.name)
@@ -124,7 +126,39 @@
def reload(self):
LOG.info("%s reload" % self.name)
- @periodics.periodic(spacing=CONF.conductor.rotation_period, run_immediately=True)
+    def get_backup_list(self):
+        threshold_strtime = self.get_threshold_strtime()
+        if threshold_strtime is None:
+            return False
+        self.backup_list = backup.Backup().get_backups(
+            filters={"created_at__lt": threshold_strtime})
+        return True
+
+    def remove_backups(self):
+        LOG.debug("Backups to be removed: %s" % self.backup_list)
+        for retention_backup in self.backup_list:
+            # 1. check the backup status and delete only available backups
+            backup.Backup().remove_volume_backup(retention_backup)
+
+ @periodics.periodic(spacing=CONF.conductor.retention_service_period, run_immediately=True)
def rotation_engine(self):
LOG.info("%s rotation_engine" % self.name)
+ # 1. get the list of backups to remove based on the retention time
+        if not self.get_backup_list():
+            return
+ # 2. remove the backups
+ self.remove_backups()
+
+    # get the threshold time string for the retention filter
+    def get_threshold_strtime(self):
+        time_delta_dict = xtime.parse_timedelta_string(CONF.conductor.retention_time)
+        if time_delta_dict is None:
+            LOG.info(_("Retention time format is invalid. "
+                       "Follow <YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."))
+            return None
+
+        res = xtime.timeago(
+            years=time_delta_dict["years"],
+            months=time_delta_dict["months"],
+            weeks=time_delta_dict["weeks"],
+            days=time_delta_dict["days"],
+        )
+        return res.strftime(constants.DEFAULT_TIME_FORMAT)
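
For clarity, a worked equivalent of get_threshold_strtime() with the default retention_time of "2w3d" (a sketch; the exact value depends on when the cycle runs):

    from datetime import datetime
    from dateutil.relativedelta import relativedelta

    # "2w3d" -> relativedelta(weeks=2, days=3)
    threshold = (datetime.now() - relativedelta(weeks=2, days=3)
                 ).strftime("%Y-%m-%d %H:%M:%S")
    # a run at 2021-04-08 10:00:00 yields "2021-03-22 10:00:00"; rows with
    # created_at earlier than this are handed to remove_backups()
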
diff --git a/staffeln/conf/api.py b/staffeln/conf/api.py
index 846f959..e405d6a 100755
--- a/staffeln/conf/api.py
+++ b/staffeln/conf/api.py
@@ -1,7 +1,6 @@
from oslo_config import cfg
from staffeln.i18n import _
-
api_group = cfg.OptGroup(
"api",
title="API options",
diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index 06d433b..4e2aad9 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -1,7 +1,6 @@
from oslo_config import cfg
from staffeln.i18n import _
-
conductor_group = cfg.OptGroup(
"conductor",
title="Conductor Options",
@@ -13,12 +12,12 @@
"backup_workers",
default=1,
help=_("The maximum number of backup processes to "
- "fork and run. Default to number of CPUs on the host."),
+ "fork and run. Default to number of CPUs on the host."),
),
cfg.IntOpt(
- "backup_period",
+ "backup_service_period",
default=60,
- min=1,
+ min=10,
help=_("The time of bakup period, the unit is one minute."),
),
cfg.StrOpt(
@@ -38,13 +37,25 @@
"rotation_workers",
default=1,
help=_("The maximum number of rotation processes to "
- "fork and run. Default to number of CPUs on the host."),
+ "fork and run. Default to number of CPUs on the host."),
),
     cfg.IntOpt(
-        "rotation_period",
-        min=1,
-        help=_("The time of rotation period, the unit is one day."),
+        "retention_service_period",
+        default=20,
+        min=10,
+        help=_("The period of the retention service, in seconds."),
+    ),
+    cfg.StrOpt(
+        "retention_time",
+        default="2w3d",
+        help=_("The volume backup retention period; the format is "
+               "<YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."),
     ),
]
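
For illustration, a hypothetical staffeln.conf fragment exercising the renamed and newly added options (values are examples only, not recommendations):

    [conductor]
    backup_service_period = 3600
    # period of the retention service, in seconds
    retention_service_period = 1200
    # keep volume backups for one month and two weeks
    retention_time = 1m2w
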
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index 83bb484..6ba5183 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -109,7 +109,7 @@
if filters is None:
filters = {}
- plain_fields = ["volume_id", "backup_id", "backup_completed", "instance_id"]
+ plain_fields = ["volume_id", "backup_id", "backup_completed", "instance_id", "created_at"]
return self._add_filters(
query=query,
@@ -144,6 +144,7 @@
for raw_fieldname, value in filters.items():
fieldname, operator_ = self.__decompose_filter(raw_fieldname)
+
if fieldname in plain_fields:
query = self.__add_simple_filter(
query, model, fieldname, value, operator_
@@ -161,7 +162,6 @@
):
if not isinstance(value, datetime.datetime):
value = timeutils.parse_isotime(value)
-
return query.filter(self.valid_operators[operator_](field, value))
def __decompose_filter(self, raw_fieldname):
@@ -306,3 +306,9 @@
return self._soft_delete(models.Queue_data, id)
except:
LOG.error("Queue Not found.")
+
+    def soft_delete_backup(self, id):
+        try:
+            return self._soft_delete(models.Backup_data, id)
+        except Exception:
+            LOG.error("Backup not found.")
\ No newline at end of file
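
The new "created_at" entry in plain_fields is what lets the retention service filter by age: the "__lt" suffix is presumably decomposed into the less-than operator by __decompose_filter(). A minimal sketch of the call path through the object layer:

    from staffeln import objects
    from staffeln.common import context

    ctx = context.make_context()
    # returns Backup_data rows whose created_at is earlier than the
    # given timestamp string
    old_backups = objects.Volume.list(
        ctx, filters={"created_at__lt": "2021-03-22 10:00:00"}
    )
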
diff --git a/staffeln/objects/volume.py b/staffeln/objects/volume.py
index 3933bdf..276dd2e 100644
--- a/staffeln/objects/volume.py
+++ b/staffeln/objects/volume.py
@@ -27,13 +27,12 @@
"""
db_backups = cls.dbapi.get_backup_list(context, filters=filters)
- return [cls._from_db_object(cls(), obj) for obj in db_backups]
+ return [cls._from_db_object(cls(context), obj) for obj in db_backups]
@base.remotable
def create(self):
"""Create a :class:`Backup_data` record in the DB"""
values = self.obj_get_changes()
- print(values)
db_backup = self.dbapi.create_backup(values)
self._from_db_object(self, db_backup)
@@ -59,3 +58,8 @@
"""
current = self.get_by_uuid(backup_id=self.backup_id)
self.obj_refresh(current)
+
+ @base.remotable
+ def delete_backup(self):
+ """Soft Delete the :class:`Queue_data` from the DB"""
+ db_obj = self.dbapi.soft_delete_backup(self.id)
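
Putting the pieces together, the retention cycle added by this PR boils down to roughly the following (an illustrative sketch using only names visible in the diffs above; it assumes a configured staffeln deployment and a valid retention_time):

    import staffeln.conf
    from staffeln.common import constants
    from staffeln.common import time as xtime
    from staffeln.conductor import backup

    CONF = staffeln.conf.CONF
    # e.g. "2w3d" -> {"years": 0, "months": 0, "weeks": 2, "days": 3}
    delta = xtime.parse_timedelta_string(CONF.conductor.retention_time)
    threshold = xtime.timeago(**delta).strftime(constants.DEFAULT_TIME_FORMAT)

    controller = backup.Backup()
    for backup_object in controller.get_backups(
            filters={"created_at__lt": threshold}):
        # ends in backup_object.delete_backup(), i.e. soft_delete_backup()
        controller.remove_volume_backup(backup_object)
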