Revert "Scalable functionality"
diff --git a/requirements.txt b/requirements.txt
index dc53a3c..1b57644 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,10 +11,8 @@
oslo.config>=8.1.0
oslo.log>=4.4.0 # Apache-2.0
oslo_versionedobjects
-oslo.utils>=3.40.0 # Apache-2.0
openstacksdk>0.28.0
pymysql
parse
-tooz>=2.7.1 # Apache-2.0
# email
-# smtplib
+# smtplib
\ No newline at end of file
diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index 4eea22e..6b60ef5 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -1,4 +1,3 @@
-BACKUP_FAILED = 3
BACKUP_COMPLETED = 2
BACKUP_WIP = 1
BACKUP_PLANNED = 0
diff --git a/staffeln/common/lock.py b/staffeln/common/lock.py
deleted file mode 100644
index 338ff37..0000000
--- a/staffeln/common/lock.py
+++ /dev/null
@@ -1,22 +0,0 @@
-from oslo_utils import uuidutils
-from tooz import coordination
-import staffeln.conf
-
-CONF = staffeln.conf.CONF
-
-
-class LockManager(object):
- def __init__(self, node_id=None):
- self.db_url = CONF.database.connection
- self.node_id = (
- uuidutils.generate_uuid() if node_id is None else node_id
- )
- # get_coordinator(backend_url, member_id)
-        self.coordinator = coordination.get_coordinator(self.db_url, self.node_id)
-
- def __enter__(self):
- self.coordinator.start()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.coordinator.stop()
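
The `LockManager` removed above is a thin context-manager wrapper around a tooz coordinator. As a reference for what this revert takes away, here is a minimal usage sketch mirroring the `manager.py` hunks later in this diff; the volume ID and the work function are placeholders, and the import assumes the pre-revert module layout:

```python
from tooz import coordination

from staffeln.common.lock import LockManager  # module deleted above


def process(volume_id):
    print("backing up", volume_id)  # stand-in for the real task


with LockManager() as lock_mgt:  # __enter__/__exit__ start and stop the coordinator
    try:
        # tooz raises LockAcquireFailed when another node holds the lock.
        with lock_mgt.coordinator.get_lock("volume-0001"):
            process("volume-0001")
    except coordination.LockAcquireFailed:
        pass  # another conductor owns this task; skip it
```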
diff --git a/staffeln/common/node_manage.py b/staffeln/common/node_manage.py
deleted file mode 100644
index 16a2575..0000000
--- a/staffeln/common/node_manage.py
+++ /dev/null
@@ -1,59 +0,0 @@
-from datetime import datetime, timezone
-
-from oslo_utils import uuidutils
-
-import staffeln.conf
-from staffeln.objects import puller
-
-CONF = staffeln.conf.CONF
-
-
-class Puller(object):
- def __init__(self, context, node_id=None):
- self.ctx = context
-
- self.node_id = (
- uuidutils.generate_uuid() if node_id is None else node_id
- )
- self.puller = None
-
- def fetch_puller_role(self):
- target_puller = puller.Puller.get(context=self.ctx)
-
-        # No puller record yet: run for the puller role.
-        if not target_puller:
-            self.puller = puller.Puller(self.ctx)
-            self.puller.node_id = self.node_id
-            self.puller.updated_at = datetime.now(timezone.utc)
-            self.puller.create()
-        # The current puller record expired: run for the new puller role.
-        elif self.is_old_puller(target_puller):
-            self.puller = puller.Puller(self.ctx)
-            self.puller.node_id = self.node_id
-            self.puller.updated_at = datetime.now(timezone.utc)
-            self.puller.save()
-        else:
-            return False
-
-        # Re-fetch the record to learn which candidate actually won
-        # the role.
-        self.puller = puller.Puller.get(context=self.ctx)
-        # Return True if the new puller's node_id is this node.
-        return self.puller.node_id == self.node_id
-
- def is_old_puller(self, target_puller):
- valid_period = CONF.conductor.backup_service_period * 2
-        # Treat the puller as stale when it has gone more than two
-        # backup_service_periods without an update.
-        return (
-            datetime.now(timezone.utc) - target_puller.updated_at
-        ).total_seconds() > valid_period
-
- def renew_update_time(self):
- if self.puller is None:
- return
- self.puller = puller.Puller.get(context=self.ctx)
-
- if self.puller.node_id == self.node_id:
- self.puller.updated_at = datetime.now(timezone.utc)
- self.puller.save()
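
The deleted `node_manage.Puller` implements a small DB-backed leader election: one row records the current puller, and another node may take over once that row has gone more than two `backup_service_period`s without a heartbeat. A standalone sketch of just the staleness rule, with an assumed example period of 60 seconds:

```python
from datetime import datetime, timedelta, timezone

backup_service_period = 60  # seconds; example value only
valid_period = backup_service_period * 2

# Pretend the current puller last sent a heartbeat 150 seconds ago.
last_update = datetime.now(timezone.utc) - timedelta(seconds=150)
age = (datetime.now(timezone.utc) - last_update).total_seconds()

print(age > valid_period)  # True: 150s old exceeds the 120s allowance
```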
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index 309822e..fcd39bf 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -122,28 +122,12 @@
% (volume_id, volume["status"])
)
LOG.info(reason)
- return reason
+ self.result.add_failed_backup(project_id, volume_id, reason)
return res
except OpenstackResourceNotFound:
return False
- def purge_backups(self):
-        # TODO: do all of this in a single DB command
- success_tasks = self.get_queues(
- filters={"backup_status": constants.BACKUP_COMPLETED}
- )
- failed_tasks = self.get_queues(
- filters={"backup_status": constants.BACKUP_FAILED}
- )
- for queue in success_tasks:
- LOG.info("Start purge completed tasks.")
- queue.delete_queue()
-
- for queue in failed_tasks:
- LOG.info("Start purge failed tasks.")
- queue.delete_queue()
-
# delete all backups forcibly regardless of the status
def hard_cancel_backup_task(self, task):
try:
@@ -158,16 +142,15 @@
if backup is None:
return task.delete_queue()
self.openstacksdk.delete_backup(task.backup_id, force=True)
- task.reason = reason
- task.backup_status = constants.BACKUP_FAILED
- task.save()
+ task.delete_queue()
+ self.result.add_failed_backup(task.project_id, task.volume_id, reason)
except OpenstackSDKException as e:
reason = _("Backup %s deletion failed." "%s" % (task.backup_id, str(e)))
LOG.info(reason)
- task.reason = reason
- task.backup_status = constants.BACKUP_FAILED
- task.save()
+ # remove from the queue table
+ task.delete_queue()
+ self.result.add_failed_backup(task.project_id, task.volume_id, reason)
# delete only available backups: reserved
def soft_remove_backup_task(self, backup_object):
@@ -278,33 +261,23 @@
empty_project = False
self.result.add_project(project.id, project.name)
for volume in server.attached_volumes:
- filter_result = self.filter_by_volume_status(
- volume["id"], project.id)
-
- if not filter_result:
+ if not self.filter_by_volume_status(volume["id"], project.id):
continue
if "name" not in volume:
volume_name = volume["id"]
else:
volume_name = volume["name"][:100]
- if filter_result is True:
- backup_status = constants.BACKUP_PLANNED
- reason = None
- else:
- backup_status = constants.BACKUP_FAILED
- reason = filter_result
queues_map.append(
QueueMapping(
project_id=project.id,
volume_id=volume["id"],
backup_id="NULL",
instance_id=server.id,
- backup_status=backup_status,
+ backup_status=constants.BACKUP_PLANNED,
# Keep only the first 100 chars of instance_name and
# volume_name when forming backup_name
instance_name=server.name[:100],
volume_name=volume_name,
- reason=reason,
)
)
return queues_map
@@ -370,8 +343,11 @@
% (backup_name, queue.volume_id, str(error))
)
LOG.info(reason)
- queue.reason = reason
- queue.backup_status = constants.BACKUP_FAILED
+ self.result.add_failed_backup(project_id, queue.volume_id, reason)
+ parsed = parse.parse("Error in creating volume backup {id}", str(error))
+ if parsed is not None:
+ queue.backup_id = parsed["id"]
+ queue.backup_status = constants.BACKUP_WIP
queue.save()
# Added extra exception handling, as OpenstackSDKException does not cover the keystone unauthorized error.
except Exception as error:
@@ -380,8 +356,11 @@
% (backup_name, queue.volume_id, str(error))
)
LOG.error(reason)
- queue.reason = reason
- queue.backup_status = constants.BACKUP_FAILED
+ self.result.add_failed_backup(project_id, queue.volume_id, reason)
+ parsed = parse.parse("Error in creating volume backup {id}", str(error))
+ if parsed is not None:
+ queue.backup_id = parsed["id"]
+ queue.backup_status = constants.BACKUP_WIP
queue.save()
else:
# Backup planned task cannot have backup_id in the same cycle.
@@ -394,14 +373,15 @@
reason = _(
"The backup creation for the volume %s was prefailed." % task.volume_id
)
+ self.result.add_failed_backup(task.project_id, task.volume_id, reason)
LOG.warn(reason)
- task.reason = reason
- task.backup_status = constants.BACKUP_FAILED
- task.save()
+ # 2. remove failed task from the task queue
+ task.delete_queue()
def process_failed_backup(self, task):
# 1. notify via email
reason = _("The status of backup for the volume %s is error." % task.volume_id)
+ self.result.add_failed_backup(task.project_id, task.volume_id, reason)
LOG.warn(reason)
# 2. delete backup generator
try:
@@ -413,9 +393,8 @@
% (task.backup_id, str(ex))
)
)
- task.reason = reason
- task.backup_status = constants.BACKUP_FAILED
- task.save()
+ # 3. remove failed task from the task queue
+ task.delete_queue()
def process_non_existing_backup(self, task):
task.delete_queue()
@@ -432,8 +411,10 @@
backup_completed=1,
)
)
- task.backup_status = constants.BACKUP_COMPLETED
- task.save()
+ self.result.add_success_backup(task.project_id, task.volume_id, task.backup_id)
+ # 2. remove from the task list
+ task.delete_queue()
+ # 3. TODO(Alex): notify via email
def process_using_backup(self, task):
# treat same as the available backup for now
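
Note how the restored error path above keeps a failed creation in the queue as `BACKUP_WIP` when the backup ID can be recovered from the exception text via the `parse` package (already pinned in `requirements.txt`). A quick illustration with an assumed error message:

```python
import parse

# Example message shaped like the one the SDK raises; the UUID is made up.
msg = "Error in creating volume backup 1f4e3a2b-0000-0000-0000-000000000000"

parsed = parse.parse("Error in creating volume backup {id}", msg)
if parsed is not None:
    print(parsed["id"])  # 1f4e3a2b-0000-0000-0000-000000000000
```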
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 164a38c..a6e93a9 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -5,11 +5,7 @@
import staffeln.conf
from futurist import periodics
from oslo_log import log
-from tooz import coordination
-
from staffeln.common import constants, context
-from staffeln.common import lock
-from staffeln.common import node_manage
from staffeln.common import time as xtime
from staffeln.conductor import backup
from staffeln.i18n import _
@@ -26,10 +22,8 @@
self._shutdown = threading.Event()
self.conf = conf
self.ctx = context.make_context()
- self.lock_mgt = lock.LockManager()
self.controller = backup.Backup()
- self.puller = node_manage.Puller(self.ctx)
- LOG.info("%s init (node_id: %s)" % (self.name, self.puller.node_id))
+ LOG.info("%s init" % self.name)
def run(self):
LOG.info("%s run" % self.name)
@@ -59,12 +53,7 @@
if not self._backup_cycle_timeout(): # time in
LOG.info(_("cycle timein"))
for queue in queues_started:
- try:
- with self.lock_mgt.coordinator.get_lock(queue.volume_id):
- self.controller.check_volume_backup_status(queue)
- except coordination.LockAcquireFailed:
- LOG.debug("Failed to lock task for volume: "
- "%s." % queue.volume_id)
+ self.controller.check_volume_backup_status(queue)
else: # time out
LOG.info(_("cycle timeout"))
for queue in queues_started:
@@ -112,12 +101,7 @@
)
if len(queues_to_start) != 0:
for queue in queues_to_start:
- try:
- with self.lock_mgt.coordinator.get_lock(queue.volume_id):
- self.controller.create_volume_backup(queue)
- except coordination.LockAcquireFailed:
- LOG.debug("Failed to lock task for volume: "
- "%s." % queue.volume_id)
+ self.controller.create_volume_backup(queue)
# Refresh the task queue
def _update_task_queue(self):
@@ -129,7 +113,6 @@
def _report_backup_result(self):
self.controller.publish_backup_result()
- self.controller.purge_backups()
def backup_engine(self, backup_service_period):
LOG.info("backing... %s" % str(time.time()))
@@ -137,15 +120,10 @@
@periodics.periodic(spacing=backup_service_period, run_immediately=True)
def backup_tasks():
- with self.lock_mgt:
- is_puller = self.puller.fetch_puller_role()
- if is_puller:
- self._update_task_queue()
- self.puller.renew_update_time()
- self._process_todo_tasks()
- self._process_wip_tasks()
- if is_puller:
- self._report_backup_result()
+ self._update_task_queue()
+ self._process_todo_tasks()
+ self._process_wip_tasks()
+ self._report_backup_result()
periodic_callables = [
(backup_tasks, (), {}),
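
With the locking and puller-role checks stripped out, `backup_tasks` reduces to the plain futurist periodic pattern shown above. A self-contained sketch of that pattern, with an assumed 5-second spacing and a print in place of the real cycle:

```python
import time

from futurist import periodics


@periodics.periodic(spacing=5, run_immediately=True)
def backup_tasks():
    print("cycle at", time.time())  # stand-in for the real backup cycle


worker = periodics.PeriodicWorker([(backup_tasks, (), {})])
try:
    worker.start()  # blocks, invoking backup_tasks every 5 seconds
except KeyboardInterrupt:
    worker.stop()
```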
diff --git a/staffeln/conductor/result.py b/staffeln/conductor/result.py
index e24fed7..1414347 100644
--- a/staffeln/conductor/result.py
+++ b/staffeln/conductor/result.py
@@ -1,9 +1,7 @@
# Email notification package
# This should be upgraded by integrating with mail server to send batch
-from oslo_log import log
-
import staffeln.conf
-from staffeln.common import constants
+from oslo_log import log
from staffeln.common import email
from staffeln.common import time as xtime
from staffeln.conductor import backup
@@ -19,10 +17,38 @@
def initialize(self):
self.content = ""
- self.project_list = set()
+ self.project_list = []
+ self.success_backup_list = {}
+ self.failed_backup_list = {}
- def add_project(self, project_id, project_name):
- self.project_list.add((project_id, project_name))
+    def add_project(self, project_id, project_name):
+        if project_id in self.success_backup_list:
+            return
+        self.project_list.append({"name": project_name, "id": project_id})
+        self.success_backup_list[project_id] = []
+        self.failed_backup_list[project_id] = []
+
+ def add_success_backup(self, project_id, volume_id, backup_id):
+ if project_id not in self.success_backup_list:
+ LOG.error(_("Not registered project is reported for backup result."))
+ return
+ self.success_backup_list[project_id].append(
+ {
+ "volume_id": volume_id,
+ "backup_id": backup_id,
+ }
+ )
+
+ def add_failed_backup(self, project_id, volume_id, reason):
+ if project_id not in self.failed_backup_list:
+ LOG.error(_("Not registered project is reported for backup result."))
+ return
+ self.failed_backup_list[project_id].append(
+ {
+ "volume_id": volume_id,
+ "reason": reason,
+ }
+ )
def send_result_email(self):
subject = "Backup result"
@@ -51,29 +77,9 @@
# 1. get quota
self.content = "<h3>${TIME}</h3><br>"
self.content = self.content.replace("${TIME}", xtime.get_current_strtime())
- backup_mgt = backup.Backup()
- project_success = {}
- project_failed = {}
- success_queues = backup_mgt.get_queues(
- filters={"backup_status": constants.BACKUP_COMPLETED}
- )
- for queue in success_queues:
- if queue.project_id in project_success:
- project_success[queue.project_id].append(queue)
- else:
- project_success[queue.project_id] = [queue]
- failed_queues = backup_mgt.get_queues(
- filters={"backup_status": constants.BACKUP_FAILED}
- )
- for queue in failed_queues:
- if queue.project_id in project_failed:
- project_failed[queue.project_id].append(queue)
- else:
- project_failed[queue.project_id] = [queue]
-
html = ""
- for project_id, project_name in self.project_list:
- quota = backup_mgt.get_backup_quota(project_id)
+        backup_mgt = backup.Backup()
+        for project in self.project_list:
+            quota = backup_mgt.get_backup_quota(project["id"])
html += (
"<h3>Project: ${PROJECT}</h3><br>"
@@ -85,33 +91,27 @@
"<h4>${FAILED_VOLUME_LIST}</h4><br>"
)
- for project_id in project_success:
- success_volumes = "<br>".join(
- [
- "Volume ID: %s, Backup ID: %s"
- % (str(e.volume_id), str(e.backup_id))
- for e in project_success[project_id]
- ]
- )
- else:
- success_volumes = "<br>"
- for project_id in project_failed:
- failed_volumes = "<br>".join(
- [
- "Volume ID: %s, Reason: %s"
- % (str(e.volume_id), str(e.reason))
- for e in project_failed[project_id]
- ]
- )
- else:
- failed_volumes = "<br>"
+ success_volumes = "<br>".join(
+ [
+ "Volume ID: %s, Backup ID: %s"
+ % (str(e["volume_id"]), str(e["backup_id"]))
+ for e in self.success_backup_list[project["id"]]
+ ]
+ )
+ failed_volumes = "<br>".join(
+ [
+ "Volume ID: %s, Reason: %s"
+ % (str(e["volume_id"]), str(e["reason"]))
+ for e in self.failed_backup_list[project["id"]]
+ ]
+ )
html = html.replace("${QUOTA_LIMIT}", str(quota["limit"]))
html = html.replace("${QUOTA_IN_USE}", str(quota["in_use"]))
html = html.replace("${QUOTA_RESERVED}", str(quota["reserved"]))
html = html.replace("${SUCCESS_VOLUME_LIST}", success_volumes)
html = html.replace("${FAILED_VOLUME_LIST}", failed_volumes)
- html = html.replace("${PROJECT}", project_name)
+ html = html.replace("${PROJECT}", project["name"])
if html == "":
return
self.content += html
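
After this revert, results are accumulated in memory per project instead of being re-queried from `queue_data`. A sketch of the accumulation flow the restored methods expect, assuming the surrounding class (its name is not shown in this hunk) is instantiated as `result`; the IDs are placeholders:

```python
result.initialize()
result.add_project("proj-1", "demo-project")

# Callers report outcomes against a registered project.
result.add_success_backup("proj-1", "vol-1", "backup-1")
result.add_failed_backup("proj-1", "vol-2", "volume is in in-use status")

# Unregistered projects are rejected with an error log.
result.add_failed_backup("proj-9", "vol-3", "never registered")
```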
diff --git a/staffeln/db/sqlalchemy/alembic/README b/staffeln/db/sqlalchemy/alembic/README
index d6405b1..42fcedd 100644
--- a/staffeln/db/sqlalchemy/alembic/README
+++ b/staffeln/db/sqlalchemy/alembic/README
@@ -1,5 +1,5 @@
Please see https://alembic.readthedocs.org/en/latest/index.html for general documentation
-Upgrade can be performed by:
+Upgrade can be performed by:
$ staffeln-dbmanage upgrade
$ staffeln-dbmanage upgrade head
diff --git a/staffeln/db/sqlalchemy/alembic/script.py.mako b/staffeln/db/sqlalchemy/alembic/scriptpy.mako
similarity index 100%
rename from staffeln/db/sqlalchemy/alembic/script.py.mako
rename to staffeln/db/sqlalchemy/alembic/scriptpy.mako
diff --git a/staffeln/db/sqlalchemy/alembic/versions/003102f08f66_add_puller.py b/staffeln/db/sqlalchemy/alembic/versions/003102f08f66_add_puller.py
deleted file mode 100644
index 50463e0..0000000
--- a/staffeln/db/sqlalchemy/alembic/versions/003102f08f66_add_puller.py
+++ /dev/null
@@ -1,24 +0,0 @@
-"""Add puller
-
-Revision ID: 003102f08f66
-Revises: 041d9a0f1159
-Create Date: 2022-11-02 06:02:21.404596
-
-"""
-
-# revision identifiers, used by Alembic.
-revision = '003102f08f66'
-down_revision = '041d9a0f1159'
-
-from alembic import op
-import sqlalchemy as sa
-
-
-def upgrade():
- op.create_table(
- 'puller',
- sa.Column('id', sa.Integer(), nullable=False),
- sa.Column('node_id', sa.String(length=100), nullable=True),
- sa.Column('created_at', sa.DateTime(), nullable=True),
- sa.Column('updated_at', sa.DateTime(), nullable=True)
- )
diff --git a/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py b/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py
index a16f27c..c6869b2 100644
--- a/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py
+++ b/staffeln/db/sqlalchemy/alembic/versions/041d9a0f1159_backup_add_names.py
@@ -1,3 +1,14 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
"""Add volume_name and instance_name to queue_data
Revision ID: 041d9a0f1159
diff --git a/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py b/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py
deleted file mode 100644
index 4dc6b03..0000000
--- a/staffeln/db/sqlalchemy/alembic/versions/2b2b9df199bd_add_reason_column_to_queue_data_table.py
+++ /dev/null
@@ -1,21 +0,0 @@
-"""Add reason column to queue_data table
-
-Revision ID: 2b2b9df199bd
-Revises: 003102f08f66
-Create Date: 2022-11-02 06:14:09.348932
-
-"""
-
-# revision identifiers, used by Alembic.
-revision = '2b2b9df199bd'
-down_revision = '003102f08f66'
-
-from alembic import op
-import sqlalchemy as sa
-
-
-def upgrade():
- op.add_column(
- "queue_data",
- sa.Column("reason", sa.String(length=255), nullable=True)
- )
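
Neither removed revision defines a `downgrade()`, so databases that already ran `2b2b9df199bd` keep an orphaned `reason` column after this revert. For illustration only, the downgrade that revision would have needed looks like this (hypothetical; it never existed in the tree):

```python
from alembic import op


def downgrade():
    op.drop_column("queue_data", "reason")
```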
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index a1395ea..adfa7a7 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -335,26 +335,3 @@
return self._soft_delete(models.Backup_data, id)
except: # noqa: E722
LOG.error("Backup Not found.")
-
- def get_puller(self):
- """Get puller"""
- try:
- return self._get(
- context, model=models.Puller, fieldname="id", value=1
- )
- except: # noqa: E722
- LOG.debug("Puller not found.")
- return None
-
- def create_puller(self, values):
- try:
- puller = self._create(models.Puller, values)
- except db_exc.DBDuplicateEntry:
- LOG.error("Puller already exists.")
- return puller
-
- def update_puller(self, id, values):
- try:
- return self._update(models.Puller, id, values)
- except: # noqa: E722
- LOG.error("Puller resource not found.")
diff --git a/staffeln/db/sqlalchemy/models.py b/staffeln/db/sqlalchemy/models.py
index 6ef7211..6c3deb7 100644
--- a/staffeln/db/sqlalchemy/models.py
+++ b/staffeln/db/sqlalchemy/models.py
@@ -68,13 +68,3 @@
instance_id = Column(String(100))
volume_name = Column(String(100))
instance_name = Column(String(100))
- reason = Column(String(255), nullable=True)
-
-
-class Puller(Base):
- """Represent OpenStack information Puller"""
-
- __tablename__ = "puller"
- __table_args__ = table_args()
- id = Column(Integer, primary_key=True)
- node_id = Column(String(100), nullable=True)
diff --git a/staffeln/objects/puller.py b/staffeln/objects/puller.py
deleted file mode 100644
index c03d70c..0000000
--- a/staffeln/objects/puller.py
+++ /dev/null
@@ -1,56 +0,0 @@
-from oslo_versionedobjects import fields as ovoo_fields
-
-from staffeln.db import api as db_api
-from staffeln.objects import base
-from staffeln.objects import fields as sfeild
-
-
-@base.StaffelnObjectRegistry.register
-class Puller(
- base.StaffelnPersistentObject, base.StaffelnObject, base.StaffelnObjectDictCompat
-):
- VERSION = "1.0"
-
- dbapi = db_api.get_instance()
-
- fields = {
- "id": sfeild.IntegerField(),
- "node_id": sfeild.UUIDField(),
- "updated_at": ovoo_fields.DateTimeField(),
- }
-
- @base.remotable_classmethod
- def get(cls, context): # pylint: disable=E0213
- """Get puller
- :param context: Security context. NOTE: This should only
- be used internally by the indirection_api.
- Unfortunately, RPC requires context as the first
- argument, even though we don't use it.
- A context should be set when instantiating the
- object, e.g.: Queue(context)
- :returns: a :class:`Puller` object.
- """
- db_puller = cls.dbapi.get_puller()
- if not db_puller:
- return None
- puller = cls._from_db_object(cls(context), db_puller)
- return puller
-
- @base.remotable
- def create(self):
- """Create a :class:`Puller` record in the DB"""
- values = self.obj_get_changes()
- db_puller = self.dbapi.create_puller(values)
- self._from_db_object(self, db_puller)
-
- @base.remotable
- def save(self):
- updates = self.obj_get_changes()
- db_obj = self.dbapi.update_puller(id=1, values=updates)
- obj = self._from_db_object(self, db_obj, eager=False)
- self.obj_refresh(obj)
- self.obj_reset_changes()
-
- def refresh(self):
- obj = self.get()
- self.obj_refresh(obj)
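
The removed `Puller` object always targets row `id=1` (see `update_puller(id=1, ...)` above), i.e. a single-row table. A sketch of the get-or-create heartbeat cycle `node_manage.py` ran on top of it; the context helper is taken from `manager.py`, the node UUID is a placeholder, and the imports assume the pre-revert layout:

```python
from datetime import datetime, timezone

from staffeln.common import context
from staffeln.objects import puller  # module deleted above

ctx = context.make_context()
record = puller.Puller.get(context=ctx)
if record is None:
    record = puller.Puller(ctx)
    record.node_id = "0b0e6a60-0000-0000-0000-000000000000"  # example UUID
    record.updated_at = datetime.now(timezone.utc)
    record.create()  # first claim of the puller role
else:
    record.updated_at = datetime.now(timezone.utc)
    record.save()  # heartbeat against row id=1
```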
diff --git a/staffeln/objects/queue.py b/staffeln/objects/queue.py
index 2da75bf..561637c 100644
--- a/staffeln/objects/queue.py
+++ b/staffeln/objects/queue.py
@@ -20,7 +20,6 @@
"backup_status": sfeild.IntegerField(),
"volume_name": sfeild.StringField(),
"instance_name": sfeild.StringField(),
- "reason": sfeild.StringField(),
}
@base.remotable_classmethod