Schema and objects for sql queue model
diff --git a/staffeln/common/context.py b/staffeln/common/context.py
new file mode 100644
index 0000000..d916b32
--- /dev/null
+++ b/staffeln/common/context.py
@@ -0,0 +1,19 @@
+from oslo_context import context
+from oslo_log import log
+from oslo_utils import timeutils
+
+LOG = log.getLogger(__name__)
+
+
+class RequestContext(context.RequestContext):
+
+ def __init__(self, backup_id=None, volume_id=None, instance_id=None, executed_at=None, backup_status=None, **kwargs):
+ self.backup_id = backup_id
+ self.volume_id = volume_id
+ self.instance_id = instance_id
+ self.backup_id = backup_id
+ self.executed_at = executed_at
+
+
+def make_context(*args, **kwargs):
+ return RequestContext(*args, **kwargs)
diff --git a/staffeln/common/service.py b/staffeln/common/service.py
index 54b1e62..791955b 100755
--- a/staffeln/common/service.py
+++ b/staffeln/common/service.py
@@ -15,6 +15,7 @@
from oslo_log import log as logging
from staffeln.common import config
+from staffeln import objects
import staffeln.conf
CONF = staffeln.conf.CONF
@@ -26,5 +27,5 @@
logging.register_options(CONF)
config.parse_args(argv)
config.set_config_defaults()
-
+ objects.register_all()
logging.setup(CONF, 'staffeln')
diff --git a/staffeln/db/api.py b/staffeln/db/api.py
index f2b0285..7caa0a4 100644
--- a/staffeln/db/api.py
+++ b/staffeln/db/api.py
@@ -18,6 +18,17 @@
"""Base class for storage system connections."""
@abc.abstractmethod
+ def get_backup_list(self, filters=None):
+ """Get specific columns for matching backup.
+
+ Return a list of the specidied columns for all the backups
+ that match the specified filters.
+
+ :param filters: Filters to apply. Defaults to None.
+ :return: A list of tuples of the specified columns.
+ """
+
+ @abc.abstractmethod
def create_backup(self, values):
"""Create new backup.
@@ -32,3 +43,61 @@
}
:returns: A backup
"""
+
+ @abc.abstractmethod
+ def update_backup(self, backup_uuid, values):
+ """Update properties of the backup.
+ : param backup_uuid: uuid of the backup
+ :param values: A dict containing several items used to add
+ the backup. For example:
+
+ ::
+
+ {
+ 'backup_id': short_id.generate_uuid(),
+ 'backup_status': 'completed',
+ }
+ :returns: A backup
+ """
+
+ @abc.abstractmethod
+ def create_queue(self, values):
+ """Create entry in queue_data.
+ :param values: A dict containing several items used to add
+ the volume information for backup
+
+ ::
+ {
+ 'backup_id': "backup_id"
+ 'volume_id': "volume_id"
+ 'backup_status': 0
+ }
+ :returns A queue
+ """
+
+ @abc.abstractmethod
+ def update_queue(self, backup_id, values):
+ """Update properties of the backup.
+ : param backup_id: uuid of the backup
+ :param values: A dict containing several items used to add
+ the backup. For example:
+
+ ::
+
+ {
+ 'backup_id': short_id.generate_uuid(),
+ 'backup_status': 1
+ }
+ :returns: A backup
+ """
+
+ @abc.abstractmethod
+ def get_queue_list(self, filters=None):
+ """Get specific columns for matching backup.
+
+ Return a list of the specidied columns for all the queues
+ that match the specified filters.
+
+ :param filters: Filters to apply. Defaults to None.
+ :return: A list of tuples of the specified columns.
+ """
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index 5921417..1b8b93e 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -5,10 +5,13 @@
import operator
from oslo_config import cfg
+from oslo_log import log
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import session as db_session
from oslo_db.sqlalchemy import utils as db_utils
from oslo_utils import timeutils
+from oslo_utils import strutils
+from oslo_utils import uuidutils
from sqlalchemy.inspection import inspect
from sqlalchemy.orm import exc
@@ -18,11 +21,17 @@
from staffeln.db import api
from staffeln.db.sqlalchemy import models
from staffeln.common import short_id
+from staffeln import objects
+
+LOG = log.getLogger(__name__)
CONF = cfg.CONF
_FACADE = None
+is_uuid_like = uuidutils.is_uuid_like
+is_int_like = strutils.is_int_like
+
def _create_facade_lazily():
global _FACADE
@@ -41,37 +50,189 @@
return facade.get_session(**kwargs)
+def get_backend():
+ """The backend is this module itself."""
+ return Connection()
+
+
def model_query(model, *args, **kwargs):
session = kwargs.get('session') or get_session()
query = session.query(model, *args)
return query
+def add_identity_filter(query, value):
+ """Adds an identity filter to a query.
+ Filters results by ID, if supplied value is a valid integer.
+ Otherwise attempts to filter results by UUID.
+ :param query: Initial query to add filter to.
+ :param value: Value for filtering results by.
+ :return: Modified query.
+ """
+ if is_int_like(value):
+ return query.filter_by(id=value)
+ elif is_uuid_like(value):
+ return query.filter_by(backup_id=value)
+ else:
+ raise exception.InvalidIdentity(identity=value)
+
+
+def _paginate_query(model, limit=None, marker=None, sort_key=None,
+ sort_dir=None, query=None):
+ if not query:
+ query = model_query(model)
+ sort_keys = ['id']
+ if sort_key and sort_key not in sort_keys:
+ sort_keys.insert(0, sort_key)
+ query = db_utils.paginate_query(query, model, limit, sort_keys,
+ marker=marker, sort_dir=sort_dir)
+ return query.all()
+
+
class Connection(api.BaseConnection):
"""SQLAlchemy connection."""
+ valid_operators = {
+ "": operator.eq,
+ "eq": operator.eq,
+ "neq": operator.ne,
+ "gt": operator.gt,
+ "gte": operator.ge,
+ "lt": operator.lt,
+ "lte": operator.le,
+ }
+
def __init__(self):
super(Connection, self).__init__()
+ @staticmethod
def _get_relationships(model):
return inspect(model).relationships
+ def _add_backups_filters(self, query, filters):
+ if filters is None:
+ filters = {}
+
+ plain_fields = ['backup_id', 'volume_id', 'instance_id']
+
+ return self._add_filters(
+ query=query, model=models.Backup_data, filters=filters,
+ plain_fields=plain_fields)
+
+ def _add_queues_filters(self, query, filters):
+ if filters is None:
+ filters = {}
+
+ plain_fields = ['backup_id', 'volume_id',
+ 'instance_id', 'executed_at', 'backup_status']
+
+ return self._add_filters(
+ query=query, model=models.Queue_data, filters=filters,
+ plain_fields=plain_fields)
+
+ def _add_filters(self, query, model, filters=None, plain_fields=None):
+ timestamp_mixin_fields = ['created_at', 'updated_at']
+ filters = filters or {}
+
+ for raw_fieldname, value in filters.items():
+ fieldname, operator_ = self.__decompose_filter(raw_fieldname)
+ if fieldname in plain_fields:
+ query = self.__add_simple_filter(
+ query, model, fieldname, value, operator_)
+
+ return query
+
+ def __decompose_filter(self, raw_fieldname):
+ """Decompose a filter name into it's two subparts"""
+
+ seperator = '__'
+ fieldname, seperator, operator_ = raw_fieldname.partition(seperator)
+
+ if operator_ and operator_ not in self.valid_operators:
+ LOG.error('Invalid operator %s' % operator_)
+
+ return fieldname, operator_
+
def _create(self, model, values):
obj = model()
-
cleaned_values = {k: v for k, v in values.items()
if k not in self._get_relationships(model)}
obj.update(cleaned_values)
obj.save()
return obj
+ @ staticmethod
+ def _update(model, id_, values):
+ session = get_session()
+ with session.begin():
+ query = model_query(model, session=session)
+ query = add_identity_filter(query, id_)
+ try:
+ ref = query.with_lockmode('update').one()
+ except exc.NoResultFound:
+ LOG.error("Update backup failed. No result found.")
+
+ def _add_queues_filters(self, query, filters):
+ if filters is None:
+ filters = {}
+
+ plain_fields = ['volume_id', 'backup_status']
+
+ return self._add_filters(
+ query=query, model=models.Queue_data, filters=filters, plain_fields=plain_fields
+ )
+
+ def _get_model_list(self, model, add_filter_func, context, filters=None, limit=None, marker=None, sort_key=None, sort_dir=None, eager=False):
+ query = model_query(model)
+
+ query = add_filter_func(query, filters)
+ return _paginate_query(model, limit, marker,
+ sort_key, sort_dir, query)
+
def create_backup(self, values):
- # ensure uuid are present for new backup
- if not values.get('uuid'):
- values['uuid'] = short_id.generate_id()
+ if not values.get('backup_id'):
+ values['backup_id'] = short_id.generate_id()
try:
backup_data = self._create(models.Backup_data, values)
except db_exc.DBDuplicateEntry:
- pass
- return goal
+ LOG.error("Backup UUID already exists.")
+ return backup_data
+
+ def get_backup_list(self, *args, **kwargs):
+ return self._get_model_list(models.Backup_data,
+ self._add_backup_filters,
+ *args, **kwargs)
+
+ def update_backup(self, backup_id, values):
+ if 'backup_id' in values:
+ LOG.error("Cannot override UUID for existing backup")
+
+ try:
+ return self._update(models.Backup_data, backup_id, values)
+ except:
+ LOG.error("backup resource not found.")
+
+ def create_queue(self, values):
+ if not values.get('backup_id'):
+ values['backup_id'] = short_id.generate_id()
+
+ try:
+ queue_data = self._create(models.Queue_data, values)
+ except db_exc.DBDuplicateEntry:
+ LOG.error("Backup UUID already exists.")
+ return queue_data
+
+ def get_queue_list(self, *args, **kwargs):
+ return self._get_model_list(models.Queue_data,
+ self._add_queues_filters,
+ *args, **kwargs)
+
+ def update_queue(self, backup_id, values):
+ if 'backup_id' in values:
+ LOG.error("Cannot override UUID for existing backup")
+
+ try:
+ return self._update(models.Queue_data, backup_id, values)
+ except:
+ LOG.error("backup resource not found.")
diff --git a/staffeln/db/sqlalchemy/models.py b/staffeln/db/sqlalchemy/models.py
index df79194..ae3f215 100644
--- a/staffeln/db/sqlalchemy/models.py
+++ b/staffeln/db/sqlalchemy/models.py
@@ -12,8 +12,8 @@
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import LargeBinary
-from sqlalchemy import Numeric
from sqlalchemy import orm
+from sqlalchemy import Numeric
from sqlalchemy import String
from sqlalchemy import Text
from sqlalchemy.types import TypeDecorator, TEXT
@@ -32,7 +32,7 @@
return None
-class StaffelnBase(models.SoftDeleteMixin, models.TimestampMixin, models.ModelBase):
+class StaffelnBase(models.TimestampMixin, models.ModelBase):
metadata = None
def as_dict(self):
@@ -58,9 +58,26 @@
__tablename__ = 'backup_data'
__table_args__ = (
- UniqueConstraint('uuid', name='unique_backup0uuid'),
+ UniqueConstraint('backup_id', name='unique_backup0uuid'),
table_args()
)
id = Column(Integer, primary_key=True, autoincrement=True)
- uuid = Column(String(36))
- volume_name = Column(String(36))
+ backup_id = Column(String(100))
+ volume_id = Column(String(100))
+ instance_id = Column(String(100))
+ backup_completed = Column(Integer())
+
+
+class Queue_data(Base):
+ """Represent the queue of the database"""
+
+ __tablename__ = 'queue_data'
+ __table_args__ = (
+ table_args()
+ )
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ backup_id = Column(String(100))
+ volume_id = Column(String(100))
+ backup_status = Column(Integer())
+ executed_at = Column(DateTime())
+ instance_id = Column(String(100))
diff --git a/staffeln/objects/__init__.py b/staffeln/objects/__init__.py
index e69de29..b9ba344 100755
--- a/staffeln/objects/__init__.py
+++ b/staffeln/objects/__init__.py
@@ -0,0 +1,3 @@
+def register_all():
+ __import__('staffeln.objects.volume')
+ __import__('staffeln.objects.queue')
diff --git a/staffeln/objects/base.py b/staffeln/objects/base.py
index 88b94b7..e5a9f06 100755
--- a/staffeln/objects/base.py
+++ b/staffeln/objects/base.py
@@ -1,13 +1,23 @@
"""Staffeln common internal object model"""
+from oslo_utils import versionutils
from oslo_versionedobjects import base as ovoo_base
-# from oslo_versionedobjects import fields as ovoo_fields
+from oslo_versionedobjects import fields as ovoo_fields
+
+from staffeln import objects
remotable_classmethod = ovoo_base.remotable_classmethod
remotable = ovoo_base.remotable
+def get_attrname(name):
+ """Return the mangled name of the attribute's underlying storage."""
+ # FIXME(danms): This is just until we use o.vo's class properties
+ # and object base.
+ return '_obj_' + name
+
+
class StaffelnObject(ovoo_base.VersionedObject):
"""Base class and object factory.
@@ -29,3 +39,47 @@
class StaffelnObjectSerializer(ovoo_base.VersionedObjectSerializer):
# Base class to use for object hydration
OBJ_BASE_CLASS = StaffelnObject
+
+
+class StaffelnPersistentObject(object):
+ feilds = {
+ 'created_at': ovoo_fields.DateTimeField(nullable=True),
+ 'updated_at': ovoo_fields.DateTimeField(nullable=True),
+ 'deleted_at': ovoo_fields.DateTimeField(nullable=True),
+ }
+
+ object_fields = {}
+
+ def obj_refresh(self, loaded_object):
+ fields = (field for field in self.feilds if field not in self.object_fields)
+ for field in fields:
+ if (self.obj_attr_is_set(field) and self[field] != loaded_object[field]):
+ self[field] = loaded_object[field]
+
+ @staticmethod
+ def _from_db_object(obj, db_object, eager=False):
+ obj_class = type(obj)
+ object_fields = obj_class.object_fields
+
+ for field in obj.fields:
+ if field not in object_fields:
+ obj[field] = db_object[field]
+
+ obj.obj_reset_changes()
+ return obj
+
+
+class StaffelnObjectRegistry(ovoo_base.VersionedObjectRegistry):
+ def registration_hook(self, cls, index):
+ version = versionutils.convert_version_to_tuple(cls.VERSION)
+ if not hasattr(objects, cls.obj_name()):
+ setattr(objects, cls.obj_name(), cls)
+ else:
+ cur_version = versionutils.convert_version_to_tuple(
+ getattr(objects, cls.obj_name()).VERSION)
+ if version >= cur_version:
+ setattr(objects, cls.obj_name(), cls)
+
+
+class StaffelnObjectDictCompat(ovoo_base.VersionedObjectDictCompat):
+ pass
diff --git a/staffeln/objects/fields.py b/staffeln/objects/fields.py
new file mode 100644
index 0000000..bbf13e2
--- /dev/null
+++ b/staffeln/objects/fields.py
@@ -0,0 +1,51 @@
+"""Utility method for objects"""
+
+import ast
+
+from oslo_serialization import jsonutils
+from oslo_versionedobjects import fields
+
+
+BooleanField = fields.BooleanField
+StringField = fields.StringField
+DateTimeField = fields.DateTimeField
+IntegerField = fields.IntegerField
+
+
+class UUIDField(fields.UUIDField):
+ def coerce(self, obj, attr, value):
+ if value is None or value == "":
+ return self._null(obj, attr)
+ else:
+ return self._type.coerce(obj, attr, value)
+
+
+class Numeric(fields.FieldType):
+ @staticmethod
+ def coerce(obj, attr, value):
+ if value is None:
+ return value
+ f_value = float(value)
+ return f_value if not f_value.is_integer() else value
+
+
+class ListOfUUIDsField(fields.AutoTypedField):
+ AUTO_TYPE = fields.List(fields.UUID())
+
+
+class Json(fields.FieldType):
+ def coerce(self, obj, attr, value):
+ if isinstance(value, str):
+ loaded = jsonutils.loads(value)
+ return loaded
+ return value
+
+ def from_primitive(self, obj, attr, value):
+ return self.coerce(obj, attr, value)
+
+ def to_primitive(self, obj, attr, value):
+ return jsonutils.dumps(value)
+
+
+class JsonField(fields.AutoTypedField):
+ AUTO_TYPE = Json()
diff --git a/staffeln/objects/queue.py b/staffeln/objects/queue.py
new file mode 100644
index 0000000..0e81064
--- /dev/null
+++ b/staffeln/objects/queue.py
@@ -0,0 +1,44 @@
+from staffeln.common import short_id
+from staffeln.db import api as db_api
+from staffeln.objects import base
+from staffeln.objects import fields as sfeild
+
+
+@base.StaffelnObjectRegistry.register
+class Queue(base.StaffelnPersistentObject, base.StaffelnObject, base.StaffelnObjectDictCompat):
+ VERSION = '1.0'
+
+ dbapi = db_api.get_instance()
+
+ feilds = {
+ 'id': sfeild.IntegerField(),
+ 'backup_id': sfeild.StringField(),
+ 'volume_id': sfeild.UUIDField(),
+ 'instance_id': sfeild.StringField(),
+ 'backup_status': sfeild.IntegerField(),
+ 'executed_at': sfeild.DateTimeField()
+ }
+
+ @base.remotable_classmethod
+ def list(cls, context, filters=None):
+ db_queue = cls.dbapi.get_queue_list(context, filters=filters)
+ return [cls._from_db_object(cls(context), obj) for obj in db_queue]
+
+ @base.remotable
+ def create(self):
+ values = self.obj_get_changes()
+ db_queue = self.dbapi.create_queue(values)
+ self._from_db_object(self, db_queue)
+
+ @base.remotable
+ def save(self):
+ updates = self.obj_get_changes()
+ db_obj = self.dbapi.update_queue(self.backup_id, updates)
+ obj = self._from_db_object(self, db_obj, eager=False)
+ self.obj_refresh(obj)
+ self.obj_reset_changes()
+
+ @base.remotable
+ def refresh(self):
+ current = self.get_by_uuid(uuid=self.uuid)
+ self.obj_refresh(current)
diff --git a/staffeln/objects/volume.py b/staffeln/objects/volume.py
new file mode 100644
index 0000000..80a9663
--- /dev/null
+++ b/staffeln/objects/volume.py
@@ -0,0 +1,59 @@
+from staffeln.common import short_id
+from staffeln.db import api as db_api
+from staffeln.objects import base
+from staffeln.objects import fields as sfeild
+
+
+@base.StaffelnObjectRegistry.register
+class Volume(base.StaffelnPersistentObject, base.StaffelnObject, base.StaffelnObjectDictCompat):
+ VERSION = '1.0'
+
+ dbapi = db_api.get_instance()
+
+ fields = {
+ 'id': sfeild.IntegerField(),
+ 'backup_id': sfeild.StringField(),
+ 'instance_id': sfeild.StringField(),
+ 'volume_id': sfeild.UUIDField()
+ }
+
+ @base.remotable
+ def list(cls, filters=None):
+ """Return a list of :class:`Backup` objects.
+
+ :param filters: dict mapping the filter to a value.
+ """
+ db_backups = cls.dbapi.get_backup_list(filters=filters)
+
+ return [cls._from_db_object(cls, obj) for obj in db_backups]
+
+ @base.remotable
+ def create(self):
+ """Create a :class:`Backup_data` record in the DB"""
+ values = self.obj_get_changes()
+ print(values)
+ db_backup = self.dbapi.create_backup(values)
+ self._from_db_object(self, db_backup)
+
+ @base.remotable
+ def save(self):
+ """Save updates to the :class:`Backup_data`.
+
+ Updates will be made column by column based on the results
+ of self.what_changed().
+ """
+ updates = self.obj_get_changes()
+ db_obj = self.dbapi.update_backup(self.uuid, updates)
+ obj = self._from_db_object(self, db_obj, eager=False)
+ self.obj_refresh(obj)
+ self.obj_reset_changes()
+
+ @base.remotable
+ def refresh(self):
+ """Loads updates for this :class:`Backup_data`.
+ Loads a backup with the same backup_id from the database and
+ checks for updated attributes. Updates are applied from
+ the loaded backup column by column, if there are any updates.
+ """
+ current = self.get_by_uuid(uuid=self.uuid)
+ self.obj_refresh(current)