Revert "Revert "Scalable functionality""
diff --git a/requirements.txt b/requirements.txt
index 1b57644..103f767 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,8 +11,10 @@
 oslo.config>=8.1.0
 oslo.log>=4.4.0 # Apache-2.0
 oslo_versionedobjects
+oslo.utils # Apache-2.0
 openstacksdk>0.28.0
 pymysql
 parse
+tooz # Apache-2.0
 # email
-# smtplib
\ No newline at end of file
+# smtplib
diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index 6b60ef5..cc9d344 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -1,3 +1,4 @@
+BACKUP_FAILED = 3
 BACKUP_COMPLETED = 2
 BACKUP_WIP = 1
 BACKUP_PLANNED = 0
@@ -7,3 +8,5 @@
 
 # default config values
 DEFAULT_BACKUP_CYCLE_TIMEOUT = "5min"
+
+PULLER = "puller"
diff --git a/staffeln/common/lock.py b/staffeln/common/lock.py
new file mode 100644
index 0000000..338ff37
--- /dev/null
+++ b/staffeln/common/lock.py
@@ -0,0 +1,22 @@
+from oslo_utils import uuidutils
+from tooz import coordination
+import staffeln.conf
+
+CONF = staffeln.conf.CONF
+
+
+class LockManager(object):
+    def __init__(self, node_id=None):
+        self.db_url = CONF.database.connection
+        self.node_id = (
+            uuidutils.generate_uuid() if node_id is None else node_id
+        )
+        # get_coordinator(backend_url, member_id)
+        self.coordinator = coordination.get_coordinator(self.db_url, self.node_id)
+
+    def __enter__(self):
+        self.coordinator.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.coordinator.stop()
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index e970074..854794a 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -135,12 +135,28 @@
                     % (volume_id, volume["status"])
                 )
                 LOG.info(reason)
-                self.result.add_failed_backup(project_id, volume_id, reason)
+                return reason
             return res
 
         except OpenstackResourceNotFound:
             return False
 
+    def purge_backups(self):
+        #TODO make all this in a single DB command
+        success_tasks = self.get_queues(
+            filters={"backup_status": constants.BACKUP_COMPLETED}
+        )
+        failed_tasks = self.get_queues(
+            filters={"backup_status": constants.BACKUP_FAILED}
+        )
+        for queue in success_tasks:
+            LOG.info("Start purge completed tasks.")
+            queue.delete_queue()
+
+        for queue in failed_tasks:
+            LOG.info("Start purge failed tasks.")
+            queue.delete_queue()
+
     #  delete all backups forcily regardless of the status
     def hard_cancel_backup_task(self, task):
         try:
@@ -155,15 +171,16 @@
             if backup is None:
                 return task.delete_queue()
             self.openstacksdk.delete_backup(task.backup_id, force=True)
-            task.delete_queue()
-            self.result.add_failed_backup(task.project_id, task.volume_id, reason)
+            task.reason = reason
+            task.backup_status = constants.BACKUP_FAILED
+            task.save()
 
         except OpenstackSDKException as e:
             reason = _("Backup %s deletion failed." "%s" % (task.backup_id, str(e)))
             LOG.info(reason)
-            # remove from the queue table
-            task.delete_queue()
-            self.result.add_failed_backup(task.project_id, task.volume_id, reason)
+            task.reason = reason
+            task.backup_status = constants.BACKUP_FAILED
+            task.save()
 
     #  delete only available backups: reserved
     def soft_remove_backup_task(self, backup_object):
@@ -313,24 +330,34 @@
                     empty_project = False
                     self.result.add_project(project.id, project.name)
                 for volume in server.attached_volumes:
-                    if not self.filter_by_volume_status(volume["id"], project.id):
+                    filter_result = self.filter_by_volume_status(
+                        volume["id"], project.id)
+
+                    if not filter_result:
                         continue
                     if "name" not in volume or not volume["name"]:
                         volume_name = volume["id"]
                     else:
                         volume_name = volume["name"][:100]
+                    if filter_result is True:
+                        backup_status = constants.BACKUP_PLANNED
+                        reason = None
+                    else:
+                        backup_status = constants.BACKUP_FAILED
+                        reason = filter_result
                     queues_map.append(
                         QueueMapping(
                             project_id=project.id,
                             volume_id=volume["id"],
                             backup_id="NULL",
                             instance_id=server.id,
-                            backup_status=constants.BACKUP_PLANNED,
+                            backup_status=backup_status,
                             # Only keep the last 100 chars of instance_name and
                             # volume_name for forming backup_name
                             instance_name=server.name[:100],
                             volume_name=volume_name,
                             incremental=self._is_incremental(volume["id"]),
+                            reason=reason,
                         )
                     )
         return queues_map
