Volume backup initiate
diff --git a/staffeln/cmd/conductor.py b/staffeln/cmd/conductor.py
index cbe01a0..d3d0412 100755
--- a/staffeln/cmd/conductor.py
+++ b/staffeln/cmd/conductor.py
@@ -15,8 +15,7 @@
service.prepare_service()
sm = cotyledon.ServiceManager()
- sm.add(manager.BackupManager,
- workers=CONF.conductor.backup_workers, args=(CONF,))
+ sm.add(manager.BackupManager, workers=CONF.conductor.backup_workers, args=(CONF,))
# sm.add(manager.RotationManager,
# workers=CONF.conductor.rotation_workers, args=(CONF,))
oslo_config_glue.setup(sm, CONF)
diff --git a/staffeln/common/auth.py b/staffeln/common/auth.py
index 5177098..e23ef71 100755
--- a/staffeln/common/auth.py
+++ b/staffeln/common/auth.py
@@ -2,4 +2,4 @@
def create_connection():
- return openstack.connect(cloud="envvars")
\ No newline at end of file
+ return openstack.connect(cloud="envvars")
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index 18e5347..7a83df9 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -1,6 +1,7 @@
import staffeln.conf
import collections
+from openstack.block_storage.v2 import backup
from oslo_log import log
from staffeln.common import auth
from staffeln.common import context
@@ -38,7 +39,7 @@
return projects
-class Queue(object):
+class Backup(object):
"""Implmentations of the queue with the sql."""
def __init__(self):
@@ -100,9 +101,9 @@
for volume in volumes:
queues_map["queues"] = QueueMapping(
volume_id=volume["id"],
- backup_id=short_id.generate_id(),
+ backup_id="NULL",
instance_id=server_id,
- backup_status=1,
+ backup_status=0,
)
return discovered_map
@@ -124,54 +125,40 @@
volume_queue.backup_status = backup_status
volume_queue.create()
+ def volume_backup_initiate(self, queue):
+ """Initiate the backup of the volume
+ :params: queue: Provide the map of the volume that needs
+ backup.
+ This function will call the backupup api and change the
+ backup_status and backup_id in the queue table.
+ """
+ volume_info = conn.get_volume(queue.volume_id)
+ backup_id = queue.backup_id
+ if backup_id == "NULL":
+ volume_backup = conn.block_storage.create_backup(
+ volume_id=queue.volume_id, force=True
+ )
+ update_queue = objects.Queue.get_by_id(self.ctx, queue.id)
+ update_queue.backup_id = volume_backup.id
+ update_queue.backup_status = 1
+ update_queue.save()
-class Backup_data(object):
- """Implementation for volumes backup"""
-
- def __init__(self):
- self.ctx = context.make_context()
- self.discovered_map = None
- self.backup_mapping = dict()
- self._available_backups = None
- self._available_backups_map = None
-
- @property
- def available_backups(self):
- """Backups loaded from DB"""
- if self._available_backups is None:
- self._available_backups = objects.Volume.list(self.ctx)
- return self._available_backups
-
- @property
- def available_backups_map(self):
- """Mapping of backup loaded from DB"""
- if self._available_backups_map is None:
- self._available_backups_map = {
- BackupMapping(
- backup_id=g.backup_id,
- volume_id=g.volume_id,
- instance_id=g.instance_id,
- backup_completed=g.backup_completed,
- ): g
- for g in self.available_backups
- }
- return self._available_backups_map
-
- def volume_backup(self, queue):
- pass
-
- def _volume_backup(self, backup_map):
- """Saves the backup data to database."""
- volume_id = backup_map.volume_id
- backup_id = backup_map.backup_id
- instance_id = backup_map.instance_id
- backup_mapping = dict()
- matching_backups = [
- g for g in self.available_backups if g.backup_id == backup_id
- ]
- if not matching_backups:
- volume = objects.Volume(self.ctx)
- volume.backup_id = backup_id
- volume.volume_id = volume_id
- volume.instance_id = instance_id
- volume.create()
+ def check_volume_backup_status(self, queue):
+ """Checks the backup status of the volume
+ :params: queue: Provide the map of the volume that needs backup
+ status checked.
+ Call the backups api to see if the backup is successful.
+ """
+ for raw in conn.block_storage.backups(volume_id=queue.volume_id):
+ backup_info = raw.to_dict()
+ if backup_info.id == queue.id:
+ if backup_info.status == error:
+ Log.error("Backup of the volume %s failed." % queue.id)
+ ## Call db api to remove the queue object.
+ elif backup_info.status == "success":
+ LOG.info("Backup of the volume %s is successful." % queue.volume_id)
+ ## call db api to remove the queue object.
+ ## Call db api to add the backup status in volume_backup table.
+ else:
+ pass
+ ## Wait for the backup to be completed.
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 6f24cce..8bf5ca0 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -31,8 +31,7 @@
(self.backup_engine, (), {}),
]
periodic_worker = periodics.PeriodicWorker(periodic_callables)
- periodic_thread = threading.Thread(
- target=periodic_worker.start)
+ periodic_thread = threading.Thread(target=periodic_worker.start)
periodic_thread.daemon = True
periodic_thread.start()
@@ -47,27 +46,26 @@
def backup_engine(self):
LOG.info("backing... %s" % str(time.time()))
LOG.info("%s periodics" % self.name)
- queue = backup.Queue().get_queues()
- queues_to_start = backup.Queue().get_queues(
- filters={'backup_status': constants.BACKUP_PLANNED})
- print(queues_to_start)
- queues_started = backup.Queue().get_queues(
- filters={'backup_status': constants.BACKUP_WIP})
- print(queues_started)
- queue_completed = backup.Queue().get_queues(
- filters={'backup_status': constants.BACKUP_COMPLETED})
- print(queue_completed)
+ queue = backup.Backup().get_queues()
+ queues_to_start = backup.Backup().get_queues(
+ filters={"backup_status": constants.BACKUP_PLANNED}
+ )
+ queues_started = backup.Backup().get_queues(
+ filters={"backup_status": constants.BACKUP_WIP}
+ )
+ queue_completed = backup.Backup().get_queues(
+ filters={"backup_status": constants.BACKUP_COMPLETED}
+ )
if len(queue) == 0:
- create_queue = backup.Queue().create_queue()
+ create_queue = backup.Backup().create_queue()
elif len(queues_started) != 0:
for queue in queues_started:
- LOG.info("Waiting for backup of %s to be completed" %
- queue.volume_id)
- backup_volume = backup.Backup_data().volume_backup(queue)
+ LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
+ backup_volume = backup.Backup().check_volume_backup_status(queue)
elif len(queues_to_start) != 0:
for queue in queues_to_start:
LOG.info("Started backup process for %s" % queue.volume_id)
- backup_volume = backup.Backup_data().volume_backup(queue)
+ backup_volume = backup.Backup().volume_backup_initiate(queue)
elif len(queue_completed) == len(queue):
pass
@@ -89,8 +87,7 @@
(self.rotation_engine, (), {}),
]
periodic_worker = periodics.PeriodicWorker(periodic_callables)
- periodic_thread = threading.Thread(
- target=periodic_worker.start)
+ periodic_thread = threading.Thread(target=periodic_worker.start)
periodic_thread.daemon = True
periodic_thread.start()
diff --git a/staffeln/conf/__init__.py b/staffeln/conf/__init__.py
index 53cd2be..8f82b29 100755
--- a/staffeln/conf/__init__.py
+++ b/staffeln/conf/__init__.py
@@ -3,11 +3,11 @@
from staffeln.conf import api
from staffeln.conf import conductor
from staffeln.conf import database
-# from staffeln.conf import paths
+from staffeln.conf import paths
CONF = cfg.CONF
api.register_opts(CONF)
conductor.register_opts(CONF)
database.register_opts(CONF)
-# paths.register_opts(CONF)
+paths.register_opts(CONF)
diff --git a/staffeln/conf/database.py b/staffeln/conf/database.py
index eddc9d1..f823eae 100644
--- a/staffeln/conf/database.py
+++ b/staffeln/conf/database.py
@@ -1,7 +1,10 @@
from oslo_config import cfg
from oslo_db import options as oslo_db_options
+from staffeln.conf import paths
-_DEFAULT_SQL_CONNECTION = "mysql+pymysql://admin:123123@192.168.2.115:3306/staffeln"
+_DEFAULT_SQL_CONNECTION = "sqlite:///{0}".format(
+ paths.state_path_def("staffeln.sqlite")
+)
database = cfg.OptGroup(
"database",
diff --git a/staffeln/conf/paths.py b/staffeln/conf/paths.py
new file mode 100644
index 0000000..bb2420a
--- /dev/null
+++ b/staffeln/conf/paths.py
@@ -0,0 +1,24 @@
+from oslo_config import cfg
+
+import os
+
+PATH_OPTS = [
+ cfg.StrOpt(
+ "state_path",
+ default="$pybasedir",
+ help="Top-level directory for maintaining staffeln's state.",
+ ),
+]
+
+
+def state_path_def(*args):
+ """Return an uninterpolated path relative to $state_path."""
+ return os.path.join("$state_path", *args)
+
+
+def register_opts(conf):
+ conf.register_opts(PATH_OPTS)
+
+
+def list_opts():
+ return [("DEFAULT", PATH_OPTS)]
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index 7d3cd0b..400fc7a 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -66,8 +66,6 @@
"""
if is_int_like(value):
return query.filter_by(id=value)
- elif is_uuid_like(value):
- return query.filter_by(backup_id=value)
else:
LOG.error("Invalid Identity")
@@ -177,12 +175,10 @@
return fieldname, operator_
- def _get(self, context, model, fieldname, value, eager):
+ def _get(self, context, model, fieldname, value):
query = model_query(model)
query = query.filter(getattr(model, fieldname) == value)
- # if not context.show_deleted:
- # query = query.filter(model.deleted_at.is_(None))
try:
obj = query.one()
@@ -196,7 +192,6 @@
cleaned_values = {
k: v for k, v in values.items() if k not in self._get_relationships(model)
}
- print(cleaned_values)
obj.update(cleaned_values)
obj.save()
return obj
@@ -208,9 +203,11 @@
query = model_query(model, session=session)
query = add_identity_filter(query, id_)
try:
- ref = query.with_lockmode("update").one()
+ ref = query.with_for_update().one()
except exc.NoResultFound:
LOG.error("Update backup failed. No result found.")
+ ref.update(values)
+ return ref
def _get_model_list(
self,
@@ -267,22 +264,22 @@
models.Queue_data, self._add_queues_filters, *args, **kwargs
)
- def update_queue(self, backup_id, values):
- if "backup_id" in values:
- LOG.error("Cannot override backup_id for existing backup queue.")
-
+ def update_queue(self, id, values):
+ print(self._update(models.Queue_data, id, values))
try:
- return self._update(models.Queue_data, backup_id, values)
+ return self._update(models.Queue_data, id, values)
except:
- LOG.error("backup resource not found.")
+ LOG.error("Queue resource not found.")
- def get_queue_by_backup_id(self, context, backup_id):
+ def get_queue_by_id(self, context, id):
"""Get the column from queue_data with matching backup_id"""
- return self._get_queue(context, fieldname="backup_id", value=backup_id)
+ return self._get_queue(context, fieldname="id", value=id)
def _get_queue(self, context, fieldname, value):
"""Get the columns from queue_data table"""
+
try:
+
return self._get(
context, model=models.Queue_data, fieldname=fieldname, value=value
)
diff --git a/staffeln/objects/queue.py b/staffeln/objects/queue.py
index a010e27..ac41bab 100644
--- a/staffeln/objects/queue.py
+++ b/staffeln/objects/queue.py
@@ -26,7 +26,7 @@
return [cls._from_db_object(cls(context), obj) for obj in db_queue]
@base.remotable_classmethod
- def get_by_backup_id(cls, context, backup_id):
+ def get_by_id(cls, context, id):
"""Find a backup based on backup_id
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
@@ -37,7 +37,7 @@
:param backup_id: the backup id of volume in queue.
:returns: a :class:`Queue` object.
"""
- db_queue = cls.dbapi.get_queue_by_backup_id(context, backup_id)
+ db_queue = cls.dbapi.get_queue_by_id(context, id)
queue = cls._from_db_object(cls(context), db_queue)
return queue
@@ -51,7 +51,7 @@
@base.remotable
def save(self):
updates = self.obj_get_changes()
- db_obj = self.dbapi.update_queue(self.backup_id, updates)
+ db_obj = self.dbapi.update_queue(self.id, updates)
obj = self._from_db_object(self, db_obj, eager=False)
self.obj_refresh(obj)
self.obj_reset_changes()
diff --git a/staffeln/objects/volume.py b/staffeln/objects/volume.py
index c2652d7..498d65a 100644
--- a/staffeln/objects/volume.py
+++ b/staffeln/objects/volume.py
@@ -20,14 +20,14 @@
}
@base.remotable_classmethod
- def list(cls, context, filters=None):
+ def list(cls, filters=None):
"""Return a list of :class:`Backup` objects.
:param filters: dict mapping the filter to a value.
"""
- db_backups = cls.dbapi.get_backup_list(context, filters=filters)
+ db_backups = cls.dbapi.get_backup_list(filters=filters)
- return [cls._from_db_object(cls(context), obj) for obj in db_backups]
+ return [cls._from_db_object(cls(), obj) for obj in db_backups]
@base.remotable
def create(self):