blob: 4d60ab1b7603083eeea052453e28c48c6d749f4d [file] [log] [blame]
import errno
import glob
import os
import re
import sys
import uuid
from typing import Optional # noqa: H301
import sherlock
from oslo_log import log
from staffeln import conf, exception
from tooz import coordination
CONF = conf.CONF
LOG = log.getLogger(__name__)
class LockManager(object):
def __init__(self):
backend_url = CONF.coordination.backend_url
# This is for now using to check if any backend_url setup
# for tooz backends as K8s should not need one.any
self.coordinator = COORDINATOR if backend_url else K8SCOORDINATOR
def __enter__(self):
self.coordinator.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.coordinator.stop()
class Lock(object):
def __init__(self, lock_manager, lock_name, remove_lock=False):
self.lock_manager = lock_manager
self.lock_name = lock_name
self.lock = None
self.acquired = False
self.remove_lock = remove_lock
def __enter__(self):
self.lock = self.lock_manager.coordinator.get_lock(self.lock_name)
self.acquired = self.lock.acquire(blocking=False)
if not self.acquired:
LOG.debug(f"Failed to lock for {self.lock_name}")
else:
LOG.debug(f"acquired lock for {self.lock_name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.acquired:
self.lock.release()
LOG.debug(f"released lock for {self.lock_name}")
if self.remove_lock:
self.lock_manager.coordinator.remove_lock(self.lock_name)
LOG.debug(f"removed lock file (if any) for {self.lock_name}")
class Coordinator(object):
"""Tooz coordination wrapper.
Coordination member id is created from concatenated
`prefix` and `agent_id` parameters.
:param str agent_id: Agent identifier
:param str prefix: Used to provide member identifier with a
meaningful prefix.
"""
def __init__(self, agent_id: Optional[str] = None, prefix: str = ""):
self.coordinator = None
self.agent_id = agent_id or str(uuid.uuid4())
self.started = False
self.prefix = prefix
self._file_path = None
def _get_file_path(self, backend_url):
if backend_url.startswith("file://"):
path = backend_url[7:]
# Copied from TooZ's _normalize_path to get the same path they use
if sys.platform == "win32":
path = re.sub(r"\\(?=\w:\\)", "", os.path.normpath(path))
return os.path.abspath(os.path.join(path, self.prefix))
return None
def start(self) -> None:
if self.started:
return
backend_url = CONF.coordination.backend_url
# member_id should be bytes
member_id = (self.prefix + self.agent_id).encode("ascii")
self.coordinator = coordination.get_coordinator(backend_url, member_id)
assert self.coordinator is not None
self.coordinator.start(start_heart=True)
self._file_path = self._get_file_path(backend_url)
self.started = True
def stop(self) -> None:
"""Disconnect from coordination backend and stop heartbeat."""
if self.started:
if self.coordinator is not None:
self.coordinator.stop()
self.coordinator = None
self.started = False
def get_lock(self, name: str):
"""Return a Tooz backend lock.
:param str name: The lock name that is used to identify it
across all nodes.
"""
# lock name should be bytes
lock_name = (self.prefix + name).encode("ascii")
if self.coordinator is not None:
return self.coordinator.get_lock(lock_name)
else:
raise exception.LockCreationFailed("Coordinator uninitialized.")
def remove_lock(self, glob_name):
# Most locks clean up on release, but not the file lock, so we manually
# clean them.
def _err(file_name: str, exc: Exception) -> None:
LOG.warning(f"Failed to cleanup lock {file_name}: {exc}")
if self._file_path:
files = glob.glob(self._file_path + glob_name)
for file_name in files:
try:
os.remove(file_name)
except OSError as exc:
if exc.errno != errno.ENOENT:
_err(file_name, exc)
except Exception as exc:
_err(file_name, exc)
class K8sCoordinator(object):
"""Sherlock kubernetes coordination wrapper.
:param int expire: Set lock expire seconds
:param int timeout: Set lock acquire action timeout seconds
:param str namespace: Set lock namespace.
"""
def __init__(
self, expire: int = 3600, timeout: int = 10, namespace: str = "staffeln"
):
self.timeout = timeout
self.expire = expire
self.namespace = namespace
self.started = False
def start(self) -> None:
if self.started:
return
sherlock.configure(expire=self.expire, timeout=self.timeout)
self.started = True
def stop(self) -> None:
"""Disconnect from coordination backend and stop heartbeat."""
pass
def get_lock(self, name: str):
"""Return a kubernetes lease lock.
:param str name: The lock name that is used to identify it
across all nodes.
"""
return sherlock.KubernetesLock(name, self.namespace)
def remove_lock(self, glob_name):
pass
COORDINATOR = Coordinator(prefix="staffeln-")
K8SCOORDINATOR = K8sCoordinator()