Use openstack cloud api directly
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index ada7faa..834b705 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -151,14 +151,15 @@
backup_id = queue.backup_id
if backup_id == "NULL":
try:
- volume_backup = conn.block_storage.create_backup(
+ volume_backup = conn.create_volume_backup(
volume_id=queue.volume_id, force=True
)
queue.backup_id = volume_backup.id
queue.backup_status = constants.BACKUP_WIP
queue.save()
except exceptions as error:
- print(error)
+ LOG.info(_("Backup creation for the volume %s failled. %s"
+ % (queue.volume_id, str(error))))
else:
pass
# TODO(Alex): remove this task from the task list
@@ -199,20 +200,27 @@
status checked.
Call the backups api to see if the backup is successful.
"""
- for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
- if backup_gen.id == queue.backup_id:
- if backup_gen.status == "error":
- self.process_failed_backup(queue)
- elif backup_gen.status == "available":
- self.process_available_backup(queue)
- elif backup_gen.status == "creating":
- # TODO(Alex): Need to escalate discussion
- # How to proceed WIP bakcup generators?
- # To make things worse, the last backup generator is in progress till
- # the new backup cycle
- LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
- else: # "deleting", "restoring", "error_restoring" status
- self.process_using_backup(queue)
+ # for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
+ try:
+ backup_gen = conn.get_volume_backup(queue.backup_id)
+ if backup_gen == None:
+ # TODO(Alex): need to check when it is none
+ LOG.info(_("Backup status of %s is returning none."%(queue.backup_id)))
+ return
+ if backup_gen.status == "error":
+ self.process_failed_backup(queue)
+ elif backup_gen.status == "available":
+ self.process_available_backup(queue)
+ elif backup_gen.status == "creating":
+ # TODO(Alex): Need to escalate discussion
+ # How to proceed WIP bakcup generators?
+ # To make things worse, the last backup generator is in progress till
+ # the new backup cycle
+ LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
+ else: # "deleting", "restoring", "error_restoring" status
+ self.process_using_backup(queue)
+ except exceptions.ResourceNotFound as e:
+ self.process_failed_backup(queue)
def _volume_backup(self, task):
# matching_backups = [
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 4493001..5501742 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -1,164 +1,164 @@
-import cotyledon
-import datetime
-from futurist import periodics
-from oslo_log import log
-import staffeln.conf
-import threading
-import time
-
-from staffeln.common import constants
-from staffeln.conductor import backup
-from staffeln.common import context
-from staffeln.common import time as xtime
-from staffeln.i18n import _
-
-LOG = log.getLogger(__name__)
-CONF = staffeln.conf.CONF
-
-
-class BackupManager(cotyledon.Service):
- name = "Staffeln conductor backup controller"
-
- def __init__(self, worker_id, conf):
- super(BackupManager, self).__init__(worker_id)
- self._shutdown = threading.Event()
- self.conf = conf
- self.ctx = context.make_context()
- LOG.info("%s init" % self.name)
-
- def run(self):
- LOG.info("%s run" % self.name)
- periodic_callables = [
- (self.backup_engine, (), {}),
- ]
- periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")
- periodic_thread = threading.Thread(target=periodic_worker.start)
- periodic_thread.daemon = True
- periodic_thread.start()
-
- def terminate(self):
- LOG.info("%s terminate" % self.name)
- super(BackupManager, self).terminate()
-
- def reload(self):
- LOG.info("%s reload" % self.name)
-
- # Check if the backup count is over the limit
- # TODO(Alex): how to count the backup number
- # only available backups are calculated?
- def _over_limitation(self):
- LOG.info(_("Checking the backup limitation..."))
- max_count = CONF.conductor.max_backup_count
- current_count = len(backup.Backup().get_backups())
- if max_count <= current_count:
- # TODO(Alex): Send notification
- LOG.info(_("The backup limit is over."))
- return True
- LOG.info(_("The max limit is %s, and current backup count is %s" % (max_count, current_count)))
- return False
-
- # Manage active backup generators
- def _process_wip_tasks(self):
- LOG.info(_("Processing WIP backup generators..."))
- queues_started = backup.Backup().get_queues(
- filters={"backup_status": constants.BACKUP_WIP}
- )
- if len(queues_started) != 0:
- for queue in queues_started: backup.Backup().check_volume_backup_status(queue)
-
- # Create backup generators
- def _process_new_tasks(self):
- LOG.info(_("Creating new backup generators..."))
- queues_to_start = backup.Backup().get_queues(
- filters={"backup_status": constants.BACKUP_PLANNED}
- )
- if len(queues_to_start) != 0:
- for queue in queues_to_start:
- backup.Backup().create_volume_backup(queue)
-
- # Refresh the task queue
- # TODO(Alex): need to escalate discussion
- # how to manage last backups not finished yet
- def _update_task_queue(self):
- LOG.info(_("Updating backup task queue..."))
- all_tasks = backup.Backup().get_queues()
- if len(all_tasks) == 0:
- backup.Backup().create_queue()
- else:
- LOG.info(_("The last backup cycle is not finished yet."
- "So the new backup cycle is skipped."))
-
- @periodics.periodic(spacing=CONF.conductor.backup_service_period, run_immediately=True)
- def backup_engine(self):
- LOG.info("backing... %s" % str(time.time()))
- LOG.info("%s periodics" % self.name)
-
- if self._over_limitation(): return
- self._update_task_queue()
- self._process_wip_tasks()
- self._process_new_tasks()
-
-
-class RotationManager(cotyledon.Service):
- name = "Staffeln conductor rotation controller"
-
- def __init__(self, worker_id, conf):
- super(RotationManager, self).__init__(worker_id)
- self._shutdown = threading.Event()
- self.conf = conf
- LOG.info("%s init" % self.name)
-
- def run(self):
- LOG.info("%s run" % self.name)
-
- periodic_callables = [
- (self.rotation_engine, (), {}),
- ]
- periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")
- periodic_thread = threading.Thread(target=periodic_worker.start)
- periodic_thread.daemon = True
- periodic_thread.start()
-
- def terminate(self):
- LOG.info("%s terminate" % self.name)
- super(RotationManager, self).terminate()
-
- def reload(self):
- LOG.info("%s reload" % self.name)
-
- def get_backup_list(self):
- threshold_strtime = self.get_threshold_strtime()
- if threshold_strtime == None: return False
- self.backup_list = backup.Backup().get_backups(filters={"created_at__lt": threshold_strtime})
- return True
-
- def remove_backups(self):
- print(self.backup_list)
- for retention_backup in self.backup_list:
- # 1. check the backup status and delete only available backups
- backup.Backup().remove_volume_backup(retention_backup)
-
- @periodics.periodic(spacing=CONF.conductor.retention_service_period, run_immediately=True)
- def rotation_engine(self):
- LOG.info("%s rotation_engine" % self.name)
- # 1. get the list of backups to remove based on the retention time
- if not self.get_backup_list(): return
-
- # 2. remove the backups
- self.remove_backups()
-
- # get the threshold time str
- def get_threshold_strtime(self):
- time_delta_dict = xtime.parse_timedelta_string(CONF.conductor.retention_time)
- if time_delta_dict == None: return None
-
- res = xtime.timeago(
- years=time_delta_dict["years"],
- months=time_delta_dict["months"],
- weeks=time_delta_dict["weeks"],
- days=time_delta_dict["days"],
- )
- if res == None: LOG.info(_("Retention time format is invalid. "
- "Follow <YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."))
-
- return res.strftime(constants.DEFAULT_TIME_FORMAT)
+import cotyledon
+import datetime
+from futurist import periodics
+from oslo_log import log
+import staffeln.conf
+import threading
+import time
+
+from staffeln.common import constants
+from staffeln.conductor import backup
+from staffeln.common import context
+from staffeln.common import time as xtime
+from staffeln.i18n import _
+
+LOG = log.getLogger(__name__)
+CONF = staffeln.conf.CONF
+
+
+class BackupManager(cotyledon.Service):
+ name = "Staffeln conductor backup controller"
+
+ def __init__(self, worker_id, conf):
+ super(BackupManager, self).__init__(worker_id)
+ self._shutdown = threading.Event()
+ self.conf = conf
+ self.ctx = context.make_context()
+ LOG.info("%s init" % self.name)
+
+ def run(self):
+ LOG.info("%s run" % self.name)
+ periodic_callables = [
+ (self.backup_engine, (), {}),
+ ]
+ periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")
+ periodic_thread = threading.Thread(target=periodic_worker.start)
+ periodic_thread.daemon = True
+ periodic_thread.start()
+
+ def terminate(self):
+ LOG.info("%s terminate" % self.name)
+ super(BackupManager, self).terminate()
+
+ def reload(self):
+ LOG.info("%s reload" % self.name)
+
+ # Check if the backup count is over the limit
+ # TODO(Alex): how to count the backup number
+ # only available backups are calculated?
+ def _check_quota(self):
+ LOG.info(_("Checking the backup limitation..."))
+ max_count = CONF.conductor.max_backup_count
+ current_count = len(backup.Backup().get_backups())
+ if max_count <= current_count:
+ # TODO(Alex): Send notification
+ LOG.info(_("The backup limit is over."))
+ return True
+ LOG.info(_("The max limit is %s, and current backup count is %s" % (max_count, current_count)))
+ return False
+
+ # Manage active backup generators
+ def _process_wip_tasks(self):
+ LOG.info(_("Processing WIP backup generators..."))
+ queues_started = backup.Backup().get_queues(
+ filters={"backup_status": constants.BACKUP_WIP}
+ )
+ if len(queues_started) != 0:
+ for queue in queues_started: backup.Backup().check_volume_backup_status(queue)
+
+ # Create backup generators
+ def _process_new_tasks(self):
+ LOG.info(_("Creating new backup generators..."))
+ queues_to_start = backup.Backup().get_queues(
+ filters={"backup_status": constants.BACKUP_PLANNED}
+ )
+ if len(queues_to_start) != 0:
+ for queue in queues_to_start:
+ backup.Backup().create_volume_backup(queue)
+
+ # Refresh the task queue
+ # TODO(Alex): need to escalate discussion
+ # how to manage last backups not finished yet
+ def _update_task_queue(self):
+ LOG.info(_("Updating backup task queue..."))
+ all_tasks = backup.Backup().get_queues()
+ if len(all_tasks) == 0:
+ backup.Backup().create_queue()
+ else:
+ LOG.info(_("The last backup cycle is not finished yet."
+ "So the new backup cycle is skipped."))
+
+ @periodics.periodic(spacing=CONF.conductor.backup_service_period, run_immediately=True)
+ def backup_engine(self):
+ LOG.info("backing... %s" % str(time.time()))
+ LOG.info("%s periodics" % self.name)
+
+ if self._check_quota(): return
+ self._update_task_queue()
+ self._process_wip_tasks()
+ self._process_new_tasks()
+
+
+class RotationManager(cotyledon.Service):
+ name = "Staffeln conductor rotation controller"
+
+ def __init__(self, worker_id, conf):
+ super(RotationManager, self).__init__(worker_id)
+ self._shutdown = threading.Event()
+ self.conf = conf
+ LOG.info("%s init" % self.name)
+
+ def run(self):
+ LOG.info("%s run" % self.name)
+
+ periodic_callables = [
+ (self.rotation_engine, (), {}),
+ ]
+ periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")
+ periodic_thread = threading.Thread(target=periodic_worker.start)
+ periodic_thread.daemon = True
+ periodic_thread.start()
+
+ def terminate(self):
+ LOG.info("%s terminate" % self.name)
+ super(RotationManager, self).terminate()
+
+ def reload(self):
+ LOG.info("%s reload" % self.name)
+
+ def get_backup_list(self):
+ threshold_strtime = self.get_threshold_strtime()
+ if threshold_strtime == None: return False
+ self.backup_list = backup.Backup().get_backups(filters={"created_at__lt": threshold_strtime})
+ return True
+
+ def remove_backups(self):
+ print(self.backup_list)
+ for retention_backup in self.backup_list:
+ # 1. check the backup status and delete only available backups
+ backup.Backup().remove_volume_backup(retention_backup)
+
+ @periodics.periodic(spacing=CONF.conductor.retention_service_period, run_immediately=True)
+ def rotation_engine(self):
+ LOG.info("%s rotation_engine" % self.name)
+ # 1. get the list of backups to remove based on the retention time
+ if not self.get_backup_list(): return
+
+ # 2. remove the backups
+ self.remove_backups()
+
+ # get the threshold time str
+ def get_threshold_strtime(self):
+ time_delta_dict = xtime.parse_timedelta_string(CONF.conductor.retention_time)
+ if time_delta_dict == None: return None
+
+ res = xtime.timeago(
+ years=time_delta_dict["years"],
+ months=time_delta_dict["months"],
+ weeks=time_delta_dict["weeks"],
+ days=time_delta_dict["days"],
+ )
+ if res == None: LOG.info(_("Retention time format is invalid. "
+ "Follow <YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."))
+
+ return res.strftime(constants.DEFAULT_TIME_FORMAT)