# blob: 1b8b93eeb02ad4679318d5a9ba1b4e3b9b1c06d4 [file] [log] [blame]
"""SQLAlchemy storage backend."""
import collections
import datetime
import operator
from oslo_config import cfg
from oslo_log import log
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import session as db_session
from oslo_db.sqlalchemy import utils as db_utils
from oslo_utils import timeutils
from oslo_utils import strutils
from oslo_utils import uuidutils
from sqlalchemy.inspection import inspect
from sqlalchemy.orm import exc
from staffeln.i18n import _
from staffeln.common import config
from staffeln.db import api
from staffeln.db.sqlalchemy import models
from staffeln.common import short_id
from staffeln import objects
# Global configuration object and the logger for this module.
CONF = cfg.CONF
LOG = log.getLogger(__name__)

# Engine facade shared by the whole module; stays None until the first call
# to _create_facade_lazily().
_FACADE = None

# Short aliases for the identity predicates used by add_identity_filter().
is_int_like = strutils.is_int_like
is_uuid_like = uuidutils.is_uuid_like
def _create_facade_lazily():
    """Return the module-wide engine facade, building it on first use.

    The facade is built from ``CONF`` with
    ``db_session.EngineFacade.from_config`` the first time this function is
    called and is then kept in the module-level ``_FACADE`` for later calls.

    :return: The shared engine facade.
    """
    global _FACADE
    if _FACADE is not None:
        return _FACADE
    _FACADE = db_session.EngineFacade.from_config(CONF)
    return _FACADE
def get_engine():
    """Return the engine exposed by the shared engine facade."""
    return _create_facade_lazily().get_engine()
def get_session(**kwargs):
    """Return a session obtained from the shared engine facade.

    :param kwargs: Forwarded unchanged to the facade's ``get_session``.
    :return: Whatever the facade's ``get_session`` returns.
    """
    return _create_facade_lazily().get_session(**kwargs)
def get_backend():
    """Return a new ``Connection`` instance to serve as the backend."""
    backend = Connection()
    return backend
def model_query(model, *args, **kwargs):
    """Build a query for ``model``.

    :param model: First argument handed to ``session.query``.
    :param args: Extra positional arguments handed to ``session.query``.
    :param kwargs: Only ``session`` is read; when it is missing or falsy a
                   session is obtained from ``get_session()``.
    :return: The object returned by ``session.query(model, *args)``.
    """
    session = kwargs.get('session')
    if not session:
        session = get_session()
    return session.query(model, *args)
def add_identity_filter(query, value):
    """Adds an identity filter to a query.

    Filters results by ``id`` if the supplied value is integer-like.
    Otherwise filters results by ``backup_id`` if the value is UUID-like.

    :param query: Initial query to add filter to.
    :param value: Value for filtering results by.
    :return: Modified query.
    :raises ValueError: If value is neither integer-like nor UUID-like.
    """
    if is_int_like(value):
        return query.filter_by(id=value)
    if is_uuid_like(value):
        return query.filter_by(backup_id=value)
    # No ``exception`` module is imported by this file, so referencing
    # ``exception.InvalidIdentity`` here would fail with NameError. Raise a
    # builtin so the caller gets a meaningful error instead.
    raise ValueError("Invalid identity: %r" % (value,))
def _paginate_query(model, limit=None, marker=None, sort_key=None,
                    sort_dir=None, query=None):
    """Paginate a query over ``model`` and return all resulting rows.

    :param model: Model handed to ``db_utils.paginate_query``; also used to
                  build a default query when ``query`` is falsy.
    :param limit: Forwarded to ``db_utils.paginate_query``.
    :param marker: Forwarded to ``db_utils.paginate_query``.
    :param sort_key: Optional key placed ahead of ``'id'`` in the sort keys.
    :param sort_dir: Forwarded to ``db_utils.paginate_query``.
    :param query: Optional pre-built query to paginate.
    :return: Result of calling ``.all()`` on the paginated query.
    """
    base_query = query if query else model_query(model)

    # 'id' is always a sort key; a caller-supplied key takes precedence.
    if sort_key and sort_key != 'id':
        ordering = [sort_key, 'id']
    else:
        ordering = ['id']

    paginated = db_utils.paginate_query(
        base_query, model, limit, ordering,
        marker=marker, sort_dir=sort_dir)
    return paginated.all()
