Revert "Revert "Scalable functionality""
diff --git a/requirements.txt b/requirements.txt
index 1b57644..103f767 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,8 +11,10 @@
oslo.config>=8.1.0
oslo.log>=4.4.0 # Apache-2.0
oslo_versionedobjects
+oslo.utils # Apache-2.0
openstacksdk>0.28.0
pymysql
parse
+tooz # Apache-2.0
# email
-# smtplib
\ No newline at end of file
+# smtplib
diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index 6b60ef5..cc9d344 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -1,3 +1,4 @@
+BACKUP_FAILED = 3
BACKUP_COMPLETED = 2
BACKUP_WIP = 1
BACKUP_PLANNED = 0
@@ -7,3 +8,5 @@
# default config values
DEFAULT_BACKUP_CYCLE_TIMEOUT = "5min"
+
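+# Name of the coordination lock used to elect the single conductor that
+# acts as the task "puller" (see conductor/manager.py).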
+PULLER = "puller"
diff --git a/staffeln/common/lock.py b/staffeln/common/lock.py
new file mode 100644
index 0000000..338ff37
--- /dev/null
+++ b/staffeln/common/lock.py
@@ -0,0 +1,22 @@
+from oslo_utils import uuidutils
+from tooz import coordination
+import staffeln.conf
+
+CONF = staffeln.conf.CONF
+
+
+class LockManager(object):
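+    """Context-managed wrapper around a tooz coordinator.
+
+    The configured database connection is reused as the coordination
+    backend URL, so every conductor that shares the database also
+    shares the same distributed locks.
+    """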
+ def __init__(self, node_id=None):
+ self.db_url = CONF.database.connection
+ self.node_id = (
+ uuidutils.generate_uuid() if node_id is None else node_id
+ )
+ # get_coordinator(backend_url, member_id)
+ self.coordinator = coordination.get_coordinator(self.db_url, self.node_id)
+
+ def __enter__(self):
+ self.coordinator.start()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.coordinator.stop()
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index e970074..854794a 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -135,12 +135,28 @@
% (volume_id, volume["status"])
)
LOG.info(reason)
- self.result.add_failed_backup(project_id, volume_id, reason)
+ return reason
return res
except OpenstackResourceNotFound:
return False
+ def purge_backups(self):
+ # TODO: make all this in a single DB command
+ success_tasks = self.get_queues(
+ filters={"backup_status": constants.BACKUP_COMPLETED}
+ )
+ failed_tasks = self.get_queues(
+ filters={"backup_status": constants.BACKUP_FAILED}
+ )
+        LOG.info("Start purging completed tasks.")
+        for queue in success_tasks:
+            queue.delete_queue()
+
+        LOG.info("Start purging failed tasks.")
+        for queue in failed_tasks:
+            queue.delete_queue()
+
# delete all backups forcibly regardless of the status
def hard_cancel_backup_task(self, task):
try:
@@ -155,15 +171,16 @@
if backup is None:
return task.delete_queue()
self.openstacksdk.delete_backup(task.backup_id, force=True)
- task.delete_queue()
- self.result.add_failed_backup(task.project_id, task.volume_id, reason)
+ task.reason = reason
+ task.backup_status = constants.BACKUP_FAILED
+ task.save()
except OpenstackSDKException as e:
reason = _("Backup %s deletion failed: %s" % (task.backup_id, str(e)))
LOG.info(reason)
- # remove from the queue table
- task.delete_queue()
- self.result.add_failed_backup(task.project_id, task.volume_id, reason)
+ task.reason = reason
+ task.backup_status = constants.BACKUP_FAILED
+ task.save()
# delete only available backups: reserved
def soft_remove_backup_task(self, backup_object):
@@ -313,24 +330,34 @@
empty_project = False
self.result.add_project(project.id, project.name)
for volume in server.attached_volumes:
- if not self.filter_by_volume_status(volume["id"], project.id):
+ filter_result = self.filter_by_volume_status(
+ volume["id"], project.id)
+
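+                    # filter_by_volume_status() returns True when the volume
+                    # can be backed up, False when it cannot be found, or a
+                    # reason string when its status rules the backup out.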
+ if not filter_result:
continue
if "name" not in volume or not volume["name"]:
volume_name = volume["id"]
else:
volume_name = volume["name"][:100]
+ if filter_result is True:
+ backup_status = constants.BACKUP_PLANNED
+ reason = None
+ else:
+ backup_status = constants.BACKUP_FAILED
+ reason = filter_result
queues_map.append(
QueueMapping(
project_id=project.id,
volume_id=volume["id"],
backup_id="NULL",
instance_id=server.id,
- backup_status=constants.BACKUP_PLANNED,
+ backup_status=backup_status,
# Only keep the last 100 chars of instance_name and
# volume_name for forming backup_name
instance_name=server.name[:100],
volume_name=volume_name,
incremental=self._is_incremental(volume["id"]),
+ reason=reason,
)
)
return queues_map
@@ -403,10 +430,9 @@
% (backup_name, task.volume_id, str(error))
)
LOG.info(reason)
- self.result.add_failed_backup(project_id, task.volume_id, reason)
- parsed = parse.parse("Error in creating volume backup {id}", str(error))
- if parsed is not None:
- task.backup_id = parsed["id"]
+ task.reason = reason
+ task.backup_status = constants.BACKUP_FAILED
+ task.save()
# Added extra exception as OpenstackSDKException does not handle the keystone unauthorized issue.
except Exception as error:
reason = _(
@@ -414,13 +440,9 @@
% (backup_name, task.volume_id, str(error))
)
LOG.error(reason)
- self.result.add_failed_backup(project_id, task.volume_id, reason)
- parsed = parse.parse("Error in creating volume backup {id}", str(error))
- if parsed is not None:
- task.backup_id = parsed["id"]
- # TODO(oleks): Exception handling for inc backup failure because of missing full backup
- task.backup_status = constants.BACKUP_WIP
- task.save()
+ task.reason = reason
+ task.backup_status = constants.BACKUP_FAILED
+ task.save()
else:
# Backup planned task cannot have backup_id in the same cycle.
# Remove this task from the task list
@@ -432,15 +454,14 @@
reason = _(
"The backup creation for the volume %s was prefailed." % task.volume_id
)
- self.result.add_failed_backup(task.project_id, task.volume_id, reason)
LOG.warn(reason)
- # 2. remove failed task from the task queue
- task.delete_queue()
+ task.reason = reason
+ task.backup_status = constants.BACKUP_FAILED
+ task.save()
def process_failed_backup(self, task):
# 1. notify via email
reason = _("The status of backup for the volume %s is error." % task.volume_id)
- self.result.add_failed_backup(task.project_id, task.volume_id, reason)
LOG.warn(reason)
# 2. delete backup generator
try:
@@ -452,8 +473,9 @@
% (task.backup_id, str(ex))
)
)
- # 3. remove failed task from the task queue
- task.delete_queue()
+ task.reason = reason
+ task.backup_status = constants.BACKUP_FAILED
+ task.save()
def process_non_existing_backup(self, task):
task.delete_queue()
@@ -471,12 +493,8 @@
incremental=task.incremental,
)
)
- self.result.add_success_backup(
- task.project_id, task.volume_id, task.backup_id, task.incremental
- )
- # 2. remove from the task list
- task.delete_queue()
- # 3. TODO(Alex): notify via email
+ task.backup_status = constants.BACKUP_COMPLETED
+ task.save()
def process_using_backup(self, task):
# treat same as the available backup for now
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index b288129..07f2595 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -5,7 +5,10 @@
import staffeln.conf
from futurist import periodics
from oslo_log import log
+from tooz import coordination
+
from staffeln.common import constants, context
+from staffeln.common import lock
from staffeln.common import time as xtime
from staffeln.conductor import backup
from staffeln.i18n import _
@@ -22,6 +25,7 @@
self._shutdown = threading.Event()
self.conf = conf
self.ctx = context.make_context()
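+        # Distributed lock manager shared by the periodic tasks; used to
+        # serialize per-volume work and to elect the single "puller".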
+ self.lock_mgt = lock.LockManager()
self.controller = backup.Backup()
LOG.info("%s init" % self.name)
@@ -53,7 +57,12 @@
if not self._backup_cycle_timeout(): # time in
LOG.info(_("cycle timein"))
for queue in queues_started:
- self.controller.check_volume_backup_status(queue)
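+                    # Per-volume lock: only one conductor polls the status
+                    # of a given backup; the others skip the task.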
+ try:
+ with self.lock_mgt.coordinator.get_lock(queue.volume_id):
+ self.controller.check_volume_backup_status(queue)
+ except coordination.LockAcquireFailed:
+ LOG.debug("Failed to lock task for volume: "
+ "%s." % queue.volume_id)
else: # time out
LOG.info(_("cycle timeout"))
for queue in queues_started:
@@ -101,7 +110,12 @@
)
if len(tasks_to_start) != 0:
for task in tasks_to_start:
- self.controller.create_volume_backup(task)
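+                # Per-volume lock: keeps two conductors from starting a
+                # backup for the same volume at the same time.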
+ try:
+ with self.lock_mgt.coordinator.get_lock(task.volume_id):
+ self.controller.create_volume_backup(task)
+ except coordination.LockAcquireFailed:
+ LOG.debug("Failed to lock task for volume: "
+ "%s." % task.volume_id)
# Refresh the task queue
def _update_task_queue(self):
@@ -113,6 +127,7 @@
def _report_backup_result(self):
self.controller.publish_backup_result()
+ self.controller.purge_backups()
def backup_engine(self, backup_service_period):
LOG.info("Backup manager started %s" % str(time.time()))
@@ -120,10 +135,18 @@
@periodics.periodic(spacing=backup_service_period, run_immediately=True)
def backup_tasks():
- self._update_task_queue()
- self._process_todo_tasks()
- self._process_wip_tasks()
- self._report_backup_result()
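+            # Only the conductor holding the PULLER lock refreshes the task
+            # queue and sends the report; the other conductors keep
+            # processing the tasks that are already queued.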
+ with self.lock_mgt:
+ try:
+ with self.lock_mgt.coordinator.get_lock(constants.PULLER):
+ LOG.info("Running as puller role")
+ self._update_task_queue()
+ self._process_todo_tasks()
+ self._process_wip_tasks()
+ self._report_backup_result()
+ except coordination.LockAcquireFailed:
+ LOG.info("Running as non-puller role")
+ self._process_todo_tasks()
+ self._process_wip_tasks()
periodic_callables = [
(backup_tasks, (), {}),
diff --git a/staffeln/conductor/result.py b/staffeln/conductor/result.py
index acdf71f..b74fdb8 100644
--- a/staffeln/conductor/result.py
+++ b/staffeln/conductor/result.py
@@ -1,7 +1,9 @@
# Email notification package
# This should be upgraded by integrating with a mail server to send batches
-import staffeln.conf
from oslo_log import log
+
+import staffeln.conf
+from staffeln.common import constants
from staffeln.common import email
from staffeln.common import time as xtime
from staffeln.conductor import backup
@@ -17,39 +19,10 @@
def initialize(self):
self.content = ""
- self.project_list = []
- self.success_backup_list = {}
- self.failed_backup_list = {}
+ self.project_list = set()
- def add_project(self, id, name):
- if id in self.success_backup_list:
- return
- self.project_list.append({"name": name, "id": id})
- self.success_backup_list[id] = []
- self.failed_backup_list[id] = []
-
- def add_success_backup(self, project_id, volume_id, backup_id, incremental):
- if project_id not in self.success_backup_list:
- LOG.error(_("Not registered project is reported for backup result."))
- return
- self.success_backup_list[project_id].append(
- {
- "volume_id": volume_id,
- "backup_id": backup_id,
- "backup_mode": "Incremental" if incremental else "Full",
- }
- )
-
- def add_failed_backup(self, project_id, volume_id, reason):
- if project_id not in self.failed_backup_list:
- LOG.error(_("Not registered project is reported for backup result."))
- return
- self.failed_backup_list[project_id].append(
- {
- "volume_id": volume_id,
- "reason": reason,
- }
- )
+ def add_project(self, project_id, project_name):
+ self.project_list.add((project_id, project_name))
def send_result_email(self):
subject = "Backup result"
@@ -78,9 +51,29 @@
# 1. get quota
self.content = "<h3>${TIME}</h3><br>"
self.content = self.content.replace("${TIME}", xtime.get_current_strtime())
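+        # Backup results are now read from the queue table (backup_status
+        # plus the new reason column) rather than from in-memory lists, so
+        # group the completed and failed rows per project first.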
+ backup_mgt = backup.Backup()
+ project_success = {}
+ project_failed = {}
+ success_queues = backup_mgt.get_queues(
+ filters={"backup_status": constants.BACKUP_COMPLETED}
+ )
+ for queue in success_queues:
+ if queue.project_id in project_success:
+ project_success[queue.project_id].append(queue)
+ else:
+ project_success[queue.project_id] = [queue]
+ failed_queues = backup_mgt.get_queues(
+ filters={"backup_status": constants.BACKUP_FAILED}
+ )
+ for queue in failed_queues:
+ if queue.project_id in project_failed:
+ project_failed[queue.project_id].append(queue)
+ else:
+ project_failed[queue.project_id] = [queue]
+
html = ""
- for project in self.project_list:
- quota = backup.Backup().get_backup_quota(project["id"])
+ for project_id, project_name in self.project_list:
+ quota = backup_mgt.get_backup_quota(project_id)
html += (
"<h3>Project: ${PROJECT}</h3><br>"
@@ -92,27 +85,34 @@
"<h4>${FAILED_VOLUME_LIST}</h4><br>"
)
- success_volumes = "<br>".join(
- [
- "Volume ID: %s, Backup ID: %s %s"
- % (str(e["volume_id"]), str(e["backup_id"]), str(e["backup_mode"]))
- for e in self.success_backup_list[project["id"]]
- ]
- )
- failed_volumes = "<br>".join(
- [
- "Volume ID: %s, Reason: %s"
- % (str(e["volume_id"]), str(e["reason"]))
- for e in self.failed_backup_list[project["id"]]
- ]
- )
+            if project_id in project_success:
+                success_volumes = "<br>".join(
+                    [
+                        "Volume ID: %s, Backup ID: %s, backup mode: %s"
+                        % (str(e.volume_id), str(e.backup_id),
+                           "Incremental" if e.incremental else "Full")
+                        for e in project_success[project_id]
+                    ]
+                )
+            else:
+                success_volumes = "<br>"
+            if project_id in project_failed:
+                failed_volumes = "<br>".join(
+                    [
+                        "Volume ID: %s, Reason: %s"
+                        % (str(e.volume_id), str(e.reason))
+                        for e in project_failed[project_id]
+                    ]
+                )
+            else:
+                failed_volumes = "<br>"
html = html.replace("${QUOTA_LIMIT}", str(quota["limit"]))
html = html.replace("${QUOTA_IN_USE}", str(quota["in_use"]))
html = html.replace("${QUOTA_RESERVED}", str(quota["reserved"]))
html = html.replace("${SUCCESS_VOLUME_LIST}", success_volumes)
html = html.replace("${FAILED_VOLUME_LIST}", failed_volumes)
- html = html.replace("${PROJECT}", project["name"])
+ html = html.replace("${PROJECT}", project_name)
if html == "":
return
self.content += html
diff --git a/staffeln/db/sqlalchemy/alembic/README b/staffeln/db/sqlalchemy/alembic/README
index 42fcedd..d6405b1 100644
--- a/staffeln/db/sqlalchemy/alembic/README
+++ b/staffeln/db/sqlalchemy/alembic/README
@@ -1,5 +1,5 @@
Please see https://alembic.readthedocs.org/en/latest/index.html for general documentation
-Upgrade can be performed by:
+Upgrade can be performed by:
$ staffeln-dbmanage upgrade
$ staffeln-dbmanage upgrade head
diff --git a/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py b/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py
index c6869b2..a16f27c 100644
--- a/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py
+++ b/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py
@@ -1,14 +1,3 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
"""Add volume_name and instance_name to queue_data
Revision ID: 041d9a0f1159
diff --git a/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py b/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py
new file mode 100644
index 0000000..99bfdd1
--- /dev/null
+++ b/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py
@@ -0,0 +1,18 @@
+"""Add reason column to queue_data table
+
+Revision ID: 2b2b9df199bd
+Revises: ebdbed01e9a7
+Create Date: 2022-11-02 06:14:09.348932
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '2b2b9df199bd'
+down_revision = 'ebdbed01e9a7'
+
+import sqlalchemy as sa # noqa: E402
+from alembic import op # noqa: E402
+
+
+def upgrade():
+ op.add_column("queue_data", sa.Column("reason", sa.String(length=255), nullable=True))
diff --git a/staffeln/db/sqlalchemy/models.py b/staffeln/db/sqlalchemy/models.py
index d4f99ed..99b7ea3 100644
--- a/staffeln/db/sqlalchemy/models.py
+++ b/staffeln/db/sqlalchemy/models.py
@@ -70,3 +70,4 @@
volume_name = Column(String(100))
instance_name = Column(String(100))
incremental = Column(Boolean, default=False)
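+    # Human-readable failure reason; surfaced in the backup report email.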
+ reason = Column(String(255), nullable=True)
diff --git a/staffeln/objects/queue.py b/staffeln/objects/queue.py
index 7982deb..39e841b 100644
--- a/staffeln/objects/queue.py
+++ b/staffeln/objects/queue.py
@@ -21,6 +21,7 @@
"volume_name": sfeild.StringField(),
"instance_name": sfeild.StringField(),
"incremental": sfeild.BooleanField(),
+ "reason": sfeild.StringField(),
}
@base.remotable_classmethod