Centralize backup reports
In the previous design, each node built and sent its own backup report.
With this change, workers only record task outcomes (status and failure
reason) in the queue table; the puller node alone aggregates those rows
into a single report and purges them afterwards.
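
Sketch of the intended flow (illustrative only; placeholder names such
as rows/by_project, not the code in this diff):

    # Workers flag finished queue rows; the puller groups them per
    # project, sends one report, then purges the finished rows.
    BACKUP_COMPLETED, BACKUP_FAILED = 2, 3

    rows = [  # stand-in for the queue_data table
        {"project_id": "p1", "volume_id": "v1",
         "backup_status": BACKUP_COMPLETED, "reason": None},
        {"project_id": "p1", "volume_id": "v2",
         "backup_status": BACKUP_FAILED, "reason": "volume in error state"},
    ]

    def publish_backup_result(rows):
        by_project = {}
        for row in rows:
            by_project.setdefault(row["project_id"], []).append(row)
        for project_id, project_rows in by_project.items():
            failed = [r for r in project_rows
                      if r["backup_status"] == BACKUP_FAILED]
            print("%s: %d failed" % (project_id, len(failed)))

    def purge_backups(rows):
        # keep only rows that still need work
        return [r for r in rows
                if r["backup_status"] not in (BACKUP_COMPLETED, BACKUP_FAILED)]

    publish_backup_result(rows)
    rows = purge_backups(rows)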
diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index 6b60ef5..4eea22e 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -1,3 +1,4 @@
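+# Task lifecycle: BACKUP_PLANNED -> BACKUP_WIP -> BACKUP_COMPLETED or
+# BACKUP_FAILED; finished rows are reported and purged by the puller.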
+BACKUP_FAILED = 3
BACKUP_COMPLETED = 2
BACKUP_WIP = 1
BACKUP_PLANNED = 0
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index fcd39bf..309822e 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -122,12 +122,28 @@
% (volume_id, volume["status"])
)
LOG.info(reason)
- self.result.add_failed_backup(project_id, volume_id, reason)
+ return reason
return res
except OpenstackResourceNotFound:
return False
+ def purge_backups(self):
+ # TODO: make all of this a single DB command
+ success_tasks = self.get_queues(
+ filters={"backup_status": constants.BACKUP_COMPLETED}
+ )
+ failed_tasks = self.get_queues(
+ filters={"backup_status": constants.BACKUP_FAILED}
+ )
+ LOG.info("Start purging completed tasks.")
+ for queue in success_tasks:
+ queue.delete_queue()
+
+ LOG.info("Start purging failed tasks.")
+ for queue in failed_tasks:
+ queue.delete_queue()
+
# delete all backups forcibly regardless of the status
def hard_cancel_backup_task(self, task):
try:
@@ -142,15 +158,16 @@
if backup is None:
return task.delete_queue()
self.openstacksdk.delete_backup(task.backup_id, force=True)
- task.delete_queue()
- self.result.add_failed_backup(task.project_id, task.volume_id, reason)
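+ # keep the failed task in the queue with its reason so the puller can
+ # report it; purge_backups() removes it after the report is sent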
+ task.reason = reason
+ task.backup_status = constants.BACKUP_FAILED
+ task.save()
except OpenstackSDKException as e:
reason = _("Backup %s deletion failed." "%s" % (task.backup_id, str(e)))
LOG.info(reason)
- # remove from the queue table
- task.delete_queue()
- self.result.add_failed_backup(task.project_id, task.volume_id, reason)
+ task.reason = reason
+ task.backup_status = constants.BACKUP_FAILED
+ task.save()
# delete only available backups: reserved
def soft_remove_backup_task(self, backup_object):
@@ -261,23 +278,33 @@
empty_project = False
self.result.add_project(project.id, project.name)
for volume in server.attached_volumes:
- if not self.filter_by_volume_status(volume["id"], project.id):
+ filter_result = self.filter_by_volume_status(
+ volume["id"], project.id)
+
+ if not filter_result:
continue
if "name" not in volume:
volume_name = volume["id"]
else:
volume_name = volume["name"][:100]
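+ # filter_by_volume_status() returns True (volume can be backed up),
+ # False (volume missing), or a reason string (record as a failed task)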
+ if filter_result is True:
+ backup_status = constants.BACKUP_PLANNED
+ reason = None
+ else:
+ backup_status = constants.BACKUP_FAILED
+ reason = filter_result
queues_map.append(
QueueMapping(
project_id=project.id,
volume_id=volume["id"],
backup_id="NULL",
instance_id=server.id,
- backup_status=constants.BACKUP_PLANNED,
+ backup_status=backup_status,
# Only keep the last 100 chars of instance_name and
# volume_name for forming backup_name
instance_name=server.name[:100],
volume_name=volume_name,
+ reason=reason,
)
)
return queues_map
@@ -343,11 +370,8 @@
% (backup_name, queue.volume_id, str(error))
)
LOG.info(reason)
- self.result.add_failed_backup(project_id, queue.volume_id, reason)
- parsed = parse.parse("Error in creating volume backup {id}", str(error))
- if parsed is not None:
- queue.backup_id = parsed["id"]
- queue.backup_status = constants.BACKUP_WIP
+ queue.reason = reason
+ queue.backup_status = constants.BACKUP_FAILED
queue.save()
# Added extra exception as OpenstackSDKException does not handle the keystone unauthorized issue.
except Exception as error:
@@ -356,11 +380,8 @@
% (backup_name, queue.volume_id, str(error))
)
LOG.error(reason)
- self.result.add_failed_backup(project_id, queue.volume_id, reason)
- parsed = parse.parse("Error in creating volume backup {id}", str(error))
- if parsed is not None:
- queue.backup_id = parsed["id"]
- queue.backup_status = constants.BACKUP_WIP
+ queue.reason = reason
+ queue.backup_status = constants.BACKUP_FAILED
queue.save()
else:
# Backup planned task cannot have backup_id in the same cycle.
@@ -373,15 +394,14 @@
reason = _(
"The backup creation for the volume %s was prefailed." % task.volume_id
)
- self.result.add_failed_backup(task.project_id, task.volume_id, reason)
LOG.warn(reason)
- # 2. remove failed task from the task queue
- task.delete_queue()
+ task.reason = reason
+ task.backup_status = constants.BACKUP_FAILED
+ task.save()
def process_failed_backup(self, task):
# 1. notify via email
reason = _("The status of backup for the volume %s is error." % task.volume_id)
- self.result.add_failed_backup(task.project_id, task.volume_id, reason)
LOG.warn(reason)
# 2. delete backup generator
try:
@@ -393,8 +413,9 @@
% (task.backup_id, str(ex))
)
)
- # 3. remove failed task from the task queue
- task.delete_queue()
+ task.reason = reason
+ task.backup_status = constants.BACKUP_FAILED
+ task.save()
def process_non_existing_backup(self, task):
task.delete_queue()
@@ -411,10 +432,8 @@
backup_completed=1,
)
)
- self.result.add_success_backup(task.project_id, task.volume_id, task.backup_id)
- # 2. remove from the task list
- task.delete_queue()
- # 3. TODO(Alex): notify via email
+ task.backup_status = constants.BACKUP_COMPLETED
+ task.save()
def process_using_backup(self, task):
# treat same as the available backup for now
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index f2e4f16..164a38c 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -8,8 +8,8 @@
from tooz import coordination
from staffeln.common import constants, context
-from staffeln.common import node_manage
from staffeln.common import lock
+from staffeln.common import node_manage
from staffeln.common import time as xtime
from staffeln.conductor import backup
from staffeln.i18n import _
@@ -129,6 +129,7 @@
def _report_backup_result(self):
self.controller.publish_backup_result()
+ self.controller.purge_backups()
def backup_engine(self, backup_service_period):
LOG.info("backing... %s" % str(time.time()))
@@ -137,12 +138,14 @@
@periodics.periodic(spacing=backup_service_period, run_immediately=True)
def backup_tasks():
with self.lock_mgt:
- if self.puller.fetch_puller_role():
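+                # every node still processes tasks; only the elected puller
+                # refreshes the queue and, below, publishes the report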
+ is_puller = self.puller.fetch_puller_role()
+ if is_puller:
self._update_task_queue()
self.puller.renew_update_time()
self._process_todo_tasks()
self._process_wip_tasks()
- self._report_backup_result()
+ if is_puller:
+ self._report_backup_result()
periodic_callables = [
(backup_tasks, (), {}),
diff --git a/staffeln/conductor/result.py b/staffeln/conductor/result.py
index 1414347..e24fed7 100644
--- a/staffeln/conductor/result.py
+++ b/staffeln/conductor/result.py
@@ -1,7 +1,9 @@
# Email notification package
# This should be upgraded by integrating with a mail server to send batches
-import staffeln.conf
from oslo_log import log
+
+import staffeln.conf
+from staffeln.common import constants
from staffeln.common import email
from staffeln.common import time as xtime
from staffeln.conductor import backup
@@ -17,38 +19,10 @@
def initialize(self):
self.content = ""
- self.project_list = []
- self.success_backup_list = {}
- self.failed_backup_list = {}
+ self.project_list = set()
- def add_project(self, id, name):
- if id in self.success_backup_list:
- return
- self.project_list.append({"name": name, "id": id})
- self.success_backup_list[id] = []
- self.failed_backup_list[id] = []
-
- def add_success_backup(self, project_id, volume_id, backup_id):
- if project_id not in self.success_backup_list:
- LOG.error(_("Not registered project is reported for backup result."))
- return
- self.success_backup_list[project_id].append(
- {
- "volume_id": volume_id,
- "backup_id": backup_id,
- }
- )
-
- def add_failed_backup(self, project_id, volume_id, reason):
- if project_id not in self.failed_backup_list:
- LOG.error(_("Not registered project is reported for backup result."))
- return
- self.failed_backup_list[project_id].append(
- {
- "volume_id": volume_id,
- "reason": reason,
- }
- )
+ def add_project(self, project_id, project_name):
+ self.project_list.add((project_id, project_name))
def send_result_email(self):
subject = "Backup result"
@@ -77,9 +51,29 @@
# 1. get quota
self.content = "<h3>${TIME}</h3><br>"
self.content = self.content.replace("${TIME}", xtime.get_current_strtime())
+ backup_mgt = backup.Backup()
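+        # group finished queue rows by project so each project's email
+        # section lists its own volumes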
+ project_success = {}
+ project_failed = {}
+ success_queues = backup_mgt.get_queues(
+ filters={"backup_status": constants.BACKUP_COMPLETED}
+ )
+ for queue in success_queues:
+ if queue.project_id in project_success:
+ project_success[queue.project_id].append(queue)
+ else:
+ project_success[queue.project_id] = [queue]
+ failed_queues = backup_mgt.get_queues(
+ filters={"backup_status": constants.BACKUP_FAILED}
+ )
+ for queue in failed_queues:
+ if queue.project_id in project_failed:
+ project_failed[queue.project_id].append(queue)
+ else:
+ project_failed[queue.project_id] = [queue]
+
html = ""
- for project in self.project_list:
- quota = backup.Backup().get_backup_quota(project["id"])
+ for project_id, project_name in self.project_list:
+ quota = backup_mgt.get_backup_quota(project_id)
html += (
"<h3>Project: ${PROJECT}</h3><br>"
@@ -91,27 +85,33 @@
"<h4>${FAILED_VOLUME_LIST}</h4><br>"
)
- success_volumes = "<br>".join(
- [
- "Volume ID: %s, Backup ID: %s"
- % (str(e["volume_id"]), str(e["backup_id"]))
- for e in self.success_backup_list[project["id"]]
- ]
- )
- failed_volumes = "<br>".join(
- [
- "Volume ID: %s, Reason: %s"
- % (str(e["volume_id"]), str(e["reason"]))
- for e in self.failed_backup_list[project["id"]]
- ]
- )
+ if project_id in project_success:
+ success_volumes = "<br>".join(
+ [
+ "Volume ID: %s, Backup ID: %s"
+ % (str(e.volume_id), str(e.backup_id))
+ for e in project_success[project_id]
+ ]
+ )
+ else:
+ success_volumes = "<br>"
+ if project_id in project_failed:
+ failed_volumes = "<br>".join(
+ [
+ "Volume ID: %s, Reason: %s"
+ % (str(e.volume_id), str(e.reason))
+ for e in project_failed[project_id]
+ ]
+ )
+ else:
+ failed_volumes = "<br>"
html = html.replace("${QUOTA_LIMIT}", str(quota["limit"]))
html = html.replace("${QUOTA_IN_USE}", str(quota["in_use"]))
html = html.replace("${QUOTA_RESERVED}", str(quota["reserved"]))
html = html.replace("${SUCCESS_VOLUME_LIST}", success_volumes)
html = html.replace("${FAILED_VOLUME_LIST}", failed_volumes)
- html = html.replace("${PROJECT}", project["name"])
+ html = html.replace("${PROJECT}", project_name)
if html == "":
return
self.content += html
diff --git a/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py b/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py
new file mode 100644
index 0000000..4dc6b03
--- /dev/null
+++ b/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py
@@ -0,0 +1,21 @@
+"""Add reason column to queue_data table
+
+Revision ID: 2b2b9df199bd
+Revises: 003102f08f66
+Create Date: 2022-11-02 06:14:09.348932
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '2b2b9df199bd'
+down_revision = '003102f08f66'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+ op.add_column(
+ "queue_data",
+ sa.Column("reason", sa.String(length=255), nullable=True)
+ )
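+
+
+def downgrade():
+ # assumed counterpart to upgrade(); only needed if this migration
+ # tree supports downgrades
+ op.drop_column("queue_data", "reason")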
diff --git a/staffeln/db/sqlalchemy/models.py b/staffeln/db/sqlalchemy/models.py
index 39ac7ee..6ef7211 100644
--- a/staffeln/db/sqlalchemy/models.py
+++ b/staffeln/db/sqlalchemy/models.py
@@ -68,6 +68,7 @@
instance_id = Column(String(100))
volume_name = Column(String(100))
instance_name = Column(String(100))
+ reason = Column(String(255), nullable=True)
class Puller(Base):
diff --git a/staffeln/objects/queue.py b/staffeln/objects/queue.py
index 561637c..2da75bf 100644
--- a/staffeln/objects/queue.py
+++ b/staffeln/objects/queue.py
@@ -20,6 +20,7 @@
"backup_status": sfeild.IntegerField(),
"volume_name": sfeild.StringField(),
"instance_name": sfeild.StringField(),
+ "reason": sfeild.StringField(),
}
@base.remotable_classmethod