blob: ada7faa391ddd071fe2dacf3a6df178b450acd19 [file] [log] [blame]
import staffeln.conf
import collections
from staffeln.common import constants
from openstack import exceptions
from oslo_log import log
from staffeln.common import auth
from staffeln.common import context
from staffeln import objects
from staffeln.i18n import _
# Global staffeln configuration object and this module's logger.
CONF = staffeln.conf.CONF
LOG = log.getLogger(__name__)

# Record of a finished backup; Backup._volume_backup copies these fields
# onto an objects.Volume row.
BackupMapping = collections.namedtuple(
    "BackupMapping",
    ("volume_id", "backup_id", "instance_id", "backup_completed"),
)

# Record of a planned backup task; Backup._volume_queue copies these fields
# onto an objects.Queue row.
QueueMapping = collections.namedtuple(
    "QueueMapping",
    ("volume_id", "backup_id", "instance_id", "backup_status"),
)

# Connection created once at import time and shared by the helpers and the
# Backup methods defined in this module.
conn = auth.create_connection()
def check_vm_backup_metadata(metadata):
    """Tell whether the given metadata mapping enables backups.

    :param metadata: mapping looked up with the key configured in
        ``CONF.conductor.backup_metadata_key``.
    :returns: ``True`` only when the key is present and its value,
        lower-cased, equals ``"true"``; ``False`` otherwise.
    """
    backup_key = CONF.conductor.backup_metadata_key
    if backup_key in metadata:
        return metadata[backup_key].lower() == "true"
    return False
def get_projects_list():
    """Return the projects reported by ``conn.list_projects()``."""
    return conn.list_projects()
