Centralize backup reports

At previous design, backup report was build and send by each node.
This change to only allow puller to send the report.
diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index 6b60ef5..4eea22e 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -1,3 +1,4 @@
+BACKUP_FAILED = 3
 BACKUP_COMPLETED = 2
 BACKUP_WIP = 1
 BACKUP_PLANNED = 0
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index fcd39bf..309822e 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -122,12 +122,28 @@
                     % (volume_id, volume["status"])
                 )
                 LOG.info(reason)
-                self.result.add_failed_backup(project_id, volume_id, reason)
+                return reason
             return res
 
         except OpenstackResourceNotFound:
             return False
 
+    def purge_backups(self):
+        #TODO make all this in a single DB command
+        success_tasks = self.get_queues(
+            filters={"backup_status": constants.BACKUP_COMPLETED}
+        )
+        failed_tasks = self.get_queues(
+            filters={"backup_status": constants.BACKUP_FAILED}
+        )
+        for queue in success_tasks:
+            LOG.info("Start purge completed tasks.")
+            queue.delete_queue()
+
+        for queue in failed_tasks:
+            LOG.info("Start purge failed tasks.")
+            queue.delete_queue()
+
     #  delete all backups forcily regardless of the status
     def hard_cancel_backup_task(self, task):
         try:
@@ -142,15 +158,16 @@
             if backup is None:
                 return task.delete_queue()
             self.openstacksdk.delete_backup(task.backup_id, force=True)
-            task.delete_queue()
-            self.result.add_failed_backup(task.project_id, task.volume_id, reason)
+            task.reason = reason
+            task.backup_status = constants.BACKUP_FAILED
+            task.save()
 
         except OpenstackSDKException as e:
             reason = _("Backup %s deletion failed." "%s" % (task.backup_id, str(e)))
             LOG.info(reason)
-            # remove from the queue table
-            task.delete_queue()
-            self.result.add_failed_backup(task.project_id, task.volume_id, reason)
+            task.reason = reason
+            task.backup_status = constants.BACKUP_FAILED
+            task.save()
 
     #  delete only available backups: reserved
     def soft_remove_backup_task(self, backup_object):
@@ -261,23 +278,33 @@
                     empty_project = False
                     self.result.add_project(project.id, project.name)
                 for volume in server.attached_volumes:
-                    if not self.filter_by_volume_status(volume["id"], project.id):
+                    filter_result = self.filter_by_volume_status(
+                        volume["id"], project.id)
+
+                    if not filter_result:
                         continue
                     if "name" not in volume:
                         volume_name = volume["id"]
                     else:
                         volume_name = volume["name"][:100]
+                    if filter_result is True:
+                        backup_status = constants.BACKUP_PLANNED
+                        reason = None
+                    else:
+                        backup_status = constants.BACKUP_FAILED
+                        reason = filter_result
                     queues_map.append(
                         QueueMapping(
                             project_id=project.id,
                             volume_id=volume["id"],
                             backup_id="NULL",
                             instance_id=server.id,
-                            backup_status=constants.BACKUP_PLANNED,
+                            backup_status=backup_status,
                             # Only keep the last 100 chars of instance_name and
                             # volume_name for forming backup_name
                             instance_name=server.name[:100],
                             volume_name=volume_name,
+                            reason=reason,
                         )
                     )
         return queues_map
@@ -343,11 +370,8 @@
                     % (backup_name, queue.volume_id, str(error))
                 )
                 LOG.info(reason)
-                self.result.add_failed_backup(project_id, queue.volume_id, reason)
-                parsed = parse.parse("Error in creating volume backup {id}", str(error))
-                if parsed is not None:
-                    queue.backup_id = parsed["id"]
-                queue.backup_status = constants.BACKUP_WIP
+                queue.reason = reason
+                queue.backup_status = constants.BACKUP_FAILED
                 queue.save()
             # Added extra exception as OpenstackSDKException does not handle the keystone unauthourized issue.
             except Exception as error:
@@ -356,11 +380,8 @@
                     % (backup_name, queue.volume_id, str(error))
                 )
                 LOG.error(reason)
-                self.result.add_failed_backup(project_id, queue.volume_id, reason)
-                parsed = parse.parse("Error in creating volume backup {id}", str(error))
-                if parsed is not None:
-                    queue.backup_id = parsed["id"]
-                queue.backup_status = constants.BACKUP_WIP
+                queue.reason = reason
+                queue.backup_status = constants.BACKUP_FAILED
                 queue.save()
         else:
             # Backup planned task cannot have backup_id in the same cycle.
