Allow locking backup tasks

This patch allows us to lock backup tasks while they are being
processed, so that the same task is only processed once.
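
The locking is built on a tooz coordinator created from the database
connection URL (CONF.database.connection). Each periodic run starts the
coordinator through the LockManager context manager and guards every
task with a per-volume lock, so a task already held by another worker is
simply skipped. A rough sketch of the pattern, where queues and
process_task() stand in for the real task list and handler:

    with lock.LockManager() as lock_mgt:
        for task in queues:
            try:
                with lock_mgt.coordinator.get_lock(task.volume_id):
                    process_task(task)
            except coordination.LockAcquireFailed:
                # Another worker holds the lock for this volume.
                continue
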
diff --git a/requirements.txt b/requirements.txt
index bc61474..dc53a3c 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,5 +15,6 @@
openstacksdk>0.28.0
pymysql
parse
+tooz>=2.7.1 # Apache-2.0
# email
# smtplib
diff --git a/staffeln/common/lock.py b/staffeln/common/lock.py
new file mode 100644
index 0000000..338ff37
--- /dev/null
+++ b/staffeln/common/lock.py
@@ -0,0 +1,22 @@
+from oslo_utils import uuidutils
+from tooz import coordination
+import staffeln.conf
+
+CONF = staffeln.conf.CONF
+
+
+class LockManager(object):
+    def __init__(self, node_id=None):
+        self.db_url = CONF.database.connection
+        self.node_id = (
+            uuidutils.generate_uuid() if node_id is None else node_id
+        )
+        # get_coordinator(backend_url, member_id)
+        self.coordinator = coordination.get_coordinator(self.db_url, self.node_id)
+
+    def __enter__(self):
+        self.coordinator.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.coordinator.stop()
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 0642c07..f2e4f16 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -5,8 +5,11 @@
import staffeln.conf
from futurist import periodics
from oslo_log import log
+from tooz import coordination
+
from staffeln.common import constants, context
from staffeln.common import node_manage
+from staffeln.common import lock
from staffeln.common import time as xtime
from staffeln.conductor import backup
from staffeln.i18n import _
@@ -23,6 +26,7 @@
         self._shutdown = threading.Event()
         self.conf = conf
         self.ctx = context.make_context()
+        self.lock_mgt = lock.LockManager()
         self.controller = backup.Backup()
         self.puller = node_manage.Puller(self.ctx)
         LOG.info("%s init (node_id: %s)" % (self.name, self.puller.node_id))
@@ -55,7 +59,12 @@
             if not self._backup_cycle_timeout():  # time in
                 LOG.info(_("cycle timein"))
                 for queue in queues_started:
-                    self.controller.check_volume_backup_status(queue)
+                    try:
+                        with self.lock_mgt.coordinator.get_lock(queue.volume_id):
+                            self.controller.check_volume_backup_status(queue)
+                    except coordination.LockAcquireFailed:
+                        LOG.debug("Failed to lock task for volume: "
+                                  "%s." % queue.volume_id)
             else:  # time out
                 LOG.info(_("cycle timeout"))
                 for queue in queues_started:
@@ -103,7 +112,12 @@
         )
         if len(queues_to_start) != 0:
             for queue in queues_to_start:
-                self.controller.create_volume_backup(queue)
+                try:
+                    with self.lock_mgt.coordinator.get_lock(queue.volume_id):
+                        self.controller.create_volume_backup(queue)
+                except coordination.LockAcquireFailed:
+                    LOG.debug("Failed to lock task for volume: "
+                              "%s." % queue.volume_id)

     # Refresh the task queue
     def _update_task_queue(self):
@@ -122,13 +136,13 @@

         @periodics.periodic(spacing=backup_service_period, run_immediately=True)
         def backup_tasks():
-            #TODO should switch to using `with`
-            if self.puller.fetch_puller_role():
-                self._update_task_queue()
-                self.puller.renew_update_time()
-            self._process_todo_tasks()
-            self._process_wip_tasks()
-            self._report_backup_result()
+            with self.lock_mgt:
+                if self.puller.fetch_puller_role():
+                    self._update_task_queue()
+                    self.puller.renew_update_time()
+                self._process_todo_tasks()
+                self._process_wip_tasks()
+                self._report_backup_result()

         periodic_callables = [
             (backup_tasks, (), {}),