class Backup(object):
    """Implementation of the backup task queue backed by the SQL database.

    Planned backups are stored as ``objects.Queue`` rows and finished
    backups as ``objects.Volume`` rows; the OpenStack side is reached
    through the module-level ``conn`` connection.
    """

    def __init__(self):
        self.ctx = context.make_context()
        self.discovered_backup_map = None
        self.queue_mapping = {}
        self.volume_mapping = {}

    def get_backups(self, filters=None):
        """Return the ``objects.Volume`` records matching ``filters``."""
        return objects.Volume.list(self.ctx, filters=filters)

    def get_queues(self, filters=None):
        """Get the list of volume queue columns from the queue_data table"""
        return objects.Queue.list(self.ctx, filters=filters)

    def create_queue(self):
        """Create the queue of all the volumes for backup"""
        for queue in self.check_instance_volumes():
            self._volume_queue(queue)

    def filter_server(self, metadata):
        """Tell whether a server with this metadata should be backed up.

        Only volumes attached to servers carrying the configured metadata
        key are backed up.

        :param metadata: the server metadata mapping.
        :returns: ``True`` when ``CONF.conductor.backup_metadata_key`` is
            present and its lower-cased value equals
            ``constants.BACKUP_ENABLED_KEY``; ``False`` otherwise.
        """
        key = CONF.conductor.backup_metadata_key
        if key not in metadata:
            return False
        return metadata[key].lower() == constants.BACKUP_ENABLED_KEY

    def filter_volume(self, volume_id):
        """Tell whether the volume is in a status that allows a backup.

        :param volume_id: id of the volume to look up.
        :returns: ``True`` when the volume status is ``available`` or
            ``in-use``; ``False`` when it is in another status, when the
            lookup returns ``None`` or when it raises ``ResourceNotFound``.
        """
        try:
            volume = conn.get_volume_by_id(volume_id)
        except exceptions.ResourceNotFound:
            return False
        if volume is None:
            return False

        status = volume["status"]
        if status in ("available", "in-use"):
            return True
        # Interpolate after the translation lookup so the msgid stays static.
        LOG.info(
            _("Volume %s is not backed because it is in %s status")
            % (volume_id, status)
        )
        return False

    def remove_volume_backup(self, backup_object):
        """Remove a backup according to its current status.

        * ``available``: deleted through ``conn`` and from the backup table.
        * ``error`` / ``error_restoring``: only removed from the backup
          table.
        * any other status: skipped in this cycle.

        :param backup_object: object exposing ``backup_id`` and
            ``delete_backup()``.
        :returns: ``False`` when the backup lookup returns ``None`` or
            raises ``ResourceNotFound`` (in the latter case the record is
            still removed from the backup table); ``None`` otherwise.
        """
        try:
            backup = conn.get_volume_backup(backup_object.backup_id)
            if backup is None:
                return False

            status = backup["status"]
            # Exact comparison: ``in ("available")`` would be a substring
            # test against the string, not a tuple membership test.
            if status == "available":
                conn.delete_volume_backup(backup_object.backup_id)
                backup_object.delete_backup()
            elif status in ("error", "error_restoring"):
                # TODO(Alex): need to discuss
                # now if backup is in error status, then retention service
                # does not remove it from openstack but removes it from the
                # backup table so user can delete it on Horizon.
                backup_object.delete_backup()
            else:  # "deleting", "restoring"
                LOG.info(
                    _("Rotation for the backup %s is skipped in this cycle "
                      "because it is in %s status")
                    % (backup_object.backup_id, status)
                )
        except exceptions.ResourceNotFound:
            LOG.info(
                _("Backup %s is not existing in Openstack."
                  "Or cinder-backup is not existing in the cloud.")
                % backup_object.backup_id
            )
            # remove from the backup table
            backup_object.delete_backup()
            return False

    def check_instance_volumes(self):
        """Get the list of all the volumes from the project using openstacksdk

        Function first lists all the servers of every project and collects
        the volumes attached to the servers accepted by ``filter_server``,
        keeping only the volumes accepted by ``filter_volume``.

        :returns: list of ``QueueMapping`` entries with ``backup_id`` set
            to ``"NULL"`` and ``backup_status`` set to
            ``constants.BACKUP_PLANNED``.
        """
        queues_map = []
        for project in get_projects_list():
            servers = conn.compute.servers(
                details=True, all_projects=True, project_id=project.id
            )
            for server in servers:
                if not self.filter_server(server.metadata):
                    continue
                for volume in server.attached_volumes:
                    if not self.filter_volume(volume["id"]):
                        continue
                    queues_map.append(
                        QueueMapping(
                            volume_id=volume["id"],
                            backup_id="NULL",
                            instance_id=server.id,
                            backup_status=constants.BACKUP_PLANNED,
                        )
                    )
        return queues_map

    def _volume_queue(self, task):
        """Saves the queue data to the database.

        :param task: object exposing ``backup_id``, ``volume_id``,
            ``instance_id`` and ``backup_status``.
        """
        # TODO(Alex): Need to escalate discussion
        # When create the task list, need to check the WIP backup generators
        # which are created in the past backup cycle.
        # Then skip to create new tasks for the volumes whose backup is WIP
        volume_queue = objects.Queue(self.ctx)
        volume_queue.backup_id = task.backup_id
        volume_queue.volume_id = task.volume_id
        volume_queue.instance_id = task.instance_id
        volume_queue.backup_status = task.backup_status
        volume_queue.create()

    def create_volume_backup(self, queue):
        """Initiate the backup of the volume

        This function calls the backup api and changes the backup_status
        and backup_id in the queue table. Tasks whose ``backup_id`` is not
        ``"NULL"`` are left untouched.

        :param queue: the queue record of the volume that needs backup.
        """
        if queue.backup_id != "NULL":
            # TODO(Alex): remove this task from the task list
            # Backup planned task cannot have backup_id in the same cycle
            # Reserve for now because it is related to the WIP backup
            # generators which are not finished in the current cycle
            return

        try:
            volume_backup = conn.block_storage.create_backup(
                volume_id=queue.volume_id, force=True
            )
        except exceptions.SDKException as error:
            # ``exceptions`` is a module; catching it directly raises
            # TypeError, so catch the SDK base exception class instead.
            LOG.error(
                "Backup creation for the volume %s failed: %s",
                queue.volume_id,
                error,
            )
            return

        queue.backup_id = volume_backup.id
        queue.backup_status = constants.BACKUP_WIP
        queue.save()

    def process_failed_backup(self, task):
        """Log a failed backup and remove its task from the queue."""
        # 1. TODO(Alex): notify via email
        LOG.error("Backup of the volume %s failed.", task.volume_id)
        # 2. TODO(Alex): remove failed backup instance from the openstack
        #     then set the volume status in-use
        # 3. remove failed task from the task queue
        task.delete_queue()

    def process_available_backup(self, task):
        """Record a successful backup and remove its task from the queue."""
        LOG.info("Backup of the volume %s is successful.", task.volume_id)
        # 1. save success backup in the backup table
        self._volume_backup(
            BackupMapping(
                volume_id=task.volume_id,
                backup_id=task.backup_id,
                instance_id=task.instance_id,
                backup_completed=1,
            )
        )
        # 2. remove from the task list
        task.delete_queue()
        # 3. TODO(Alex): notify via email

    def process_using_backup(self, task):
        """Remove the task from the task list."""
        task.delete_queue()

    def check_volume_backup_status(self, queue):
        """Checks the backup status of the volume

        Calls the backups api, finds the backup whose id matches
        ``queue.backup_id`` and dispatches on its status.

        :param queue: the queue record of the volume whose backup status
            is checked.
        """
        backups = conn.block_storage.backups(volume_id=queue.volume_id)
        for backup_gen in backups:
            if backup_gen.id != queue.backup_id:
                continue

            status = backup_gen.status
            if status == "error":
                self.process_failed_backup(queue)
            elif status == "available":
                self.process_available_backup(queue)
            elif status == "creating":
                # TODO(Alex): Need to escalate discussion
                # How to proceed WIP backup generators?
                # To make things worse, the last backup generator is in
                # progress till the new backup cycle
                LOG.info(
                    "Waiting for backup of %s to be completed",
                    queue.volume_id,
                )
            else:  # "deleting", "restoring", "error_restoring" status
                self.process_using_backup(queue)
            # The matching backup has been handled; no need to keep
            # scanning the remaining backups of this volume.
            return

    def _volume_backup(self, task):
        """Save a backup record to the backup table.

        :param task: object exposing ``backup_id``, ``volume_id``,
            ``instance_id`` and ``backup_completed``.
        """
        volume_backup = objects.Volume(self.ctx)
        volume_backup.backup_id = task.backup_id
        volume_backup.volume_id = task.volume_id
        volume_backup.instance_id = task.instance_id
        volume_backup.backup_completed = task.backup_completed
        volume_backup.create()