Add tooz file driver support
diff --git a/hack/stack.sh b/hack/stack.sh
index 6830d57..497b29a 100755
--- a/hack/stack.sh
+++ b/hack/stack.sh
@@ -18,6 +18,7 @@
fi
# Create DevStack configuration file
+
sudo mkdir /etc/staffeln
sudo chown -R "${USER}". /etc/staffeln
cat <<EOF > /opt/stack/local.conf
@@ -65,8 +66,10 @@
[database]
backend = sqlalchemy
connection = "mysql+pymysql://staffeln:password@localhost:3306/staffeln"
-tooz_connection = "mysql://staffeln:password@localhost:3306/staffeln"
mysql_engine = InnoDB
+
+[coordination]
+backend_url = "file:///tmp/staffeln_locks"
EOF
# Create staffeln database
diff --git a/staffeln/common/lock.py b/staffeln/common/lock.py
index 159f60a..8006fd1 100644
--- a/staffeln/common/lock.py
+++ b/staffeln/common/lock.py
@@ -1,18 +1,27 @@
-import staffeln.conf
+import errno
+import glob
+import os
+import re
+import sys
+from typing import Optional # noqa: H301
+import uuid
+
+from oslo_config import cfg
from oslo_log import log
+from oslo_utils import timeutils
from oslo_utils import uuidutils
from tooz import coordination
-CONF = staffeln.conf.CONF
+from staffeln import conf
+from staffeln import exception
+
+CONF = conf.CONF
LOG = log.getLogger(__name__)
class LockManager(object):
- def __init__(self, node_id=None):
- self.db_url = CONF.database.tooz_connection
- self.node_id = uuidutils.generate_uuid() if node_id is None else node_id
- # get_coordinator(backend_url, member_id)
- self.coordinator = coordination.get_coordinator(self.db_url, node_id)
+ def __init__(self):
+ self.coordinator = COORDINATOR
def __enter__(self):
self.coordinator.start()
@@ -23,19 +32,110 @@
class Lock(object):
- def __init__(self, lock_manager, lock_name):
+ def __init__(self, lock_manager, lock_name, remove_lock=False):
self.lock_manager = lock_manager
self.lock_name = lock_name
self.lock = None
self.acquired = False
+ self.remove_lock = remove_lock
def __enter__(self):
self.lock = self.lock_manager.coordinator.get_lock(self.lock_name)
self.acquired = self.lock.acquire(blocking=False)
if not self.acquired:
LOG.debug(f"Failed to lock for {self.lock_name}")
+ LOG.debug(f"acquired lock for {self.lock_name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.acquired:
self.lock.release()
+ LOG.debug(f"released lock for {self.lock_name}")
+ if self.remove_lock:
+ self.lock_manager.coordinator.remove_lock(self.lock_name)
+ LOG.debug(f"removed lock file (if any) for {self.lock_name}")
+
+
+class Coordinator(object):
+ """Tooz coordination wrapper.
+
+ Coordination member id is created from concatenated
+ `prefix` and `agent_id` parameters.
+
+ :param str agent_id: Agent identifier
+ :param str prefix: Used to provide member identifier with a
+ meaningful prefix.
+ """
+
+ def __init__(self, agent_id: Optional[str] = None, prefix: str = ''):
+ self.coordinator = None
+ self.agent_id = agent_id or str(uuid.uuid4())
+ self.started = False
+ self.prefix = prefix
+ self._file_path = None
+
+ def _get_file_path(self, backend_url):
+ if backend_url.startswith('file://'):
+ path = backend_url[7:]
+ # Copied from TooZ's _normalize_path to get the same path they use
+ if sys.platform == 'win32':
+ path = re.sub(r'\\(?=\w:\\)', '', os.path.normpath(path))
+ return os.path.abspath(os.path.join(path, self.prefix))
+ return None
+
+ def start(self) -> None:
+ if self.started:
+ return
+
+ backend_url = CONF.coordination.backend_url
+
+ # member_id should be bytes
+ member_id = (self.prefix + self.agent_id).encode('ascii')
+ self.coordinator = coordination.get_coordinator(backend_url, member_id)
+ assert self.coordinator is not None
+ self.coordinator.start(start_heart=True)
+ self._file_path = self._get_file_path(backend_url)
+ self.started = True
+
+ def stop(self) -> None:
+ """Disconnect from coordination backend and stop heartbeat."""
+ if self.started:
+ if self.coordinator is not None:
+ self.coordinator.stop()
+ self.coordinator = None
+ self.started = False
+
+ def get_lock(self, name: str):
+ """Return a Tooz backend lock.
+
+ :param str name: The lock name that is used to identify it
+ across all nodes.
+ """
+ # lock name should be bytes
+ lock_name = (self.prefix + name).encode('ascii')
+ if self.coordinator is not None:
+ return self.coordinator.get_lock(lock_name)
+ else:
+ raise exception.LockCreationFailed('Coordinator uninitialized.')
+
+ def remove_lock(self, glob_name):
+ # Most locks clean up on release, but not the file lock, so we manually
+ # clean them.
+
+ def _err(file_name: str, exc: Exception) -> None:
+ LOG.warning('Failed to cleanup lock %(name)s: %(exc)s',
+ {'name': file_name, 'exc': exc})
+
+ if self._file_path:
+ files = glob.glob(self._file_path + glob_name)
+ for file_name in files:
+ try:
+ os.remove(file_name)
+ except OSError as exc:
+ if (exc.errno != errno.ENOENT):
+ _err(file_name, exc)
+ except Exception as exc:
+ _err(file_name, exc)
+
+
+COORDINATOR = Coordinator(prefix='staffeln-')
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 6a3ae7c..6e04e2a 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -60,7 +60,7 @@
LOG.debug(
f"try to get lock and run task for volume: {queue.volume_id}."
)
- with lock.Lock(self.lock_mgt, queue.volume_id) as q_lock:
+ with lock.Lock(self.lock_mgt, queue.volume_id, remove_lock=True) as q_lock:
if q_lock.acquired:
self.controller.check_volume_backup_status(queue)
else: # time out
@@ -110,7 +110,7 @@
)
if len(tasks_to_start) != 0:
for task in tasks_to_start:
- with lock.Lock(self.lock_mgt, task.volume_id) as t_lock:
+ with lock.Lock(self.lock_mgt, task.volume_id, remove_lock=True) as t_lock:
if t_lock.acquired:
# Re-pulling status and make it's up-to-date
task = self.controller.get_queue_task_by_id(task_id=task.id)
diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index 8a2d409..33160cf 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -106,6 +106,19 @@
),
]
+
+coordination_group = cfg.OptGroup(
+ "coordination",
+ title="Coordination Options",
+ help=_("Options under this group are used to define Coordination's configuration."),
+)
+
+
+coordination_opts = [
+ cfg.StrOpt("backend_url", default="", help=_("lock coordination connection backend URL.")),
+]
+
+
CONDUCTOR_OPTS = (backup_opts, rotation_opts)
@@ -113,7 +126,12 @@
conf.register_group(conductor_group)
conf.register_opts(backup_opts, group=conductor_group)
conf.register_opts(rotation_opts, group=conductor_group)
+ conf.register_opts(coordination_opts, group=coordination_group)
def list_opts():
- return {"DEFAULT": rotation_opts, conductor_group: backup_opts}
+ return {
+ "DEFAULT": rotation_opts,
+ conductor_group: backup_opts,
+ coordination_group: coordination_opts
+ }
diff --git a/staffeln/conf/database.py b/staffeln/conf/database.py
index e06bf75..761aa15 100644
--- a/staffeln/conf/database.py
+++ b/staffeln/conf/database.py
@@ -15,7 +15,6 @@
SQL_OPTS = [
cfg.StrOpt("mysql_engine", default="InnoDB", help=_("MySQL engine to use.")),
- cfg.StrOpt("tooz_connection", default="", help=_("Tooz MySQL connection URL.")),
]
diff --git a/staffeln/exception.py b/staffeln/exception.py
new file mode 100644
index 0000000..2faf951
--- /dev/null
+++ b/staffeln/exception.py
@@ -0,0 +1,83 @@
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Staffeln base exception handling."""
+
+from typing import Optional, Union # noqa: H301
+from oslo_log import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class StaffelnException(Exception):
+ """Base Staffeln Exception
+
+ To correctly use this class, inherit from it and define
+ a 'message' property. That message will get printf'd
+ with the keyword arguments provided to the constructor.
+
+ """
+ message = "An unknown exception occurred."
+ code = 500
+ headers: dict = {}
+ safe = False
+
+ def __init__(self, message: Optional[Union[str, tuple]] = None, **kwargs):
+ self.kwargs = kwargs
+ self.kwargs['message'] = message
+
+ if 'code' not in self.kwargs:
+ try:
+ self.kwargs['code'] = self.code
+ except AttributeError:
+ pass
+
+ for k, v in self.kwargs.items():
+ if isinstance(v, Exception):
+ self.kwargs[k] = str(v)
+
+ if self._should_format():
+ try:
+ message = self.message % kwargs
+ except Exception:
+ self._log_exception()
+ message = self.message
+ elif isinstance(message, Exception):
+ message = str(message)
+
+ self.msg = message
+ super(StaffelnException, self).__init__(message)
+ # Oslo.messaging use the argument 'message' to rebuild exception
+ # directly at the rpc client side, therefore we should not use it
+ # in our keyword arguments, otherwise, the rebuild process will fail
+ # with duplicate keyword exception.
+ self.kwargs.pop('message', None)
+
+ def _log_exception(self) -> None:
+ # kwargs doesn't match a variable in the message
+ # log the issue and the kwargs
+ LOG.exception('Exception in string format operation:')
+ for name, value in self.kwargs.items():
+ LOG.error("%(name)s: %(value)s",
+ {'name': name, 'value': value})
+
+ def _should_format(self) -> bool:
+ return self.kwargs['message'] is None or '%(message)' in self.message
+
+
+class LockCreationFailed(StaffelnException):
+ message = "Unable to create lock. Coordination backend not started."