Test and fix backup service; use status constants, add TODO comments
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index 0f0522a..ecac92b 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -1,5 +1,6 @@
import staffeln.conf
import collections
+from staffeln.common import constants
from openstack.block_storage.v2 import backup
from oslo_log import log
@@ -44,7 +45,6 @@
def __init__(self):
self.ctx = context.make_context()
- self.discovered_queue_map = None
self.discovered_backup_map = None
self.queue_mapping = dict()
self.volume_mapping = dict()
@@ -53,6 +53,7 @@
self._available_queues = None
self._available_queues_map = None
+ # TODO(Susanta): Can you explain what this is for?
@property
def available_queues(self):
"""Queues loaded from DB"""
@@ -60,6 +61,7 @@
self._available_queues = objects.Queue.list(self.ctx)
return self._available_queues
+ # TODO(Susanta): Can you explain what this is for?
@property
def available_queues_map(self):
"""Mapping of backup queue loaded from DB"""
@@ -75,13 +77,15 @@
}
return self._available_queues_map
- @property
- def available_backups(self):
- """Backups loaded from DB"""
- if self._available_backups is None:
- self._available_backups = objects.Volume.list(self.ctx)
- return self._available_backups
+ # TODO(Susanta): Can you explain what this is for?
+ # @property
+ # def available_backups(self):
+ # """Backups loaded from DB"""
+ # if self._available_backups is None:
+ # self._available_backups = objects.Volume.list(self.ctx)
+ # return self._available_backups
+ # TODO(Susanta): Can you explain what this is for?
@property
def available_backups_map(self):
"""Mapping of backup loaded from DB"""
@@ -104,10 +108,9 @@
def create_queue(self):
"""Create the queue of all the volumes for backup"""
- self.discovered_queue_map = self.check_instance_volumes()
- queues_map = self.discovered_queue_map["queues"]
- for queue_name, queue_map in queues_map.items():
- self._volume_queue(queue_map)
+ queue_list = self.check_instance_volumes()
+ for queue in queue_list:
+ self._volume_queue(queue)
def check_instance_volumes(self):
"""Get the list of all the volumes from the project using openstacksdk
@@ -115,7 +118,6 @@
that are attached to the instance.
"""
- queues_map = {}
+ queues_map = []
- discovered_queue_map = {"queues": queues_map}
projects = get_projects_list()
for project in projects:
servers = conn.compute.servers(
@@ -125,32 +127,35 @@
server_id = server.host_id
volumes = server.attached_volumes
for volume in volumes:
- queues_map["queues"] = QueueMapping(
- volume_id=volume["id"],
- backup_id="NULL",
- instance_id=server_id,
- backup_status=0,
+ queues_map.append(
+ QueueMapping(
+ volume_id=volume["id"],
+ backup_id="NULL",
+ instance_id=server_id,
+ backup_status=constants.BACKUP_PLANNED,
+ )
)
- return discovered_queue_map
+ return queues_map
- def _volume_queue(self, queue_map):
+ def _volume_queue(self, task):
"""Saves the queue data to the database."""
- volume_id = queue_map.volume_id
- backup_id = queue_map.backup_id
- instance_id = queue_map.instance_id
- backup_status = queue_map.backup_status
- backup_mapping = dict()
+ # TODO(Susanta): Why is this check needed?
matching_backups = [
- g for g in self.available_queues if g.backup_id == backup_id
+ g for g in self.available_queues if g.backup_id == task.backup_id
]
if not matching_backups:
volume_queue = objects.Queue(self.ctx)
- volume_queue.backup_id = backup_id
- volume_queue.volume_id = volume_id
- volume_queue.instance_id = instance_id
- volume_queue.backup_status = backup_status
+ volume_queue.backup_id = task.backup_id
+ volume_queue.volume_id = task.volume_id
+ volume_queue.instance_id = task.instance_id
+ volume_queue.backup_status = task.backup_status
volume_queue.create()
+ # TODO(Alex): Need to escalate this discussion
+ # When creating the task list, we need to check for WIP backup generators
+ # created in the past backup cycle, and skip creating new tasks for the
+ # volumes whose backup is still WIP.
+
def volume_backup_initiate(self, queue):
"""Initiate the backup of the volume
:params: queue: Provide the map of the volume that needs
@@ -158,7 +163,6 @@
This function will call the backup api and change the
backup_status and backup_id in the queue table.
"""
- volume_info = conn.get_volume(queue.volume_id)
backup_id = queue.backup_id
if backup_id == "NULL":
volume_backup = conn.block_storage.create_backup(
@@ -166,8 +170,38 @@
)
update_queue = objects.Queue.get_by_id(self.ctx, queue.id)
update_queue.backup_id = volume_backup.id
- update_queue.backup_status = 1
+ update_queue.backup_status = constants.BACKUP_WIP
update_queue.save()
+ else:
+ pass
+ # TODO(Alex): remove this task from the task list
+ # A planned backup task cannot have a backup_id within the same cycle.
+ # Reserved for now because it is related to the WIP backup generators
+ # that are not finished in the current cycle.
+
+ def process_failed_task(self, task):
+ LOG.error("Backup of the volume %s failed." % task.volume_id)
+ # 1. TODO(Alex): notify via email
+ # 2. TODO(Alex): remove failed backup instance from the openstack
+ # 3. remove failed task from the task queue
+ queue_delete = objects.Queue.get_by_id(self.ctx, task.id)
+ queue_delete.delete_queue()
+
+ def process_success_backup(self, task):
+ LOG.info("Backup of the volume %s is successful." % task.volume_id)
+ # 1. save success backup in the backup table
+ self._volume_backup(
+ BackupMapping(
+ volume_id=task.volume_id,
+ backup_id=task.backup_id,
+ instance_id=task.instance_id,
+ backup_completed=1,
+ )
+ )
+ # 2. remove from the task list
+ queue_delete = objects.Queue.get_by_id(self.ctx, task.id)
+ queue_delete.delete_queue()
+ # 3. TODO(Alex): notify via email
def check_volume_backup_status(self, queue):
"""Checks the backup status of the volume
@@ -175,47 +209,28 @@
status checked.
Call the backups api to see if the backup is successful.
"""
- for raw in conn.block_storage.backups(volume_id=queue.volume_id):
- backup_info = raw
- if backup_info.id == queue.backup_id:
- if backup_info.status == "error":
- LOG.error("Backup of the volume %s failed." % queue.id)
- queue_delete = objects.Queue.get_by_id(self.ctx, queue.id)
- queue_delete.delete_queue()
- elif backup_info.status == "success":
- backups_map = {}
- discovered_backup_map = {"backups": backups_map}
- LOG.info("Backup of the volume %s is successful." % queue.volume_id)
- backups_map["backups"] = BackupMapping(
- volume_id=queue.volume_id,
- backup_id=queue.backup_id,
- instance_id=queue.instance_id,
- backup_completed=1,
- )
- # Save volume backup success to backup_data table.
- self._volume_backup(discovered_backup_map)
- ## call db api to remove the queue object.
- queue_delete = objects.Queue.get_by_id(self.ctx, queue.id)
- queue_delete.delete_queue()
+ for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
+ if backup_gen.id == queue.backup_id:
+ if backup_gen.status == "error":
+ self.process_failed_task(queue)
+ # A completed Cinder backup reports status "available", not "success"
+ elif backup_gen.status == "available":
+ self.process_success_backup(queue)
else:
- pass
- ## Wait for the backup to be completed.
+ # TODO(Alex): Need to escalate this discussion
+ # How should WIP backup generators be handled?
+ # To make things worse, the last backup generator may still be in progress
+ # when the new backup cycle starts.
+ LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
- def _volume_backup(self, discovered_backup_map):
- volumes_map = discovered_backup_map["backups"]
- for volume_name, volume_map in volumes_map.items():
- volume_id = volume_map.volume_id
- backup_id = volume_map.backup_id
- instance_id = volume_map.instance_id
- backup_completed = volume_map.backup_completed
- backup_mapping = dict()
- matching_backups = [
- g for g in self.available_backups if g.backup_id == backup_id
- ]
- if not matching_backups:
- volume_backup = objects.Volume(self.ctx)
- volume_backup.backup_id = backup_id
- volume_backup.volume_id = volume_id
- volume_backup.instance_id = instance_id
- volume_backup.backup_completed = backup_completed
- volume_backup.create()
+
+ def _volume_backup(self, task):
+ # matching_backups = [
+ # g for g in self.available_backups if g.backup_id == task.backup_id
+ # ]
+ # if not matching_backups:
+ volume_backup = objects.Volume(self.ctx)
+ volume_backup.backup_id = task.backup_id
+ volume_backup.volume_id = task.volume_id
+ volume_backup.instance_id = task.instance_id
+ volume_backup.backup_completed = task.backup_completed
+ volume_backup.create()
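
Note on constants: the backup code above now imports staffeln.common.constants and writes constants.BACKUP_PLANNED / constants.BACKUP_WIP into backup_status where the literals 0 and 1 were used before. The constants module itself is not part of this diff, so the following is only a sketch of what it presumably contains, inferred from the literals it replaces:

    # staffeln/common/constants.py -- assumed contents, not included in this diff
    # Task states stored in the queue table's backup_status column.
    BACKUP_PLANNED = 0  # task created, backup not yet started (replaces the literal 0)
    BACKUP_WIP = 1      # backup requested from Cinder, completion pending (replaces the literal 1)
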
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 7f12246..be3b433 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -10,7 +10,6 @@
from staffeln.conductor import backup
from staffeln.common import context
-
LOG = log.getLogger(__name__)
CONF = staffeln.conf.CONF
@@ -42,27 +41,39 @@
def reload(self):
LOG.info("%s reload" % self.name)
+ # Return the list of tasks (queue entries) that are currently in progress (WIP)
+ def get_wip_tasks(self):
+ return backup.Backup().get_queues(
+ filters={"backup_status": constants.BACKUP_WIP}
+ )
+
+ # Return the list of tasks (queue entries) that still need to be processed
+ def get_todo_tasks(self):
+ return backup.Backup().get_queues(
+ filters={"backup_status": constants.BACKUP_PLANNED}
+ )
+
+ # Return all tasks (queue entries) regardless of status
+ def get_all_tasks(self):
+ return backup.Backup().get_queues()
+
@periodics.periodic(spacing=CONF.conductor.backup_period, run_immediately=True)
def backup_engine(self):
LOG.info("backing... %s" % str(time.time()))
LOG.info("%s periodics" % self.name)
- queue = backup.Backup().get_queues()
- queues_to_start = backup.Backup().get_queues(
- filters={"backup_status": constants.BACKUP_PLANNED}
- )
- queues_started = backup.Backup().get_queues(
- filters={"backup_status": constants.BACKUP_WIP}
- )
- if len(queue) == 0:
- create_queue = backup.Backup().create_queue()
- elif len(queues_started) != 0:
- for queue in queues_started:
- LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
- backup_volume = backup.Backup().check_volume_backup_status(queue)
- elif len(queues_to_start) != 0:
+
+ if len(self.get_all_tasks()) == 0:
+ backup.Backup().create_queue()
+ # TODO(Alex): reschedule the backup engine immediately
+
+ queues_started = self.get_wip_tasks()
+ if len(queues_started) != 0:
+ for queue in queues_started:
+     backup.Backup().check_volume_backup_status(queue)
+
+ queues_to_start = self.get_todo_tasks()
+ if len(queues_to_start) != 0:
for queue in queues_to_start:
- LOG.info("Started backup process for %s" % queue.volume_id)
- backup_volume = backup.Backup().volume_backup_initiate(queue)
+ backup.Backup().volume_backup_initiate(queue)
class RotationManager(cotyledon.Service):
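
Taken together, each run of backup_engine now walks a small task state machine: create the queue if it is empty, poll the WIP tasks, then start the planned ones. The sketch below is a simplified, self-contained illustration of that flow with the database and OpenStack layers stubbed out; it is not the staffeln code itself, and the callback names (start_backup, poll_backup) are made up for the example.

    from dataclasses import dataclass

    BACKUP_PLANNED = 0
    BACKUP_WIP = 1

    @dataclass
    class Task:
        volume_id: str
        backup_id: str = "NULL"
        backup_status: int = BACKUP_PLANNED

    def run_cycle(tasks, discovered_volumes, start_backup, poll_backup):
        """One pass of the periodic engine (simplified).

        start_backup(task) should ask Cinder for a backup and set task.backup_id;
        poll_backup(task) should return "available", "error", or None (still running).
        """
        if not tasks:
            # Empty queue: plan one task per discovered attached volume.
            tasks.extend(Task(volume_id=v) for v in discovered_volumes)

        # Poll the tasks whose backup is already in progress.
        for task in [t for t in tasks if t.backup_status == BACKUP_WIP]:
            if poll_backup(task) in ("available", "error"):
                # In the real code, success is also recorded in the backup table
                # and failures are logged before the queue row is deleted.
                tasks.remove(task)

        # Kick off the tasks that are still only planned.
        for task in [t for t in tasks if t.backup_status == BACKUP_PLANNED]:
            start_backup(task)
            task.backup_status = BACKUP_WIP

For example, run_cycle([], ["vol-1"], start, poll) would plan and start a backup for vol-1 on the first pass, then poll it on subsequent passes until it reaches "available" or "error".
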