@@ -403,10 +430,9 @@
                     % (backup_name, task.volume_id, str(error))
                 )
                 LOG.info(reason)
-                self.result.add_failed_backup(project_id, task.volume_id, reason)
-                parsed = parse.parse("Error in creating volume backup {id}", str(error))
-                if parsed is not None:
-                    task.backup_id = parsed["id"]
+                task.reason = reason
+                task.backup_status = constants.BACKUP_FAILED
+                task.save()
             # Added extra exception as OpenstackSDKException does not handle the keystone unauthourized issue.
             except Exception as error:
                 reason = _(
@@ -414,13 +440,9 @@
                     % (backup_name, task.volume_id, str(error))
                 )
                 LOG.error(reason)
-                self.result.add_failed_backup(project_id, task.volume_id, reason)
-                parsed = parse.parse("Error in creating volume backup {id}", str(error))
-                if parsed is not None:
-                    task.backup_id = parsed["id"]
-            # TODO(oleks): Exception handling for inc backup failure because of missing full backup
-            task.backup_status = constants.BACKUP_WIP
-            task.save()
+                task.reason = reason
+                task.backup_status = constants.BACKUP_FAILED
+                task.save()
         else:
             # Backup planned task cannot have backup_id in the same cycle.
             # Remove this task from the task list
@@ -432,15 +454,14 @@
         reason = _(
             "The backup creation for the volume %s was prefailed." % task.volume_id
         )
-        self.result.add_failed_backup(task.project_id, task.volume_id, reason)
         LOG.warn(reason)
-        # 2. remove failed task from the task queue
-        task.delete_queue()
+        task.reason = reason
+        task.backup_status = constants.BACKUP_FAILED
+        task.save()
 
     def process_failed_backup(self, task):
         # 1. notify via email
         reason = _("The status of backup for the volume %s is error." % task.volume_id)
-        self.result.add_failed_backup(task.project_id, task.volume_id, reason)
         LOG.warn(reason)
         # 2. delete backup generator
         try:
@@ -452,8 +473,9 @@
                     % (task.backup_id, str(ex))
                 )
             )
-        # 3. remove failed task from the task queue
-        task.delete_queue()
+        task.reason = reason
+        task.backup_status = constants.BACKUP_FAILED
+        task.save()
 
     def process_non_existing_backup(self, task):
         task.delete_queue()
@@ -471,12 +493,8 @@
                 incremental=task.incremental,
             )
         )
-        self.result.add_success_backup(
-            task.project_id, task.volume_id, task.backup_id, task.incremental
-        )
-        # 2. remove from the task list
-        task.delete_queue()
-        # 3. TODO(Alex): notify via email
+        task.backup_status = constants.BACKUP_COMPLETED
+        task.save()
 
     def process_using_backup(self, task):
         # treat same as the available backup for now
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index b288129..07f2595 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -5,7 +5,10 @@
 import staffeln.conf

 from futurist import periodics

 from oslo_log import log

+from tooz import coordination

+

 from staffeln.common import constants, context

+from staffeln.common import lock

 from staffeln.common import time as xtime

 from staffeln.conductor import backup

 from staffeln.i18n import _

@@ -22,6 +25,7 @@
         self._shutdown = threading.Event()

         self.conf = conf

         self.ctx = context.make_context()

+        self.lock_mgt = lock.LockManager()

         self.controller = backup.Backup()

         LOG.info("%s init" % self.name)

 

@@ -53,7 +57,12 @@
             if not self._backup_cycle_timeout():  # time in

                 LOG.info(_("cycle timein"))

                 for queue in queues_started:

-                    self.controller.check_volume_backup_status(queue)

+                    try:

+                        with self.lock_mgt.coordinator.get_lock(queue.volume_id):

+                            self.controller.check_volume_backup_status(queue)

+                    except coordination.LockAcquireFailed:

+                        LOG.debug("Failed to lock task for volume: "

+                                  "%s." % queue.volume_id)

             else:  # time out

                 LOG.info(_("cycle timeout"))

                 for queue in queues_started:

@@ -101,7 +110,12 @@
         )

         if len(tasks_to_start) != 0:

             for task in tasks_to_start:

-                self.controller.create_volume_backup(task)

