Change module names
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index 69c48f4..f9cde7b 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -3,12 +3,11 @@
from staffeln.common import constants
from openstack import exceptions
-from openstack.block_storage.v2 import backup
from oslo_log import log
from staffeln.common import auth
from staffeln.common import context
from staffeln import objects
-from staffeln.common import short_id
+from staffeln.i18n import _
CONF = staffeln.conf.CONF
LOG = log.getLogger(__name__)
@@ -29,12 +28,6 @@
return False
return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
-
-def backup_volumes_in_project(conn, project_name):
- # conn.list_servers()
- pass
-
-
def get_projects_list():
projects = conn.list_projects()
return projects
@@ -49,6 +42,9 @@
self.queue_mapping = dict()
self.volume_mapping = dict()
+ def get_backups(self):
+ return objects.Volume.list(self.ctx)
+
def get_queues(self, filters=None):
"""Get the list of volume queue columns from the queue_data table"""
queues = objects.Queue.list(self.ctx, filters=filters)
@@ -71,7 +67,9 @@
# Backup the volumes in in-use and available status
def filter_volume(self, volume_id):
volume = conn.get_volume_by_id(volume_id)
- return volume['status'] in ("available", "in-use")
+ res = volume['status'] in ("available", "in-use")
+ if not res:
+ LOG.info(_("Volume %s is not backed because it is in %s status" % (volume_id, volume['status'])))
def check_instance_volumes(self):
"""Get the list of all the volumes from the project using openstacksdk
@@ -86,15 +84,13 @@
)
for server in servers:
if not self.filter_server(server.metadata): continue
- server_id = server.id
- volumes = server.attached_volumes
- for volume in volumes:
+ for volume in server.attached_volumes:
if not self.filter_volume(volume["id"]): continue
queues_map.append(
QueueMapping(
volume_id=volume["id"],
backup_id="NULL",
- instance_id=server_id,
+ instance_id=server.id,
backup_status=constants.BACKUP_PLANNED,
)
)
@@ -127,7 +123,6 @@
volume_backup = conn.block_storage.create_backup(
volume_id=queue.volume_id, force=True
)
- print(volume_backup)
queue.backup_id = volume_backup.id
queue.backup_status = constants.BACKUP_WIP
queue.save()
@@ -140,15 +135,15 @@
# Reserve for now because it is related to the WIP backup genenrators which
# are not finished in the current cycle
- def process_failed_task(self, task):
- LOG.error("Backup of the volume %s failed." % task.id)
+ def process_failed_backup(self, task):
# 1. TODO(Alex): notify via email
+ LOG.error("Backup of the volume %s failed." % task.volume_id)
# 2. TODO(Alex): remove failed backup instance from the openstack
+ # then set the volume status in-use
# 3. remove failed task from the task queue
- queue_delete = objects.Queue.get_by_id(self.ctx, task.id)
- queue_delete.delete_queue()
+ task.delete_queue()
- def process_success_backup(self, task):
+ def process_available_backup(self, task):
LOG.info("Backup of the volume %s is successful." % task.volume_id)
# 1. save success backup in the backup table
self._volume_backup(
@@ -160,10 +155,13 @@
)
)
# 2. remove from the task list
- queue_delete = objects.Queue.get_by_id(self.ctx, task.id)
- queue_delete.delete_queue()
+ task.delete_queue()
# 3. TODO(Alex): notify via email
+ def process_using_backup(self, task):
+ # remove from the task list
+ task.delete_queue()
+
def check_volume_backup_status(self, queue):
"""Checks the backup status of the volume
:params: queue: Provide the map of the volume that needs backup
@@ -173,15 +171,17 @@
for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
if backup_gen.id == queue.backup_id:
if backup_gen.status == "error":
- self.process_failed_task(queue)
- elif backup_gen.status == "success":
- self.process_success_backup(queue)
- else:
+ self.process_failed_backup(queue)
+ elif backup_gen.status == "available":
+ self.process_available_backup(queue)
+ elif backup_gen.status == "creating":
# TODO(Alex): Need to escalate discussion
# How to proceed WIP bakcup generators?
# To make things worse, the last backup generator is in progress till
# the new backup cycle
LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
+ else: # "deleting", "restoring", "error_restoring" status
+ self.process_using_backup(queue)
def _volume_backup(self, task):
# matching_backups = [
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 78869e0..47a3ec3 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -2,7 +2,6 @@
from futurist import periodics
from oslo_log import log
import staffeln.conf
-import sys
import threading
import time
@@ -42,44 +41,60 @@
def reload(self):
LOG.info("%s reload" % self.name)
- # return the task(queue) list which are working in progress
- def get_wip_tasks(self):
- return backup.Backup().get_queues(
+ # Check if the backup count is over the limit
+ # TODO(Alex): how to count the backup number
+ # only available backups are calculated?
+ def _over_limitation(self):
+ LOG.info(_("Checking the backup limitation..."))
+ max_count = CONF.conductor.max_backup_count
+ current_count = len(backup.Backup().get_backups())
+ if max_count <= current_count:
+ # TODO(Alex): Send notification
+ LOG.info(_("The backup limit is over."))
+ return True
+ LOG.info(_("The max limit is %s, and current backup count is %s" % (max_count, current_count)))
+ return False
+
+ # Manage active backup generators
+ def _process_wip_tasks(self):
+ LOG.info(_("Processing WIP backup generators..."))
+ queues_started = backup.Backup().get_queues(
filters={"backup_status": constants.BACKUP_WIP}
)
+ if len(queues_started) != 0:
+ for queue in queues_started: backup.Backup().check_volume_backup_status(queue)
- # return the task(queue) list which needs to do
- def get_todo_tasks(self):
- return backup.Backup().get_queues(
+ # Create backup generators
+ def _process_new_tasks(self):
+ LOG.info(_("Creating new backup generators..."))
+ queues_to_start = backup.Backup().get_queues(
filters={"backup_status": constants.BACKUP_PLANNED}
)
+ if len(queues_to_start) != 0:
+ for queue in queues_to_start:
+ backup.Backup().volume_backup_initiate(queue)
- # return the task(queue) list which needs to do
- def get_all_tasks(self):
- return backup.Backup().get_queues()
+ # Refresh the task queue
+ # TODO(Alex): need to escalate discussion
+ # how to manage last backups not finished yet
+ def _update_task_queue(self):
+ LOG.info(_("Updating backup task queue..."))
+ all_tasks = backup.Backup().get_queues()
+ if len(all_tasks) == 0:
+ backup.Backup().create_queue()
+ else:
+ LOG.info(_("The last backup cycle is not finished yet."
+ "So the new backup cycle is skipped."))
@periodics.periodic(spacing=CONF.conductor.backup_period, run_immediately=True)
def backup_engine(self):
LOG.info("backing... %s" % str(time.time()))
LOG.info("%s periodics" % self.name)
- # TODO(Alex): need to escalate discussion
- # how to manage last backups not finished yet
- if len(self.get_all_tasks()) == 0:
- backup.Backup().create_queue()
- else:
- LOG.info(_("The last backup cycle is not finished yet."
- "So the new backup cycle is skipped."))
-
- queues_started = self.get_wip_tasks()
- if len(queues_started) != 0:
- for queue in queues_started: backup.Backup().check_volume_backup_status(queue)
-
- queues_to_start = self.get_todo_tasks()
- print(queues_to_start)
- if len(queues_to_start) != 0:
- for queue in queues_to_start:
- backup.Backup().volume_backup_initiate(queue)
+ if self._over_limitation(): return
+ self._update_task_queue()
+ self._process_wip_tasks()
+ self._process_new_tasks()
class RotationManager(cotyledon.Service):
@@ -93,7 +108,6 @@
def run(self):
LOG.info("%s run" % self.name)
- interval = CONF.conductor.rotation_period
periodic_callables = [
(self.rotation_engine, (), {}),
@@ -112,5 +126,5 @@
@periodics.periodic(spacing=CONF.conductor.rotation_period, run_immediately=True)
def rotation_engine(self):
- print("rotating... %s" % str(time.time()))
LOG.info("%s rotation_engine" % self.name)
+
diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index 7d96579..06d433b 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -26,6 +26,11 @@
default="__automated_backup",
help=_("The key string of metadata the VM, which requres back up, has"),
),
+ cfg.IntOpt(
+ "max_backup_count",
+ default=10,
+ help=_("The key string of metadata the VM, which requres back up, has"),
+ ),
]
rotation_opts = [
diff --git a/staffeln/objects/__init__.py b/staffeln/objects/__init__.py
index 8003db1..4f7ca56 100755
--- a/staffeln/objects/__init__.py
+++ b/staffeln/objects/__init__.py
@@ -1,4 +1,5 @@
from .queue import Queue
+from .volume import Volume
# from volume import Volume
def register_all():
__import__('staffeln.objects.volume')