Use decorator for lock
This help to reduce redundant lock handling code.
diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index cc9d344..20f462e 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -10,3 +10,4 @@
DEFAULT_BACKUP_CYCLE_TIMEOUT = "5min"
PULLER = "puller"
+RETENTION = "retention"
diff --git a/staffeln/common/lock.py b/staffeln/common/lock.py
index dcecfef..159f60a 100644
--- a/staffeln/common/lock.py
+++ b/staffeln/common/lock.py
@@ -1,8 +1,10 @@
import staffeln.conf
+from oslo_log import log
from oslo_utils import uuidutils
from tooz import coordination
CONF = staffeln.conf.CONF
+LOG = log.getLogger(__name__)
class LockManager(object):
@@ -18,3 +20,22 @@
def __exit__(self, exc_type, exc_val, exc_tb):
self.coordinator.stop()
+
+
+class Lock(object):
+ def __init__(self, lock_manager, lock_name):
+ self.lock_manager = lock_manager
+ self.lock_name = lock_name
+ self.lock = None
+ self.acquired = False
+
+ def __enter__(self):
+ self.lock = self.lock_manager.coordinator.get_lock(self.lock_name)
+ self.acquired = self.lock.acquire(blocking=False)
+ if not self.acquired:
+ LOG.debug(f"Failed to lock for {self.lock_name}")
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if self.acquired:
+ self.lock.release()
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index a8419cc..ce6af50 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -12,7 +12,6 @@
from staffeln.common import time as xtime
from staffeln.conductor import backup as backup_controller
from staffeln.i18n import _
-from tooz import coordination
LOG = log.getLogger(__name__)
CONF = staffeln.conf.CONF
@@ -58,13 +57,12 @@
if not self._backup_cycle_timeout(): # time in
LOG.info(_("cycle timein"))
for queue in queues_started:
- try:
- with self.lock_mgt.coordinator.get_lock(queue.volume_id):
+ LOG.debug(
+ f"try to get lock and run task for volume: {queue.volume_id}."
+ )
+ with lock.Lock(self.lock_mgt, queue.volume_id) as q_lock:
+ if q_lock.acquired:
self.controller.check_volume_backup_status(queue)
- except coordination.LockAcquireFailed:
- LOG.debug(
- "Failed to lock task for volume: %s." % queue.volume_id
- )
else: # time out
LOG.info(_("cycle timeout"))
for queue in queues_started:
@@ -112,11 +110,9 @@
)
if len(tasks_to_start) != 0:
for task in tasks_to_start:
- try:
- with self.lock_mgt.coordinator.get_lock(task.volume_id):
+ with lock.Lock(self.lock_mgt, task.volume_id) as t_lock:
+ if t_lock.acquired:
self.controller.create_volume_backup(task)
- except coordination.LockAcquireFailed:
- LOG.debug("Failed to lock task for volume: %s." % task.volume_id)
# Refresh the task queue
def _update_task_queue(self):
@@ -161,17 +157,17 @@
@periodics.periodic(spacing=backup_service_period, run_immediately=True)
def backup_tasks():
with self.lock_mgt:
- try:
- with self.lock_mgt.coordinator.get_lock(constants.PULLER):
+ with lock.Lock(self.lock_mgt, constants.PULLER) as puller:
+ if puller.acquired:
LOG.info("Running as puller role")
self._update_task_queue()
self._process_todo_tasks()
self._process_wip_tasks()
self._report_backup_result()
- except coordination.LockAcquireFailed:
- LOG.info("Running as non-puller role")
- self._process_todo_tasks()
- self._process_wip_tasks()
+ else:
+ LOG.info("Running as non-puller role")
+ self._process_todo_tasks()
+ self._process_wip_tasks()
periodic_callables = [
(backup_tasks, (), {}),
@@ -234,10 +230,10 @@
@periodics.periodic(spacing=retention_service_period, run_immediately=True)
def rotation_tasks():
- try:
- # TODO(rlin): change to use decorator for this
- # Make sure only one retention at a time
- with self.lock_mgt.coordinator.get_lock("retention"):
+ with self.lock_mgt:
+ with lock.Lock(self.lock_mgt, constants.RETENTION) as retention:
+ if not retention.acquired:
+ return
self.controller.refresh_openstacksdk()
# get the threshold time
self.threshold_strtime = self.get_time_from_str(
@@ -283,8 +279,6 @@
backup, skip_inc_err=True
)
time.sleep(2)
- except coordination.LockAcquireFailed:
- LOG.debug("Failed to lock for retention")
periodic_callables = [
(rotation_tasks, (), {}),