@@ -373,15 +394,14 @@
         reason = _(
             "The backup creation for the volume %s was prefailed." % task.volume_id
         )
-        self.result.add_failed_backup(task.project_id, task.volume_id, reason)
         LOG.warn(reason)
-        # 2. remove failed task from the task queue
-        task.delete_queue()
+        task.reason = reason
+        task.backup_status = constants.BACKUP_FAILED
+        task.save()
 
     def process_failed_backup(self, task):
         # 1. notify via email
         reason = _("The status of backup for the volume %s is error." % task.volume_id)
-        self.result.add_failed_backup(task.project_id, task.volume_id, reason)
         LOG.warn(reason)
         # 2. delete backup generator
         try:
@@ -393,8 +413,9 @@
                     % (task.backup_id, str(ex))
                 )
             )
-        # 3. remove failed task from the task queue
-        task.delete_queue()
+        task.reason = reason
+        task.backup_status = constants.BACKUP_FAILED
+        task.save()
 
     def process_non_existing_backup(self, task):
         task.delete_queue()
@@ -411,10 +432,8 @@
                 backup_completed=1,
             )
         )
-        self.result.add_success_backup(task.project_id, task.volume_id, task.backup_id)
-        # 2. remove from the task list
-        task.delete_queue()
-        # 3. TODO(Alex): notify via email
+        task.backup_status = constants.BACKUP_COMPLETED
+        task.save()
 
     def process_using_backup(self, task):
         # treat same as the available backup for now
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index f2e4f16..164a38c 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -8,8 +8,8 @@
 from tooz import coordination

 

 from staffeln.common import constants, context

-from staffeln.common import node_manage

 from staffeln.common import lock

+from staffeln.common import node_manage

 from staffeln.common import time as xtime

 from staffeln.conductor import backup

 from staffeln.i18n import _

@@ -129,6 +129,7 @@
 

     def _report_backup_result(self):

         self.controller.publish_backup_result()

+        self.controller.purge_backups()

 

     def backup_engine(self, backup_service_period):

         LOG.info("backing... %s" % str(time.time()))

@@ -137,12 +138,14 @@
         @periodics.periodic(spacing=backup_service_period, run_immediately=True)

         def backup_tasks():

             with self.lock_mgt:

-                if self.puller.fetch_puller_role():

+                is_puller = self.puller.fetch_puller_role()

+                if is_puller:

                     self._update_task_queue()

                     self.puller.renew_update_time()

                 self._process_todo_tasks()

                 self._process_wip_tasks()

-                self._report_backup_result()

+                if is_puller:

+                    self._report_backup_result()

 

         periodic_callables = [

             (backup_tasks, (), {}),

diff --git a/staffeln/conductor/result.py b/staffeln/conductor/result.py
index 1414347..e24fed7 100644
--- a/staffeln/conductor/result.py
+++ b/staffeln/conductor/result.py
@@ -1,7 +1,9 @@
 # Email notification package

 # This should be upgraded by integrating with mail server to send batch

-import staffeln.conf

 from oslo_log import log

+

+import staffeln.conf

+from staffeln.common import constants

 from staffeln.common import email

 from staffeln.common import time as xtime

 from staffeln.conductor import backup

@@ -17,38 +19,10 @@
 

     def initialize(self):

         self.content = ""

-        self.project_list = []

-        self.success_backup_list = {}

-        self.failed_backup_list = {}

+        self.project_list = set()

 

-    def add_project(self, id, name):

-        if id in self.success_backup_list:

-            return

-        self.project_list.append({"name": name, "id": id})

-        self.success_backup_list[id] = []

-        self.failed_backup_list[id] = []

-

-    def add_success_backup(self, project_id, volume_id, backup_id):

-        if project_id not in self.success_backup_list:

-            LOG.error(_("Not registered project is reported for backup result."))

-            return

-        self.success_backup_list[project_id].append(

-            {

-                "volume_id": volume_id,

-                "backup_id": backup_id,

-            }

-        )

-

-    def add_failed_backup(self, project_id, volume_id, reason):

-        if project_id not in self.failed_backup_list:

-            LOG.error(_("Not registered project is reported for backup result."))

-            return

-        self.failed_backup_list[project_id].append(

-            {

-                "volume_id": volume_id,

-                "reason": reason,

-            }

-        )

+    def add_project(self, project_id, project_name):

+        self.project_list.add((project_id, project_name))

 

     def send_result_email(self):

         subject = "Backup result"

@@ -77,9 +51,29 @@
         # 1. get quota

         self.content = "<h3>${TIME}</h3><br>"

         self.content = self.content.replace("${TIME}", xtime.get_current_strtime())

+        backup_mgt = backup.Backup()

+        project_success = {}

+        project_failed = {}

+        success_queues = backup_mgt.get_queues(

+            filters={"backup_status": constants.BACKUP_COMPLETED}

+        )

+        for queue in success_queues:

+            if queue.project_id in project_success:

+                project_success[queue.project_id].append(queue)

+            else:

+                project_success[queue.project_id] = [queue]

+        failed_queues = backup_mgt.get_queues(

+            filters={"backup_status": constants.BACKUP_FAILED}

+        )

+        for queue in failed_queues:

+            if queue.project_id in project_failed:

+                project_failed[queue.project_id].append(queue)

+            else:

+                project_failed[queue.project_id] = [queue]

+

         html = ""

-        for project in self.project_list:

-            quota = backup.Backup().get_backup_quota(project["id"])

+        for project_id, project_name in self.project_list:

+            quota = backup_mgt.get_backup_quota(project_id)

 

             html += (

                 "<h3>Project: ${PROJECT}</h3><br>"

@@ -91,27 +85,33 @@
                 "<h4>${FAILED_VOLUME_LIST}</h4><br>"

             )

 

-            success_volumes = "<br>".join(

-                [

-                    "Volume ID: %s, Backup ID: %s"

-                    % (str(e["volume_id"]), str(e["backup_id"]))

-                    for e in self.success_backup_list[project["id"]]

-                ]

-            )

-            failed_volumes = "<br>".join(

-                [

-                    "Volume ID: %s, Reason: %s"

-                    % (str(e["volume_id"]), str(e["reason"]))

-                    for e in self.failed_backup_list[project["id"]]

-                ]

-            )

+            for project_id in project_success:

+                success_volumes = "<br>".join(

+                    [

+                        "Volume ID: %s, Backup ID: %s"

+                        % (str(e.volume_id), str(e.backup_id))

+                        for e in project_success[project_id]

+                    ]

+                )

+            else:

+                success_volumes = "<br>"

+            for project_id in project_failed:

+                failed_volumes = "<br>".join(

+                    [

+                        "Volume ID: %s, Reason: %s"

+                        % (str(e.volume_id), str(e.reason))

+                        for e in project_failed[project_id]

+                    ]

+                )

+            else:

+                failed_volumes = "<br>"

 

             html = html.replace("${QUOTA_LIMIT}", str(quota["limit"]))

             html = html.replace("${QUOTA_IN_USE}", str(quota["in_use"]))

             html = html.replace("${QUOTA_RESERVED}", str(quota["reserved"]))

             html = html.replace("${SUCCESS_VOLUME_LIST}", success_volumes)

             html = html.replace("${FAILED_VOLUME_LIST}", failed_volumes)

-            html = html.replace("${PROJECT}", project["name"])

+            html = html.replace("${PROJECT}", project_name)

         if html == "":

             return

         self.content += html

diff --git a/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py b/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py
new file mode 100644
index 0000000..4dc6b03
--- /dev/null
+++ b/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py
@@ -0,0 +1,21 @@
+"""Add reason column to queue_data table
+
+Revision ID: 2b2b9df199bd
+Revises: 003102f08f66
+Create Date: 2022-11-02 06:14:09.348932
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '2b2b9df199bd'
+down_revision = '003102f08f66'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.add_column(
+        "queue_data",
+        sa.Column("reason", sa.String(length=255), nullable=True)
+    )
diff --git a/staffeln/db/sqlalchemy/models.py b/staffeln/db/sqlalchemy/models.py
index 39ac7ee..6ef7211 100644
--- a/staffeln/db/sqlalchemy/models.py
+++ b/staffeln/db/sqlalchemy/models.py
@@ -68,6 +68,7 @@
     instance_id = Column(String(100))
     volume_name = Column(String(100))
     instance_name = Column(String(100))
+    reason = Column(String(255), nullable=True)
 
 
 class Puller(Base):
diff --git a/staffeln/objects/queue.py b/staffeln/objects/queue.py
index 561637c..2da75bf 100644
--- a/staffeln/objects/queue.py
+++ b/staffeln/objects/queue.py
@@ -20,6 +20,7 @@
         "backup_status": sfeild.IntegerField(),
         "volume_name": sfeild.StringField(),
         "instance_name": sfeild.StringField(),
+        "reason": sfeild.StringField(),
     }
 
     @base.remotable_classmethod