+                try:

+                    with self.lock_mgt.coordinator.get_lock(task.volume_id):

+                        self.controller.create_volume_backup(task)

+                except coordination.LockAcquireFailed:

+                    LOG.debug("Failed to lock task for volume: "

+                              "%s." % task.volume_id)

 

     # Refresh the task queue

     def _update_task_queue(self):

@@ -113,6 +127,7 @@
 

     def _report_backup_result(self):

         self.controller.publish_backup_result()

+        self.controller.purge_backups()

 

     def backup_engine(self, backup_service_period):

         LOG.info("Backup manager started %s" % str(time.time()))

@@ -120,10 +135,18 @@
 

         @periodics.periodic(spacing=backup_service_period, run_immediately=True)

         def backup_tasks():

-            self._update_task_queue()

-            self._process_todo_tasks()

-            self._process_wip_tasks()

-            self._report_backup_result()

+            with self.lock_mgt:

+                try:

+                    with self.lock_mgt.coordinator.get_lock(constants.PULLER):

+                        LOG.info("Running as puller role")

+                        self._update_task_queue()

+                        self._process_todo_tasks()

+                        self._process_wip_tasks()

+                        self._report_backup_result()

+                except coordination.LockAcquireFailed:

+                    LOG.info("Running as non-puller role")

+                    self._process_todo_tasks()

