Allow lock backup tasks

This patch allow us to lock backup tasks when processing.
So same task will be only process one time.
diff --git a/requirements.txt b/requirements.txt
index bc61474..dc53a3c 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,5 +15,6 @@
 openstacksdk>0.28.0
 pymysql
 parse
+tooz>=2.7.1 # Apache-2.0
 # email
 # smtplib
diff --git a/staffeln/common/lock.py b/staffeln/common/lock.py
new file mode 100644
index 0000000..338ff37
--- /dev/null
+++ b/staffeln/common/lock.py
@@ -0,0 +1,22 @@
+from oslo_utils import uuidutils
+from tooz import coordination
+import staffeln.conf
+
+CONF = staffeln.conf.CONF
+
+
+class LockManager(object):
+    def __init__(self, node_id=None):
+        self.db_url = CONF.database.connection
+        self.node_id = (
+            uuidutils.generate_uuid() if node_id is None else node_id
+        )
+        # get_coordinator(backend_url, member_id)
+        self.coordinator = coordination.get_coordinator(self.db_url, node_id)
+
+    def __enter__(self):
+        self.coordinator.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.coordinator.stop()
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 0642c07..f2e4f16 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -5,8 +5,11 @@
 import staffeln.conf

 from futurist import periodics

 from oslo_log import log

+from tooz import coordination

+

 from staffeln.common import constants, context

 from staffeln.common import node_manage

+from staffeln.common import lock

 from staffeln.common import time as xtime

 from staffeln.conductor import backup

 from staffeln.i18n import _

@@ -23,6 +26,7 @@
         self._shutdown = threading.Event()

         self.conf = conf

         self.ctx = context.make_context()

+        self.lock_mgt = lock.LockManager()

         self.controller = backup.Backup()

         self.puller = node_manage.Puller(self.ctx)

         LOG.info("%s init (node_id: %s)" % (self.name, self.puller.node_id))

@@ -55,7 +59,12 @@
             if not self._backup_cycle_timeout():  # time in

                 LOG.info(_("cycle timein"))

                 for queue in queues_started:

-                    self.controller.check_volume_backup_status(queue)

+                    try:

+                        with self.lock_mgt.coordinator.get_lock(queue.volume_id):

+                            self.controller.check_volume_backup_status(queue)

+                    except coordination.LockAcquireFailed:

+                        LOG.debug("Failed to lock task for volume: "

+                                  "%s." % queue.volume_id)

             else:  # time out

                 LOG.info(_("cycle timeout"))

                 for queue in queues_started:

@@ -103,7 +112,12 @@
         )

         if len(queues_to_start) != 0:

             for queue in queues_to_start:

-                self.controller.create_volume_backup(queue)

+                try:

+                    with self.lock_mgt.coordinator.get_lock(queue.volume_id):

+                        self.controller.create_volume_backup(queue)

+                except coordination.LockAcquireFailed:

+                    LOG.debug("Failed to lock task for volume: "

+                              "%s." % queue.volume_id)

 

     # Refresh the task queue

     def _update_task_queue(self):

@@ -122,13 +136,13 @@
 

         @periodics.periodic(spacing=backup_service_period, run_immediately=True)

         def backup_tasks():

-            #TODO should switch to using `with`

-            if self.puller.fetch_puller_role():

-                self._update_task_queue()

-                self.puller.renew_update_time()

-            self._process_todo_tasks()

-            self._process_wip_tasks()

-            self._report_backup_result()

+            with self.lock_mgt:

+                if self.puller.fetch_puller_role():

+                    self._update_task_queue()

+                    self.puller.renew_update_time()

+                self._process_todo_tasks()

+                self._process_wip_tasks()

+                self._report_backup_result()

 

         periodic_callables = [

             (backup_tasks, (), {}),