Schema and backup conductor
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index ea3059a..ea3c20a 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -1,7 +1,26 @@
import staffeln.conf
+import collections
+from oslo_log import log
+from staffeln.common import auth
+from staffeln.common import context
+# from staffeln.objects import backup as backup_api
+from staffeln import objects
+from staffeln.common import short_id
CONF = staffeln.conf.CONF
+LOG = log.getLogger(__name__)
+
+
+BackupMapping = collections.namedtuple(
+ 'BackupMapping', ['volume_id', 'backup_id', 'instance_id', 'backup_completed'])
+
+QueueMapping = collections.namedtuple(
+ 'QueueMapping', ['volume_id', 'backup_id',
+ 'instance_id', 'backup_status']
+)
+
+conn = auth.create_connection()
def check_vm_backup_metadata(metadata):
@@ -9,6 +28,142 @@
return False
return metadata[CONF.conductor.backup_metadata_key].lower() in ['true']
+
def backup_volumes_in_project(conn, project_name):
# conn.list_servers()
pass
+
+
+def get_projects_list():
+ projects = conn.list_projects()
+ return(projects)
+
+
+class Queue(object):
+ def __init__(self):
+ self.ctx = context.make_context()
+ self.discovered_map = None
+ self.queue_mapping = dict()
+ self._available_queues = None
+ self._available_queues_map = None
+
+ @property
+ def available_queues(self):
+ """Queues loaded from DB"""
+ if self._available_queues is None:
+ self._available_queues = objects.Queue.list(
+ self.ctx)
+ return self._available_queues
+
+ @property
+ def available_queues_map(self):
+ """Mapping of backup loaded from DB"""
+ if self._available_queues_map is None:
+ self._available_queues_map = {
+ QueueMapping(
+ backup_id=g.backup_id,
+ volume_id=g.volume_id,
+ instance_id=g.instance_id,
+ backup_status=g.backup_status): g
+ for g in self.available_queues
+ }
+ return self._available_queues_map
+
+ def get_queues(self, filters=None):
+ queues = objects.Queue.list(self.ctx, filters=filters)
+ return queues
+
+ def create_queue(self):
+ self.discovered_map = self.check_instance_volumes()
+ queues_map = self.discovered_map["queues"]
+ for queue_name, queue_map in queues_map.items():
+ self._volume_queue(queue_map)
+
+ def check_instance_volumes(self):
+ queues_map = {}
+ discovered_map = {
+ "queues": queues_map
+ }
+ projects = get_projects_list()
+ for project in projects:
+ servers = conn.compute.servers(
+ details=True, all_projects=True, project_id=project.id)
+ for server in servers:
+ server_id = server.host_id
+ volumes = server.attached_volumes
+ for volume in volumes:
+ queues_map['queues'] = QueueMapping(
+ volume_id=volume['id'],
+ backup_id=short_id.generate_id(),
+ instance_id=server_id,
+ backup_status=1
+ )
+ return discovered_map
+
+ def _volume_queue(self, queue_map):
+ # print(queue_map)
+ volume_id = queue_map.volume_id
+ backup_id = queue_map.backup_id
+ instance_id = queue_map.instance_id
+ backup_status = queue_map.backup_status
+ backup_mapping = dict()
+ matching_backups = [g for g in self.available_queues
+ if g.backup_id == backup_id]
+ if not matching_backups:
+ volume_queue = objects.Queue(self.ctx)
+ volume_queue.backup_id = backup_id
+ volume_queue.volume_id = volume_id
+ volume_queue.instance_id = instance_id
+ volume_queue.backup_status = backup_status
+ volume_queue.create()
+
+
+class Backup_data(object):
+
+ def __init__(self):
+ self.ctx = context.make_context()
+ self.discovered_map = None
+ self.backup_mapping = dict()
+ self._available_backups = None
+ self._available_backups_map = None
+
+ @property
+ def available_backups(self):
+ """Backups loaded from DB"""
+ if self._available_backups is None:
+ self._available_backups = objects.Volume.list(self.ctx)
+ return self._available_backups
+
+ @property
+ def available_backups_map(self):
+ """Mapping of backup loaded from DB"""
+ if self._available_backups_map is None:
+ self._available_backups_map = {
+ BackupMapping(
+ backup_id=g.backup_id,
+ volume_id=g.volume_id,
+ instance_id=g.instance_id,
+ backup_completed=g.backup_completed): g
+ for g in self.available_backups
+ }
+ return self._available_backups_map
+
+ def volume_backup(self, queue):
+ pass
+
+ def _volume_backup(self, backup_map):
+ volume_id = backup_map.volume_id
+ backup_id = backup_map.backup_id
+ instance_id = backup_map.instance_id
+ backup_mapping = dict()
+ for g in self.available_backups:
+ print(g)
+ print(g.volume_id)
+ matching_backups = [g for g in self.available_backups
+ if g.backup_id == backup_id]
+ if not matching_backups:
+ volume = objects.Volume(self.ctx)
+ volume.backup_id = backup_id
+ volume.volume_id = volume_id
+ volume.instance_id = instance_id
+ volume.create()
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 85b8f78..169230c 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -8,6 +8,7 @@
from staffeln.common import auth
from staffeln.conductor import backup
+from staffeln.common import context
LOG = log.getLogger(__name__)
@@ -21,6 +22,7 @@
super(BackupManager, self).__init__(worker_id)
self._shutdown = threading.Event()
self.conf = conf
+ self.ctx = context.make_context()
LOG.info("%s init" % self.name)
def run(self):
@@ -43,24 +45,28 @@
@periodics.periodic(spacing=CONF.conductor.backup_period, run_immediately=True)
def backup_engine(self):
- print("backing... %s" % str(time.time()))
+ LOG.info("backing... %s" % str(time.time()))
LOG.info("%s periodics" % self.name)
- conn = auth.create_connection()
- projects = conn.list_projects()
- for project in projects:
- print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<Project>>>>>>>>>>>>>>>>>>>>>>>>>")
- print(project.id)
- servers = conn.list_servers(all_projects=True, filters={
- "project_id": project.id})
- for server in servers:
- if not backup.check_vm_backup_metadata(server.metadata):
- continue
- for volume in server.volumes:
- print("<<<<<<<<<<<Volume>>>>>>>>>>")
- print(volume)
- # 1 backup volume
- conn.create_volume_backup(volume_id=volume.id, force=True)
- # 2 store backup_id in the database
+ queue = backup.Queue().get_queues()
+ queues_to_start = backup.Queue().get_queues(
+ filters={'backup_status': 0})
+ queues_started = backup.Queue().get_queues(
+ filters={'backup_status': 1})
+ queue_completed = backup.Queue().get_queues(
+ filters={'backup_status': 2})
+ if len(queue) == 0:
+ create_queue = backup.Queue().create_queue()
+ elif len(queues_started) != 0:
+ for queue in queues_started:
+ LOG.info("Waiting for backup of %s to be completed" %
+ queue.volume_id)
+ backup_volume = backup.Backup_data().volume_backup(queue)
+ elif len(queues_to_start) != 0:
+ for queue in queues_to_start:
+ LOG.info("Started backup process for %s" % queue.volume_id)
+ backup_volume = backup.Backup_data().volume_backup(queue)
+ elif len(queue_completed) == len(queue):
+ pass
class RotationManager(cotyledon.Service):
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index 1b8b93e..52d9853 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -74,7 +74,7 @@
elif is_uuid_like(value):
return query.filter_by(backup_id=value)
else:
- raise exception.InvalidIdentity(identity=value)
+ LOG.error("Invalid Identity")
def _paginate_query(model, limit=None, marker=None, sort_key=None,
@@ -109,15 +109,16 @@
def _get_relationships(model):
return inspect(model).relationships
- def _add_backups_filters(self, query, filters):
+ def _add_backup_filters(self, query, filters):
if filters is None:
filters = {}
- plain_fields = ['backup_id', 'volume_id', 'instance_id']
+ plain_fields = ['volume_id', 'backup_id',
+ 'backup_completed', 'instance_id']
return self._add_filters(
- query=query, model=models.Backup_data, filters=filters,
- plain_fields=plain_fields)
+ query=query, model=models.Backup_data, filters=filters, plain_fields=plain_fields
+ )
def _add_queues_filters(self, query, filters):
if filters is None:
@@ -142,6 +143,16 @@
return query
+ def __add_simple_filter(self, query, model, fieldname, value, operator_):
+ field = getattr(model, fieldname)
+
+ if (fieldname != 'deleted' and value and
+ field.type.python_type is datetime.datetime):
+ if not isinstance(value, datetime.datetime):
+ value = timeutils.parse_isotime(value)
+
+ return query.filter(self.valid_operators[operator_](field, value))
+
def __decompose_filter(self, raw_fieldname):
"""Decompose a filter name into it's two subparts"""
@@ -153,10 +164,25 @@
return fieldname, operator_
+ def _get(self, context, model, fieldname, value, eager):
+ query = model_query(model)
+
+ query = query.filter(getattr(model, fieldname) == value)
+ if not context.show_deleted:
+ query = query.filter(model.deleted_at.is_(None))
+
+ try:
+ obj = query.one()
+ except exc.NoResultFound:
+ LOG.error("ResourceNotFound")
+
+ return obj
+
def _create(self, model, values):
obj = model()
cleaned_values = {k: v for k, v in values.items()
if k not in self._get_relationships(model)}
+ print(cleaned_values)
obj.update(cleaned_values)
obj.save()
return obj
@@ -172,16 +198,6 @@
except exc.NoResultFound:
LOG.error("Update backup failed. No result found.")
- def _add_queues_filters(self, query, filters):
- if filters is None:
- filters = {}
-
- plain_fields = ['volume_id', 'backup_status']
-
- return self._add_filters(
- query=query, model=models.Queue_data, filters=filters, plain_fields=plain_fields
- )
-
def _get_model_list(self, model, add_filter_func, context, filters=None, limit=None, marker=None, sort_key=None, sort_dir=None, eager=False):
query = model_query(model)
@@ -236,3 +252,14 @@
return self._update(models.Queue_data, backup_id, values)
except:
LOG.error("backup resource not found.")
+
+ def get_queue_by_uuid(self, context, backup_id):
+ return self._get_queue(
+ context, fieldname="uuid", value=backup_id)
+
+ def _get_queue(self, context, fieldname, value):
+ try:
+ return self._get(context, model=models.Queue_data,
+ fieldname=fieldname, value=value)
+ except:
+ LOG.error("Queue not found")
diff --git a/staffeln/db/sqlalchemy/models.py b/staffeln/db/sqlalchemy/models.py
index ae3f215..9c78c43 100644
--- a/staffeln/db/sqlalchemy/models.py
+++ b/staffeln/db/sqlalchemy/models.py
@@ -79,5 +79,4 @@
backup_id = Column(String(100))
volume_id = Column(String(100))
backup_status = Column(Integer())
- executed_at = Column(DateTime())
instance_id = Column(String(100))
diff --git a/staffeln/objects/queue.py b/staffeln/objects/queue.py
index 0e81064..b6f9b76 100644
--- a/staffeln/objects/queue.py
+++ b/staffeln/objects/queue.py
@@ -10,13 +10,12 @@
dbapi = db_api.get_instance()
- feilds = {
+ fields = {
'id': sfeild.IntegerField(),
'backup_id': sfeild.StringField(),
'volume_id': sfeild.UUIDField(),
'instance_id': sfeild.StringField(),
- 'backup_status': sfeild.IntegerField(),
- 'executed_at': sfeild.DateTimeField()
+ 'backup_status': sfeild.IntegerField()
}
@base.remotable_classmethod
@@ -24,6 +23,22 @@
db_queue = cls.dbapi.get_queue_list(context, filters=filters)
return [cls._from_db_object(cls(context), obj) for obj in db_queue]
+ @base.remotable_classmethod
+ def get_by_backup_id(cls, context, backup_id):
+ """Find a backup based on backup_id
+ :param context: Security context. NOTE: This should only
+ be used internally by the indirection_api.
+ Unfortunately, RPC requires context as the first
+ argument, even though we don't use it.
+ A context should be set when instantiating the
+ object, e.g.: Queue(context)
+ :param backup_id: the backup id of volume in queue.
+ :returns: a :class:`Queue` object.
+ """
+ db_queue = cls.dbapi.get_queue_by_backup_id(context, backup_id)
+ queue = cls._from_db_object(cls(context), db_queue)
+ return queue
+
@base.remotable
def create(self):
values = self.obj_get_changes()
@@ -40,5 +55,5 @@
@base.remotable
def refresh(self):
- current = self.get_by_uuid(uuid=self.uuid)
+ current = self.get_by_backup_id(backup_id=self.backup_id)
self.obj_refresh(current)
diff --git a/staffeln/objects/volume.py b/staffeln/objects/volume.py
index 80a9663..8952164 100644
--- a/staffeln/objects/volume.py
+++ b/staffeln/objects/volume.py
@@ -14,18 +14,19 @@
'id': sfeild.IntegerField(),
'backup_id': sfeild.StringField(),
'instance_id': sfeild.StringField(),
- 'volume_id': sfeild.UUIDField()
+ 'volume_id': sfeild.UUIDField(),
+ 'backup_completed': sfeild.IntegerField()
}
- @base.remotable
- def list(cls, filters=None):
+ @base.remotable_classmethod
+ def list(cls, context, filters=None):
"""Return a list of :class:`Backup` objects.
:param filters: dict mapping the filter to a value.
"""
- db_backups = cls.dbapi.get_backup_list(filters=filters)
+ db_backups = cls.dbapi.get_backup_list(context, filters=filters)
- return [cls._from_db_object(cls, obj) for obj in db_backups]
+ return [cls._from_db_object(cls(context), obj) for obj in db_backups]
@base.remotable
def create(self):
@@ -55,5 +56,5 @@
checks for updated attributes. Updates are applied from
the loaded backup column by column, if there are any updates.
"""
- current = self.get_by_uuid(uuid=self.uuid)
+ current = self.get_by_uuid(backup_id=self.backup_id)
self.obj_refresh(current)