+                    self._process_wip_tasks()

 

         periodic_callables = [

             (backup_tasks, (), {}),

diff --git a/staffeln/conductor/result.py b/staffeln/conductor/result.py
index acdf71f..b74fdb8 100644
--- a/staffeln/conductor/result.py
+++ b/staffeln/conductor/result.py
@@ -1,7 +1,9 @@
 # Email notification package

 # This should be upgraded by integrating with mail server to send batch

-import staffeln.conf

 from oslo_log import log

+

+import staffeln.conf

+from staffeln.common import constants

 from staffeln.common import email

 from staffeln.common import time as xtime

 from staffeln.conductor import backup

@@ -17,39 +19,10 @@
 

     def initialize(self):

         self.content = ""

-        self.project_list = []

-        self.success_backup_list = {}

-        self.failed_backup_list = {}

+        self.project_list = set()

 

-    def add_project(self, id, name):

-        if id in self.success_backup_list:

-            return

-        self.project_list.append({"name": name, "id": id})

-        self.success_backup_list[id] = []

-        self.failed_backup_list[id] = []

-

-    def add_success_backup(self, project_id, volume_id, backup_id, incremental):

-        if project_id not in self.success_backup_list:

-            LOG.error(_("Not registered project is reported for backup result."))

-            return

-        self.success_backup_list[project_id].append(

-            {

-                "volume_id": volume_id,

-                "backup_id": backup_id,

-                "backup_mode": "Incremental" if incremental else "Full",

-            }

-        )

-

-    def add_failed_backup(self, project_id, volume_id, reason):

-        if project_id not in self.failed_backup_list:

-            LOG.error(_("Not registered project is reported for backup result."))

-            return

-        self.failed_backup_list[project_id].append(

-            {

-                "volume_id": volume_id,

-                "reason": reason,

-            }

-        )

+    def add_project(self, project_id, project_name):

+        self.project_list.add((project_id, project_name))

 

     def send_result_email(self):

         subject = "Backup result"

@@ -78,9 +51,29 @@
         # 1. get quota

         self.content = "<h3>${TIME}</h3><br>"

         self.content = self.content.replace("${TIME}", xtime.get_current_strtime())

+        backup_mgt = backup.Backup()

+        project_success = {}

+        project_failed = {}

+        success_queues = backup_mgt.get_queues(

+            filters={"backup_status": constants.BACKUP_COMPLETED}

+        )

+        for queue in success_queues:

+            if queue.project_id in project_success:

+                project_success[queue.project_id].append(queue)

+            else:

+                project_success[queue.project_id] = [queue]

+        failed_queues = backup_mgt.get_queues(

+            filters={"backup_status": constants.BACKUP_FAILED}

+        )

+        for queue in failed_queues:

+            if queue.project_id in project_failed:

+                project_failed[queue.project_id].append(queue)

+            else:

+                project_failed[queue.project_id] = [queue]

+

         html = ""

-        for project in self.project_list:

-            quota = backup.Backup().get_backup_quota(project["id"])

+        for project_id, project_name in self.project_list:

+            quota = backup_mgt.get_backup_quota(project_id)

 

             html += (

                 "<h3>Project: ${PROJECT}</h3><br>"

@@ -92,27 +85,34 @@
                 "<h4>${FAILED_VOLUME_LIST}</h4><br>"

             )

 

-            success_volumes = "<br>".join(

-                [

-                    "Volume ID: %s, Backup ID: %s %s"

-                    % (str(e["volume_id"]), str(e["backup_id"]), str(e["backup_mode"]))

-                    for e in self.success_backup_list[project["id"]]

-                ]

-            )

-            failed_volumes = "<br>".join(

-                [

-                    "Volume ID: %s, Reason: %s"

-                    % (str(e["volume_id"]), str(e["reason"]))

-                    for e in self.failed_backup_list[project["id"]]

-                ]

-            )

+            if project_id in project_success:

+                success_volumes = "<br>".join(

+                    [

+                        "Volume ID: %s, Backup ID: %s, backup mode: %s"

+                        % (str(e.volume_id), str(e.backup_id),

+                           "Incremental" if e.incremental else "Full")

+                        for e in project_success[project_id]

+                    ]

+                )

+            else:

+                success_volumes = "<br>"

+            if project_id in project_failed:

+                failed_volumes = "<br>".join(

+                    [

+                        "Volume ID: %s, Reason: %s"

+                        % (str(e.volume_id), str(e.reason))

+                        for e in project_failed[project_id]

+                    ]

+                )

+            else:

+                failed_volumes = "<br>"

 

             html = html.replace("${QUOTA_LIMIT}", str(quota["limit"]))

             html = html.replace("${QUOTA_IN_USE}", str(quota["in_use"]))

             html = html.replace("${QUOTA_RESERVED}", str(quota["reserved"]))

             html = html.replace("${SUCCESS_VOLUME_LIST}", success_volumes)

             html = html.replace("${FAILED_VOLUME_LIST}", failed_volumes)

-            html = html.replace("${PROJECT}", project["name"])

+            html = html.replace("${PROJECT}", project_name)

         if html == "":

             return

         self.content += html

diff --git a/staffeln/db/sqlalchemy/alembic/README b/staffeln/db/sqlalchemy/alembic/README
index 42fcedd..d6405b1 100644
--- a/staffeln/db/sqlalchemy/alembic/README
+++ b/staffeln/db/sqlalchemy/alembic/README
@@ -1,5 +1,5 @@
 Please see https://alembic.readthedocs.org/en/latest/index.html for general documentation
 
-Upgrade can be performed by: 
+Upgrade can be performed by:
 $ staffeln-dbmanage upgrade
 $ staffeln-dbmanage upgrade head
diff --git a/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py b/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py
index c6869b2..a16f27c 100644
--- a/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py
+++ b/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py
@@ -1,14 +1,3 @@
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
 """Add volume_name and instance_name to queue_data
 
 Revision ID: 041d9a0f1159
diff --git a/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py b/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py
new file mode 100644
index 0000000..99bfdd1
--- /dev/null
+++ b/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py
@@ -0,0 +1,18 @@
+"""Add reason column to queue_data table
+
+Revision ID: 2b2b9df199bd
+Revises: ebdbed01e9a7
+Create Date: 2022-11-02 06:14:09.348932
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '2b2b9df199bd'
+down_revision = 'ebdbed01e9a7'
+
+import sqlalchemy as sa  # noqa: E402
+from alembic import op  # noqa: E402
+
+
+def upgrade():
+    op.add_column("queue_data", sa.Column("reason", sa.String(length=255), nullable=True))
diff --git a/staffeln/db/sqlalchemy/models.py b/staffeln/db/sqlalchemy/models.py
index d4f99ed..99b7ea3 100644
--- a/staffeln/db/sqlalchemy/models.py
+++ b/staffeln/db/sqlalchemy/models.py
@@ -70,3 +70,4 @@
     volume_name = Column(String(100))
     instance_name = Column(String(100))
     incremental = Column(Boolean, default=False)
+    reason = Column(String(255), nullable=True)
diff --git a/staffeln/objects/queue.py b/staffeln/objects/queue.py
index 7982deb..39e841b 100644
--- a/staffeln/objects/queue.py
+++ b/staffeln/objects/queue.py
@@ -21,6 +21,7 @@
         "volume_name": sfeild.StringField(),
         "instance_name": sfeild.StringField(),
         "incremental": sfeild.BooleanField(),
+        "reason": sfeild.StringField(),
     }
 
     @base.remotable_classmethod