class Connection(api.BaseConnection):
    """SQLAlchemy connection.

    Creates, lists and updates ``models.Backup_data`` and
    ``models.Queue_data`` rows.
    """

    # Maps the optional ``__<operator>`` suffix of a filter name to the
    # comparison applied between the model attribute and the filter value.
    valid_operators = {
        "": operator.eq,
        "eq": operator.eq,
        "neq": operator.ne,
        "gt": operator.gt,
        "gte": operator.ge,
        "lt": operator.lt,
        "lte": operator.le,
    }

    def __init__(self):
        super(Connection, self).__init__()

    @staticmethod
    def _get_relationships(model):
        """Return ``inspect(model).relationships``."""
        return inspect(model).relationships

    def _add_backups_filters(self, query, filters):
        """Apply the supported ``Backup_data`` filters to ``query``.

        :param query: Query to refine.
        :param filters: Mapping of filter name to value, or None.
        :return: The refined query.
        """
        if filters is None:
            filters = {}
        plain_fields = ['backup_id', 'volume_id', 'instance_id']
        return self._add_filters(
            query=query, model=models.Backup_data, filters=filters,
            plain_fields=plain_fields)

    def _add_queues_filters(self, query, filters):
        """Apply the supported ``Queue_data`` filters to ``query``.

        :param query: Query to refine.
        :param filters: Mapping of filter name to value, or None.
        :return: The refined query.
        """
        if filters is None:
            filters = {}
        # NOTE(review): this method used to be defined twice in the class and
        # the later definition (these two fields) was the one in effect. The
        # shadowed one also listed backup_id, instance_id and executed_at;
        # confirm whether those should be filterable too.
        plain_fields = ['volume_id', 'backup_status']
        return self._add_filters(
            query=query, model=models.Queue_data, filters=filters,
            plain_fields=plain_fields)

    def _add_filters(self, query, model, filters=None, plain_fields=None):
        """Apply every recognised filter in ``filters`` to ``query``.

        Filter names have the form ``<field>`` or ``<field>__<operator>``.
        Filters whose field is not in ``plain_fields`` are ignored; filters
        with an unknown operator are logged and skipped.

        :param query: Query to refine.
        :param model: Model whose attributes are compared.
        :param filters: Mapping of filter name to value, or None.
        :param plain_fields: Field names that may be filtered on, or None.
        :return: The refined query.
        """
        filters = filters or {}
        plain_fields = plain_fields or ()
        for raw_fieldname, value in filters.items():
            fieldname, operator_ = self.__decompose_filter(raw_fieldname)
            if operator_ not in self.valid_operators:
                # Already logged by __decompose_filter; looking the operator
                # up would only raise KeyError.
                continue
            if fieldname in plain_fields:
                query = self.__add_simple_filter(
                    query, model, fieldname, value, operator_)
        return query

    def __add_simple_filter(self, query, model, fieldname, value, operator_):
        """Filter ``query`` by comparing ``model.<fieldname>`` to ``value``.

        :param operator_: Key into ``valid_operators`` selecting the
                          comparison.
        :return: The refined query.
        """
        field = getattr(model, fieldname)
        compare = self.valid_operators[operator_]
        return query.filter(compare(field, value))

    def __decompose_filter(self, raw_fieldname):
        """Decompose a filter name into its two subparts.

        :param raw_fieldname: ``<field>`` or ``<field>__<operator>``.
        :return: ``(fieldname, operator_)``; ``operator_`` is ``''`` when no
                 ``__`` separator is present. Unknown operators are logged
                 and still returned.
        """
        fieldname, _sep, operator_ = raw_fieldname.partition('__')
        if operator_ and operator_ not in self.valid_operators:
            LOG.error('Invalid operator %s', operator_)
        return fieldname, operator_

    def _create(self, model, values):
        """Create and save a ``model`` instance populated from ``values``.

        Keys of ``values`` that name a relationship of ``model`` are dropped
        before the object is updated.

        :return: The saved object.
        """
        obj = model()
        relationships = self._get_relationships(model)
        cleaned_values = {k: v for k, v in values.items()
                          if k not in relationships}
        obj.update(cleaned_values)
        obj.save()
        return obj

    @staticmethod
    def _update(model, id_, values):
        """Update the row of ``model`` identified by ``id_`` with ``values``.

        :param id_: Identity accepted by ``add_identity_filter``.
        :param values: Mapping passed to the row's ``update``.
        :return: The updated row, or None when no row matches.
        """
        session = get_session()
        with session.begin():
            query = model_query(model, session=session)
            query = add_identity_filter(query, id_)
            try:
                # with_for_update() replaces the deprecated
                # with_lockmode('update').
                ref = query.with_for_update().one()
            except exc.NoResultFound:
                LOG.error("Update backup failed. No result found.")
                return None
            ref.update(values)
        return ref

    def _get_model_list(self, model, add_filter_func, context, filters=None,
                        limit=None, marker=None, sort_key=None,
                        sort_dir=None, eager=False):
        """Return the filtered, paginated rows of ``model``.

        :param add_filter_func: Callable ``(query, filters)`` returning the
                                refined query.
        :param context: Accepted but not used here.
        :param eager: Accepted but not used here.
        :return: Result of ``_paginate_query``.
        """
        query = model_query(model)
        query = add_filter_func(query, filters)
        return _paginate_query(model, limit, marker,
                               sort_key, sort_dir, query)

    def create_backup(self, values):
        """Create a ``Backup_data`` row from ``values``.

        A ``backup_id`` is generated when ``values`` has none (``values`` is
        modified in place).

        :return: The created object, or None when a duplicate entry exists.
        """
        if not values.get('backup_id'):
            values['backup_id'] = short_id.generate_id()
        try:
            return self._create(models.Backup_data, values)
        except db_exc.DBDuplicateEntry:
            LOG.error("Backup UUID already exists.")
            return None

    def get_backup_list(self, *args, **kwargs):
        """Return ``Backup_data`` rows; arguments go to ``_get_model_list``."""
        return self._get_model_list(models.Backup_data,
                                    self._add_backups_filters,
                                    *args, **kwargs)

    def update_backup(self, backup_id, values):
        """Update the ``Backup_data`` row identified by ``backup_id``.

        :return: The updated row, or None when the update failed.
        """
        if 'backup_id' in values:
            # NOTE(review): this is only logged; the update still proceeds.
            LOG.error("Cannot override UUID for existing backup")
        try:
            return self._update(models.Backup_data, backup_id, values)
        except Exception:
            LOG.exception("backup resource not found.")
            return None

    def create_queue(self, values):
        """Create a ``Queue_data`` row from ``values``.

        A ``backup_id`` is generated when ``values`` has none (``values`` is
        modified in place).

        :return: The created object, or None when a duplicate entry exists.
        """
        if not values.get('backup_id'):
            values['backup_id'] = short_id.generate_id()
        try:
            return self._create(models.Queue_data, values)
        except db_exc.DBDuplicateEntry:
            LOG.error("Backup UUID already exists.")
            return None

    def get_queue_list(self, *args, **kwargs):
        """Return ``Queue_data`` rows; arguments go to ``_get_model_list``."""
        return self._get_model_list(models.Queue_data,
                                    self._add_queues_filters,
                                    *args, **kwargs)

    def update_queue(self, backup_id, values):
        """Update the ``Queue_data`` row identified by ``backup_id``.

        :return: The updated row, or None when the update failed.
        """
        if 'backup_id' in values:
            # NOTE(review): this is only logged; the update still proceeds.
            LOG.error("Cannot override UUID for existing backup")
        try:
            return self._update(models.Queue_data, backup_id, values)
        except Exception:
            LOG.exception("backup resource not found.")
            return None