Add remove volume backup logic
diff --git a/staffeln/cmd/conductor.py b/staffeln/cmd/conductor.py
index d3d0412..26cd159 100755
--- a/staffeln/cmd/conductor.py
+++ b/staffeln/cmd/conductor.py
@@ -7,7 +7,6 @@
from staffeln.conductor import manager
import staffeln.conf
-
CONF = staffeln.conf.CONF
@@ -15,7 +14,8 @@
service.prepare_service()
sm = cotyledon.ServiceManager()
- sm.add(manager.BackupManager, workers=CONF.conductor.backup_workers, args=(CONF,))
+ sm.add(manager.BackupManager,
+ workers=CONF.conductor.backup_workers, args=(CONF,))
# sm.add(manager.RotationManager,
# workers=CONF.conductor.rotation_workers, args=(CONF,))
oslo_config_glue.setup(sm, CONF)
diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index d537d48..6de73f6 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -2,4 +2,6 @@
BACKUP_WIP=1
BACKUP_PLANNED=0
-BACKUP_ENABLED_KEY = 'true'
\ No newline at end of file
+BACKUP_ENABLED_KEY = 'true'
+
+DEFAULT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
\ No newline at end of file
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index e934063..ada7faa 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -1,214 +1,227 @@
-import staffeln.conf
-import collections
-from staffeln.common import constants
-
-from openstack import exceptions
-from oslo_log import log
-from staffeln.common import auth
-from staffeln.common import context
-from staffeln import objects
-from staffeln.i18n import _
-
-CONF = staffeln.conf.CONF
-LOG = log.getLogger(__name__)
-
-BackupMapping = collections.namedtuple(
- "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]
-)
-
-QueueMapping = collections.namedtuple(
- "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]
-)
-
-conn = auth.create_connection()
-
-
-def check_vm_backup_metadata(metadata):
- if not CONF.conductor.backup_metadata_key in metadata:
- return False
- return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
-
-
-def get_projects_list():
- projects = conn.list_projects()
- return projects
-
-
-class Backup(object):
- """Implmentations of the queue with the sql."""
-
- def __init__(self):
- self.ctx = context.make_context()
- self.discovered_backup_map = None
- self.queue_mapping = dict()
- self.volume_mapping = dict()
-
- def get_backups(self, filters=None):
- return objects.Volume.list(self.ctx, filters=filters)
-
- def get_queues(self, filters=None):
- """Get the list of volume queue columns from the queue_data table"""
- queues = objects.Queue.list(self.ctx, filters=filters)
- return queues
-
- def create_queue(self):
- """Create the queue of all the volumes for backup"""
- queue_list = self.check_instance_volumes()
- for queue in queue_list:
- self._volume_queue(queue)
-
- # Backup the volumes attached to which has a specific metadata
- def filter_server(self, metadata):
-
- if not CONF.conductor.backup_metadata_key in metadata:
- return False
-
- return metadata[CONF.conductor.backup_metadata_key].lower() == constants.BACKUP_ENABLED_KEY
-
- # Backup the volumes in in-use and available status
- def filter_volume(self, volume_id):
- volume = conn.get_volume_by_id(volume_id)
- res = volume['status'] in ("available", "in-use")
- if not res:
- LOG.info(_("Volume %s is not backed because it is in %s status" % (volume_id, volume['status'])))
-
- def check_backup_status(self, backup_object):
- try:
- backup = conn.get_volume_backup(backup_object.backup_id)
- # TODO(Alex): "deleting", "restoring", "error_restoring", "error"
- res = backup["status"] in ("available")
- if not res:
- LOG.info(_("Rotation for the backup %s is skipped "
- "because it is in %s status")%(backup_object.backup_id, backup["status"]))
- return res
- except exceptions.ResourceNotFound:
- LOG.info(_("Backup % is not existing." % backup_object.backup_id))
- # remove from the backup table
- backup_object.delete_backup()
-
- return False
-
-
- def check_instance_volumes(self):
- """Get the list of all the volumes from the project using openstacksdk
- Function first list all the servers in the project and get the volumes
- that are attached to the instance.
- """
- queues_map = []
- projects = get_projects_list()
- for project in projects:
- servers = conn.compute.servers(
- details=True, all_projects=True, project_id=project.id
- )
- for server in servers:
- if not self.filter_server(server.metadata): continue
- for volume in server.attached_volumes:
- if not self.filter_volume(volume["id"]): continue
- queues_map.append(
- QueueMapping(
- volume_id=volume["id"],
- backup_id="NULL",
- instance_id=server.id,
- backup_status=constants.BACKUP_PLANNED,
- )
- )
- return queues_map
-
- def _volume_queue(self, task):
- """Saves the queue data to the database."""
-
- # TODO(Alex): Need to escalate discussion
- # When create the task list, need to check the WIP backup generators
- # which are created in the past backup cycle.
- # Then skip to create new tasks for the volumes whose backup is WIP
- volume_queue = objects.Queue(self.ctx)
- volume_queue.backup_id = task.backup_id
- volume_queue.volume_id = task.volume_id
- volume_queue.instance_id = task.instance_id
- volume_queue.backup_status = task.backup_status
- volume_queue.create()
-
- def volume_backup_initiate(self, queue):
- """Initiate the backup of the volume
- :params: queue: Provide the map of the volume that needs
- backup.
- This function will call the backupup api and change the
- backup_status and backup_id in the queue table.
- """
- backup_id = queue.backup_id
- if backup_id == "NULL":
- try:
- volume_backup = conn.block_storage.create_backup(
- volume_id=queue.volume_id, force=True
- )
- queue.backup_id = volume_backup.id
- queue.backup_status = constants.BACKUP_WIP
- queue.save()
- except exceptions as error:
- print(error)
- else:
- pass
- # TODO(Alex): remove this task from the task list
- # Backup planned task cannot have backup_id in the same cycle
- # Reserve for now because it is related to the WIP backup genenrators which
- # are not finished in the current cycle
-
- def process_failed_backup(self, task):
- # 1. TODO(Alex): notify via email
- LOG.error("Backup of the volume %s failed." % task.volume_id)
- # 2. TODO(Alex): remove failed backup instance from the openstack
- # then set the volume status in-use
- # 3. remove failed task from the task queue
- task.delete_queue()
-
- def process_available_backup(self, task):
- LOG.info("Backup of the volume %s is successful." % task.volume_id)
- # 1. save success backup in the backup table
- self._volume_backup(
- BackupMapping(
- volume_id=task.volume_id,
- backup_id=task.backup_id,
- instance_id=task.instance_id,
- backup_completed=1,
- )
- )
- # 2. remove from the task list
- task.delete_queue()
- # 3. TODO(Alex): notify via email
-
- def process_using_backup(self, task):
- # remove from the task list
- task.delete_queue()
-
- def check_volume_backup_status(self, queue):
- """Checks the backup status of the volume
- :params: queue: Provide the map of the volume that needs backup
- status checked.
- Call the backups api to see if the backup is successful.
- """
- for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
- if backup_gen.id == queue.backup_id:
- if backup_gen.status == "error":
- self.process_failed_backup(queue)
- elif backup_gen.status == "available":
- self.process_available_backup(queue)
- elif backup_gen.status == "creating":
- # TODO(Alex): Need to escalate discussion
- # How to proceed WIP bakcup generators?
- # To make things worse, the last backup generator is in progress till
- # the new backup cycle
- LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
- else: # "deleting", "restoring", "error_restoring" status
- self.process_using_backup(queue)
-
- def _volume_backup(self, task):
- # matching_backups = [
- # g for g in self.available_backups if g.backup_id == task.backup_id
- # ]
- # if not matching_backups:
- volume_backup = objects.Volume(self.ctx)
- volume_backup.backup_id = task.backup_id
- volume_backup.volume_id = task.volume_id
- volume_backup.instance_id = task.instance_id
- volume_backup.backup_completed = task.backup_completed
- volume_backup.create()
+import staffeln.conf
+import collections
+from staffeln.common import constants
+
+from openstack import exceptions
+from oslo_log import log
+from staffeln.common import auth
+from staffeln.common import context
+from staffeln import objects
+from staffeln.i18n import _
+
+CONF = staffeln.conf.CONF
+LOG = log.getLogger(__name__)
+
+BackupMapping = collections.namedtuple(
+ "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]
+)
+
+QueueMapping = collections.namedtuple(
+ "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]
+)
+
+conn = auth.create_connection()
+
+
+def check_vm_backup_metadata(metadata):
+ if not CONF.conductor.backup_metadata_key in metadata:
+ return False
+ return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
+
+
+def get_projects_list():
+ projects = conn.list_projects()
+ return projects
+
+
+class Backup(object):
+ """Implmentations of the queue with the sql."""
+
+ def __init__(self):
+ self.ctx = context.make_context()
+ self.discovered_backup_map = None
+ self.queue_mapping = dict()
+ self.volume_mapping = dict()
+
+ def get_backups(self, filters=None):
+ return objects.Volume.list(self.ctx, filters=filters)
+
+ def get_queues(self, filters=None):
+ """Get the list of volume queue columns from the queue_data table"""
+ queues = objects.Queue.list(self.ctx, filters=filters)
+ return queues
+
+ def create_queue(self):
+ """Create the queue of all the volumes for backup"""
+ queue_list = self.check_instance_volumes()
+ for queue in queue_list:
+ self._volume_queue(queue)
+
+ # Backup the volumes attached to which has a specific metadata
+ def filter_server(self, metadata):
+
+ if not CONF.conductor.backup_metadata_key in metadata:
+ return False
+
+ return metadata[CONF.conductor.backup_metadata_key].lower() == constants.BACKUP_ENABLED_KEY
+
+ # Backup the volumes in in-use and available status
+ def filter_volume(self, volume_id):
+ try:
+ volume = conn.get_volume_by_id(volume_id)
+ if volume == None: return False
+ res = volume['status'] in ("available", "in-use")
+ if not res:
+ LOG.info(_("Volume %s is not backed because it is in %s status" % (volume_id, volume['status'])))
+ return res
+
+ except exceptions.ResourceNotFound:
+ return False
+
+ def remove_volume_backup(self, backup_object):
+ try:
+ backup = conn.get_volume_backup(backup_object.backup_id)
+ if backup == None: return False
+ if backup["status"] in ("available"):
+ conn.delete_volume_backup(backup_object.backup_id)
+ backup_object.delete_backup()
+ elif backup["status"] in ("error", "error_restoring"):
+ # TODO(Alex): need to discuss
+ # now if backup is in error status, then retention service
+ # does not remove it from openstack but removes it from the
+ # backup table so user can delete it on Horizon.
+ backup_object.delete_backup()
+ else: # "deleting", "restoring"
+ LOG.info(_("Rotation for the backup %s is skipped in this cycle "
+ "because it is in %s status") % (backup_object.backup_id, backup["status"]))
+
+ except exceptions.ResourceNotFound:
+ LOG.info(_("Backup %s is not existing in Openstack."
+ "Or cinder-backup is not existing in the cloud." % backup_object.backup_id))
+ # remove from the backup table
+ backup_object.delete_backup()
+ return False
+
+ def check_instance_volumes(self):
+ """Get the list of all the volumes from the project using openstacksdk
+ Function first list all the servers in the project and get the volumes
+ that are attached to the instance.
+ """
+ queues_map = []
+ projects = get_projects_list()
+ for project in projects:
+ servers = conn.compute.servers(
+ details=True, all_projects=True, project_id=project.id
+ )
+ for server in servers:
+ if not self.filter_server(server.metadata): continue
+ for volume in server.attached_volumes:
+ if not self.filter_volume(volume["id"]): continue
+ queues_map.append(
+ QueueMapping(
+ volume_id=volume["id"],
+ backup_id="NULL",
+ instance_id=server.id,
+ backup_status=constants.BACKUP_PLANNED,
+ )
+ )
+ return queues_map
+
+ def _volume_queue(self, task):
+ """Saves the queue data to the database."""
+
+ # TODO(Alex): Need to escalate discussion
+ # When create the task list, need to check the WIP backup generators
+ # which are created in the past backup cycle.
+ # Then skip to create new tasks for the volumes whose backup is WIP
+ volume_queue = objects.Queue(self.ctx)
+ volume_queue.backup_id = task.backup_id
+ volume_queue.volume_id = task.volume_id
+ volume_queue.instance_id = task.instance_id
+ volume_queue.backup_status = task.backup_status
+ volume_queue.create()
+
+ def create_volume_backup(self, queue):
+ """Initiate the backup of the volume
+ :params: queue: Provide the map of the volume that needs
+ backup.
+        This function will call the backup api and change the
+ backup_status and backup_id in the queue table.
+ """
+ backup_id = queue.backup_id
+ if backup_id == "NULL":
+ try:
+ volume_backup = conn.block_storage.create_backup(
+ volume_id=queue.volume_id, force=True
+ )
+ queue.backup_id = volume_backup.id
+ queue.backup_status = constants.BACKUP_WIP
+ queue.save()
+ except exceptions as error:
+ print(error)
+ else:
+ pass
+ # TODO(Alex): remove this task from the task list
+ # Backup planned task cannot have backup_id in the same cycle
+        # Reserve for now because it is related to the WIP backup generators which
+ # are not finished in the current cycle
+
+ def process_failed_backup(self, task):
+ # 1. TODO(Alex): notify via email
+ LOG.error("Backup of the volume %s failed." % task.volume_id)
+ # 2. TODO(Alex): remove failed backup instance from the openstack
+ # then set the volume status in-use
+ # 3. remove failed task from the task queue
+ task.delete_queue()
+
+ def process_available_backup(self, task):
+ LOG.info("Backup of the volume %s is successful." % task.volume_id)
+ # 1. save success backup in the backup table
+ self._volume_backup(
+ BackupMapping(
+ volume_id=task.volume_id,
+ backup_id=task.backup_id,
+ instance_id=task.instance_id,
+ backup_completed=1,
+ )
+ )
+ # 2. remove from the task list
+ task.delete_queue()
+ # 3. TODO(Alex): notify via email
+
+ def process_using_backup(self, task):
+ # remove from the task list
+ task.delete_queue()
+
+ def check_volume_backup_status(self, queue):
+ """Checks the backup status of the volume
+ :params: queue: Provide the map of the volume that needs backup
+ status checked.
+ Call the backups api to see if the backup is successful.
+ """
+ for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
+ if backup_gen.id == queue.backup_id:
+ if backup_gen.status == "error":
+ self.process_failed_backup(queue)
+ elif backup_gen.status == "available":
+ self.process_available_backup(queue)
+ elif backup_gen.status == "creating":
+ # TODO(Alex): Need to escalate discussion
+                    # How to proceed with WIP backup generators?
+ # To make things worse, the last backup generator is in progress till
+ # the new backup cycle
+ LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
+ else: # "deleting", "restoring", "error_restoring" status
+ self.process_using_backup(queue)
+
+ def _volume_backup(self, task):
+ # matching_backups = [
+ # g for g in self.available_backups if g.backup_id == task.backup_id
+ # ]
+ # if not matching_backups:
+ volume_backup = objects.Volume(self.ctx)
+ volume_backup.backup_id = task.backup_id
+ volume_backup.volume_id = task.volume_id
+ volume_backup.instance_id = task.instance_id
+ volume_backup.backup_completed = task.backup_completed
+ volume_backup.create()
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 2f3d39b..4493001 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -74,7 +74,7 @@
)
if len(queues_to_start) != 0:
for queue in queues_to_start:
- backup.Backup().volume_backup_initiate(queue)
+ backup.Backup().create_volume_backup(queue)
# Refresh the task queue
# TODO(Alex): need to escalate discussion
@@ -126,29 +126,26 @@
def reload(self):
LOG.info("%s reload" % self.name)
+ def get_backup_list(self):
+ threshold_strtime = self.get_threshold_strtime()
+ if threshold_strtime == None: return False
+ self.backup_list = backup.Backup().get_backups(filters={"created_at__lt": threshold_strtime})
+ return True
+
+ def remove_backups(self):
+ print(self.backup_list)
+ for retention_backup in self.backup_list:
+ # 1. check the backup status and delete only available backups
+ backup.Backup().remove_volume_backup(retention_backup)
+
@periodics.periodic(spacing=CONF.conductor.retention_service_period, run_immediately=True)
def rotation_engine(self):
LOG.info("%s rotation_engine" % self.name)
- # TODO(Alex): No limitation for retention?
- # Even though, the backup count is over the limit, the retention service will
- # continue working?
- # filter based on backup status
- # filter based on backup creation time
- # 2. delete the backups
-
-
# 1. get the list of backups to remove based on the retention time
- threshold_strtime = self.get_threshold_strtime()
- if threshold_strtime == None: return
- backups = backup.Backup().get_backups(filters={"created_at__lt", threshold_strtime})
+ if not self.get_backup_list(): return
# 2. remove the backups
- for retention_backup in backups:
- # 1. check the backup status and delete only available backups
- if not backup.Backup().check_backup_status(retention_backup): continue
- # 2. remove backup
-
- pass
+ self.remove_backups()
# get the threshold time str
def get_threshold_strtime(self):
@@ -156,13 +153,12 @@
if time_delta_dict == None: return None
res = xtime.timeago(
- years= time_delta_dict["years"],
- months= time_delta_dict["months"],
- weeks= time_delta_dict["weeks"],
- days= time_delta_dict["days"],
+ years=time_delta_dict["years"],
+ months=time_delta_dict["months"],
+ weeks=time_delta_dict["weeks"],
+ days=time_delta_dict["days"],
)
if res == None: LOG.info(_("Retention time format is invalid. "
"Follow <YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."))
- return datetime.datetime.strptime(res, constants.DEFAULT_TIME_FORMAT)
-
+ return res.strftime(constants.DEFAULT_TIME_FORMAT)
diff --git a/staffeln/conf/api.py b/staffeln/conf/api.py
index 846f959..e405d6a 100755
--- a/staffeln/conf/api.py
+++ b/staffeln/conf/api.py
@@ -1,7 +1,6 @@
from oslo_config import cfg
from staffeln.i18n import _
-
api_group = cfg.OptGroup(
"api",
title="API options",
diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index e63895f..4e2aad9 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -1,7 +1,6 @@
from oslo_config import cfg
from staffeln.i18n import _
-
conductor_group = cfg.OptGroup(
"conductor",
title="Conductor Options",
@@ -13,7 +12,7 @@
"backup_workers",
default=1,
help=_("The maximum number of backup processes to "
- "fork and run. Default to number of CPUs on the host."),
+ "fork and run. Default to number of CPUs on the host."),
),
cfg.IntOpt(
"backup_service_period",
@@ -38,24 +37,25 @@
"rotation_workers",
default=1,
help=_("The maximum number of rotation processes to "
- "fork and run. Default to number of CPUs on the host."),
+ "fork and run. Default to number of CPUs on the host."),
),
cfg.IntOpt(
"retention_service_period",
default=20,
min=10,
- help=_("The time of rotation period, the unit is one day."),
+ help=_("The period of the retention service, the unit is one second."),
),
cfg.IntOpt(
"rotation_workers",
default=1,
help=_("The maximum number of rotation processes to "
- "fork and run. Default to number of CPUs on the host."),
+ "fork and run. Default to number of CPUs on the host."),
),
cfg.StrOpt(
"retention_time",
- default="2021-05-05 14:56:00",
- help=_("The time of rotation period, the unit is one day."),
+ default="2w3d",
+ help=_("The time of retention period, the for mat is "
+ "<YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."),
),
]
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index 83bb484..6ba5183 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -109,7 +109,7 @@
if filters is None:
filters = {}
- plain_fields = ["volume_id", "backup_id", "backup_completed", "instance_id"]
+ plain_fields = ["volume_id", "backup_id", "backup_completed", "instance_id", "created_at"]
return self._add_filters(
query=query,
@@ -144,6 +144,7 @@
for raw_fieldname, value in filters.items():
fieldname, operator_ = self.__decompose_filter(raw_fieldname)
+
if fieldname in plain_fields:
query = self.__add_simple_filter(
query, model, fieldname, value, operator_
@@ -161,7 +162,6 @@
):
if not isinstance(value, datetime.datetime):
value = timeutils.parse_isotime(value)
-
return query.filter(self.valid_operators[operator_](field, value))
def __decompose_filter(self, raw_fieldname):
@@ -306,3 +306,9 @@
return self._soft_delete(models.Queue_data, id)
except:
LOG.error("Queue Not found.")
+
+ def soft_delete_backup(self, id):
+ try:
+ return self._soft_delete(models.Backup_data, id)
+ except:
+ LOG.error("Backup Not found.")
\ No newline at end of file
diff --git a/staffeln/objects/volume.py b/staffeln/objects/volume.py
index 3933bdf..be87b61 100644
--- a/staffeln/objects/volume.py
+++ b/staffeln/objects/volume.py
@@ -27,7 +27,7 @@
"""
db_backups = cls.dbapi.get_backup_list(context, filters=filters)
- return [cls._from_db_object(cls(), obj) for obj in db_backups]
+ return [cls._from_db_object(cls(context), obj) for obj in db_backups]
@base.remotable
def create(self):
@@ -59,3 +59,8 @@
"""
current = self.get_by_uuid(backup_id=self.backup_id)
self.obj_refresh(current)
+
+ @base.remotable
+ def delete_backup(self):
+ """Soft Delete the :class:`Queue_data` from the DB"""
+ db_obj = self.dbapi.soft_delete_backup(self.id)