Add openstack resource puller
Add puller in BackupManager, DB and Object to keep only one node to pull
volume-server attachment information from OpenStack.
To have multiple puller is okay, but with this, we can reduce unnecessary API
calls to OpenStack.
diff --git a/requirements.txt b/requirements.txt
index 1b57644..bc61474 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,8 +11,9 @@
oslo.config>=8.1.0
oslo.log>=4.4.0 # Apache-2.0
oslo_versionedobjects
+oslo.utils>=3.40.0 # Apache-2.0
openstacksdk>0.28.0
pymysql
parse
# email
-# smtplib
\ No newline at end of file
+# smtplib
diff --git a/staffeln/common/node_manage.py b/staffeln/common/node_manage.py
new file mode 100644
index 0000000..16a2575
--- /dev/null
+++ b/staffeln/common/node_manage.py
@@ -0,0 +1,59 @@
+from datetime import datetime, timezone
+
+from oslo_utils import uuidutils
+
+import staffeln.conf
+from staffeln.objects import puller
+
+CONF = staffeln.conf.CONF
+
+
+class Puller(object):
+ def __init__(self, context, node_id=None):
+ self.ctx = context
+
+ self.node_id = (
+ uuidutils.generate_uuid() if node_id is None else node_id
+ )
+ self.puller = None
+
+ def fetch_puller_role(self):
+ target_puller = puller.Puller.get(context=self.ctx)
+
+ # No puller, run for puller role
+ if not target_puller:
+ self.puller = puller.Puller(self.ctx)
+ self.puller.node_id = self.node_id
+ self.puller.updated_at = datetime.now(timezone.utc)
+ self.puller.create()
+ return True
+ # If puller expired, run for new puller role
+ elif self.is_old_puller(target_puller):
+ self.puller = puller.Puller(self.ctx)
+ self.puller.node_id = self.node_id
+ self.puller.updated_at = datetime.now(timezone.utc)
+ self.puller.save()
+ return True
+ else:
+ return False
+
+ self.puller = puller.Puller.get(context=self.ctx)
+ # Return True if this new puller's node_id is this node.
+ return self.puller.node_id == self.node_id
+
+ def is_old_puller(self, target_puller):
+ valid_period = CONF.conductor.backup_service_period * 2
+ # Check if puller have not been update for more than two
+ # backup_service_period.
+ return True if (
+ datetime.now(timezone.utc) - target_puller.updated_at
+ ).total_seconds() > valid_period else False
+
+ def renew_update_time(self):
+ if self.puller is None:
+ return
+ self.puller = puller.Puller.get(context=self.ctx)
+
+ if self.puller.node_id == self.node_id:
+ self.puller.updated_at = datetime.now(timezone.utc)
+ self.puller.save()
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index a6e93a9..0642c07 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -6,6 +6,7 @@
from futurist import periodics
from oslo_log import log
from staffeln.common import constants, context
+from staffeln.common import node_manage
from staffeln.common import time as xtime
from staffeln.conductor import backup
from staffeln.i18n import _
@@ -23,7 +24,8 @@
self.conf = conf
self.ctx = context.make_context()
self.controller = backup.Backup()
- LOG.info("%s init" % self.name)
+ self.puller = node_manage.Puller(self.ctx)
+ LOG.info("%s init (node_id: %s)" % (self.name, self.puller.node_id))
def run(self):
LOG.info("%s run" % self.name)
@@ -120,7 +122,10 @@
@periodics.periodic(spacing=backup_service_period, run_immediately=True)
def backup_tasks():
- self._update_task_queue()
+ #TODO should switch to using `with`
+ if self.puller.fetch_puller_role():
+ self._update_task_queue()
+ self.puller.renew_update_time()
self._process_todo_tasks()
self._process_wip_tasks()
self._report_backup_result()
diff --git a/staffeln/db/sqlalchemy/alembic/README b/staffeln/db/sqlalchemy/alembic/README
index 42fcedd..d6405b1 100644
--- a/staffeln/db/sqlalchemy/alembic/README
+++ b/staffeln/db/sqlalchemy/alembic/README
@@ -1,5 +1,5 @@
Please see https://alembic.readthedocs.org/en/latest/index.html for general documentation
-Upgrade can be performed by:
+Upgrade can be performed by:
$ staffeln-dbmanage upgrade
$ staffeln-dbmanage upgrade head
diff --git a/staffeln/db/sqlalchemy/alembic/versions/003102f08f66_add_puller.py b/staffeln/db/sqlalchemy/alembic/versions/003102f08f66_add_puller.py
new file mode 100644
index 0000000..50463e0
--- /dev/null
+++ b/staffeln/db/sqlalchemy/alembic/versions/003102f08f66_add_puller.py
@@ -0,0 +1,24 @@
+"""Add puller
+
+Revision ID: 003102f08f66
+Revises: 041d9a0f1159
+Create Date: 2022-11-02 06:02:21.404596
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '003102f08f66'
+down_revision = '041d9a0f1159'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+ op.create_table(
+ 'puller',
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('node_id', sa.String(length=100), nullable=True),
+ sa.Column('created_at', sa.DateTime(), nullable=True),
+ sa.Column('updated_at', sa.DateTime(), nullable=True)
+ )
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index adfa7a7..a1395ea 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -335,3 +335,26 @@
return self._soft_delete(models.Backup_data, id)
except: # noqa: E722
LOG.error("Backup Not found.")
+
+ def get_puller(self):
+ """Get puller"""
+ try:
+ return self._get(
+ context, model=models.Puller, fieldname="id", value=1
+ )
+ except: # noqa: E722
+ LOG.debug("Puller not found.")
+ return None
+
+ def create_puller(self, values):
+ try:
+ puller = self._create(models.Puller, values)
+ except db_exc.DBDuplicateEntry:
+ LOG.error("Puller already exists.")
+ return puller
+
+ def update_puller(self, id, values):
+ try:
+ return self._update(models.Puller, id, values)
+ except: # noqa: E722
+ LOG.error("Puller resource not found.")
diff --git a/staffeln/db/sqlalchemy/models.py b/staffeln/db/sqlalchemy/models.py
index 6c3deb7..39ac7ee 100644
--- a/staffeln/db/sqlalchemy/models.py
+++ b/staffeln/db/sqlalchemy/models.py
@@ -68,3 +68,12 @@
instance_id = Column(String(100))
volume_name = Column(String(100))
instance_name = Column(String(100))
+
+
+class Puller(Base):
+ """Represent OpenStack information Puller"""
+
+ __tablename__ = "puller"
+ __table_args__ = table_args()
+ id = Column(Integer, primary_key=True)
+ node_id = Column(String(100), nullable=True)
diff --git a/staffeln/objects/puller.py b/staffeln/objects/puller.py
new file mode 100644
index 0000000..c03d70c
--- /dev/null
+++ b/staffeln/objects/puller.py
@@ -0,0 +1,56 @@
+from oslo_versionedobjects import fields as ovoo_fields
+
+from staffeln.db import api as db_api
+from staffeln.objects import base
+from staffeln.objects import fields as sfeild
+
+
+@base.StaffelnObjectRegistry.register
+class Puller(
+ base.StaffelnPersistentObject, base.StaffelnObject, base.StaffelnObjectDictCompat
+):
+ VERSION = "1.0"
+
+ dbapi = db_api.get_instance()
+
+ fields = {
+ "id": sfeild.IntegerField(),
+ "node_id": sfeild.UUIDField(),
+ "updated_at": ovoo_fields.DateTimeField(),
+ }
+
+ @base.remotable_classmethod
+ def get(cls, context): # pylint: disable=E0213
+ """Get puller
+ :param context: Security context. NOTE: This should only
+ be used internally by the indirection_api.
+ Unfortunately, RPC requires context as the first
+ argument, even though we don't use it.
+ A context should be set when instantiating the
+ object, e.g.: Queue(context)
+ :returns: a :class:`Puller` object.
+ """
+ db_puller = cls.dbapi.get_puller()
+ if not db_puller:
+ return None
+ puller = cls._from_db_object(cls(context), db_puller)
+ return puller
+
+ @base.remotable
+ def create(self):
+ """Create a :class:`Puller` record in the DB"""
+ values = self.obj_get_changes()
+ db_puller = self.dbapi.create_puller(values)
+ self._from_db_object(self, db_puller)
+
+ @base.remotable
+ def save(self):
+ updates = self.obj_get_changes()
+ db_obj = self.dbapi.update_puller(id=1, values=updates)
+ obj = self._from_db_object(self, db_obj, eager=False)
+ self.obj_refresh(obj)
+ self.obj_reset_changes()
+
+ def refresh(self):
+ obj = self.get()
+ self.obj_refresh(obj)