blob: ecac92b9599c8f3f41f146fb020d367c20d7d726 [file] [log] [blame]
import staffeln.conf
import collections
from staffeln.common import constants
from openstack.block_storage.v2 import backup
from oslo_log import log
from staffeln.common import auth
from staffeln.common import context
from staffeln import objects
from staffeln.common import short_id
CONF = staffeln.conf.CONF
LOG = log.getLogger(__name__)
BackupMapping = collections.namedtuple(
"BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]
)
QueueMapping = collections.namedtuple(
"QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]
)
conn = auth.create_connection()
def check_vm_backup_metadata(metadata):
if not CONF.conductor.backup_metadata_key in metadata:
return False
return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
def backup_volumes_in_project(conn, project_name):
# conn.list_servers()
pass
def get_projects_list():
projects = conn.list_projects()
return projects
class Backup(object):
"""Implmentations of the queue with the sql."""
def __init__(self):
self.ctx = context.make_context()
self.discovered_backup_map = None
self.queue_mapping = dict()
self.volume_mapping = dict()
self._available_backups = None
self._available_backups_map = None
self._available_queues = None
self._available_queues_map = None
# TODO(Susanta): Can you explain what it's for
@property
def available_queues(self):
"""Queues loaded from DB"""
if self._available_queues is None:
self._available_queues = objects.Queue.list(self.ctx)
return self._available_queues
# TODO(Susanta): Can you explain what it's for
@property
def available_queues_map(self):
"""Mapping of backup queue loaded from DB"""
if self._available_queues_map is None:
self._available_queues_map = {
QueueMapping(
backup_id=g.backup_id,
volume_id=g.volume_id,
instance_id=g.instance_id,
backup_status=g.backup_status,
): g
for g in self.available_queues
}
return self._available_queues_map
# TODO(Susanta): Can you explain what it's for
# @property
# def available_backups(self):
# """Backups loaded from DB"""
# if self._available_backups is None:
# self._available_backups = objects.Volume.list(self.ctx)
# return self._available_backups
# TODO(Susanta): Can you explain what it's for
@property
def available_backups_map(self):
"""Mapping of backup loaded from DB"""
if self._available_backups_map is None:
self._available_backups_map = {
QueueMapping(
backup_id=g.backup_id,
volume_id=g.volume_id,
instance_id=g.instance_id,
backup_completed=g.backup_completed,
): g
for g in self.available_queues
}
return self._available_queues_map
def get_queues(self, filters=None):
"""Get the list of volume queue columns from the queue_data table"""
queues = objects.Queue.list(self.ctx, filters=filters)
return queues
def create_queue(self):
"""Create the queue of all the volumes for backup"""
queue_list = self.check_instance_volumes()
for queue in queue_list:
self._volume_queue(queue)
def check_instance_volumes(self):
"""Get the list of all the volumes from the project using openstacksdk
Function first list all the servers in the project and get the volumes
that are attached to the instance.
"""
queues_map = {}
projects = get_projects_list()
for project in projects:
servers = conn.compute.servers(
details=True, all_projects=True, project_id=project.id
)
for server in servers:
server_id = server.host_id
volumes = server.attached_volumes
for volume in volumes:
queues_map.append(
QueueMapping(
volume_id=volume["id"],
backup_id="NULL",
instance_id=server_id,
backup_status=constants.BACKUP_PLANNED,
)
)
return queues_map
def _volume_queue(self, task):
"""Saves the queue data to the database."""
# TODO(Susanta): Why does this need?
matching_backups = [
g for g in self.available_queues if g.backup_id == task.backup_id
]
if not matching_backups:
volume_queue = objects.Queue(self.ctx)
volume_queue.backup_id = task.backup_id
volume_queue.volume_id = task.volume_id
volume_queue.instance_id = task.instance_id
volume_queue.backup_status = task.backup_status
volume_queue.create()
# TODO(Alex): Need to escalate discussion
# When create the task list, need to check the WIP backup genenrator
# which are created in the past backup cycle.
# Then skip to create new tasks for the volumes whose backup is WIP
def volume_backup_initiate(self, queue):
"""Initiate the backup of the volume
:params: queue: Provide the map of the volume that needs
backup.
This function will call the backupup api and change the
backup_status and backup_id in the queue table.
"""
backup_id = queue.backup_id
if backup_id == "NULL":
volume_backup = conn.block_storage.create_backup(
volume_id=queue.volume_id, force=True
)
update_queue = objects.Queue.get_by_id(self.ctx, queue.id)
update_queue.backup_id = volume_backup.id
update_queue.backup_status = constants.BACKUP_WIP
update_queue.save()
else:
pass
# TODO(Alex): remove this task from the task list
# Backup planned task cannot have backup_id in the same cycle
# Reserve for now because it is related to the WIP backup genenrators which
# are not finished in the current cycle
def process_failed_task(self, task):
LOG.error("Backup of the volume %s failed." % task.id)
# 1. TODO(Alex): notify via email
# 2. TODO(Alex): remove failed backup instance from the openstack
# 3. remove failed task from the task queue
queue_delete = objects.Queue.get_by_id(self.ctx, task.id)
queue_delete.delete_queue()
def process_success_backup(self, task):
LOG.info("Backup of the volume %s is successful." % task.volume_id)
# 1. save success backup in the backup table
self._volume_backup(
BackupMapping(
volume_id=task.volume_id,
backup_id=task.backup_id,
instance_id=task.instance_id,
backup_completed=1,
)
)
# 2. remove from the task list
queue_delete = objects.Queue.get_by_id(self.ctx, task.id)
queue_delete.delete_queue()
# 3. TODO(Alex): notify via email
def check_volume_backup_status(self, queue):
"""Checks the backup status of the volume
:params: queue: Provide the map of the volume that needs backup
status checked.
Call the backups api to see if the backup is successful.
"""
for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
if backup_gen.id == queue.backup_id:
if backup_gen.status == "error":
self.process_failed_task(queue)
elif backup_gen.status == "success":
self.process_success_backup(queue)
else:
# TODO(Alex): Need to escalate discussion
# How to proceed WIP bakcup generators?
# To make things worse, the last backup generator is in progress till
# the new backup cycle
LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
def _volume_backup(self, task):
# matching_backups = [
# g for g in self.available_backups if g.backup_id == task.backup_id
# ]
# if not matching_backups:
volume_backup = objects.Volume(self.ctx)
volume_backup.backup_id = task.backup_id
volume_backup.volume_id = task.volume_id
volume_backup.instance_id = task.instance_id
volume_backup.backup_completed = task.backup_completed
volume_backup.create()