Add openstack resource puller

Add puller in BackupManager, DB and Object to keep only one node to pull
volume-server attachment information from OpenStack.
To have multiple puller is okay, but with this, we can reduce unnecessary API
calls to OpenStack.
diff --git a/requirements.txt b/requirements.txt
index 1b57644..bc61474 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,8 +11,9 @@
 oslo.config>=8.1.0
 oslo.log>=4.4.0 # Apache-2.0
 oslo_versionedobjects
+oslo.utils>=3.40.0 # Apache-2.0
 openstacksdk>0.28.0
 pymysql
 parse
 # email
-# smtplib
\ No newline at end of file
+# smtplib
diff --git a/staffeln/common/node_manage.py b/staffeln/common/node_manage.py
new file mode 100644
index 0000000..16a2575
--- /dev/null
+++ b/staffeln/common/node_manage.py
@@ -0,0 +1,59 @@
+from datetime import datetime, timezone
+
+from oslo_utils import uuidutils
+
+import staffeln.conf
+from staffeln.objects import puller
+
+CONF = staffeln.conf.CONF
+
+
+class Puller(object):
+    def __init__(self, context, node_id=None):
+        self.ctx = context
+
+        self.node_id = (
+            uuidutils.generate_uuid() if node_id is None else node_id
+        )
+        self.puller = None
+
+    def fetch_puller_role(self):
+        target_puller = puller.Puller.get(context=self.ctx)
+
+        # No puller, run for puller role
+        if not target_puller:
+            self.puller = puller.Puller(self.ctx)
+            self.puller.node_id = self.node_id
+            self.puller.updated_at = datetime.now(timezone.utc)
+            self.puller.create()
+            return True
+        # If puller expired, run for new puller role
+        elif self.is_old_puller(target_puller):
+            self.puller = puller.Puller(self.ctx)
+            self.puller.node_id = self.node_id
+            self.puller.updated_at = datetime.now(timezone.utc)
+            self.puller.save()
+            return True
+        else:
+            return False
+
+        self.puller = puller.Puller.get(context=self.ctx)
+        # Return True if this new puller's node_id is this node.
+        return self.puller.node_id == self.node_id
+
+    def is_old_puller(self, target_puller):
+        valid_period = CONF.conductor.backup_service_period * 2
+        # Check if puller have not been update for more than two
+        # backup_service_period.
+        return True if (
+            datetime.now(timezone.utc) - target_puller.updated_at
+        ).total_seconds() > valid_period else False
+
+    def renew_update_time(self):
+        if self.puller is None:
+            return
+        self.puller = puller.Puller.get(context=self.ctx)
+
+        if self.puller.node_id == self.node_id:
+            self.puller.updated_at = datetime.now(timezone.utc)
+            self.puller.save()
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index a6e93a9..0642c07 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -6,6 +6,7 @@
 from futurist import periodics

 from oslo_log import log

 from staffeln.common import constants, context

+from staffeln.common import node_manage

 from staffeln.common import time as xtime

 from staffeln.conductor import backup

 from staffeln.i18n import _

@@ -23,7 +24,8 @@
         self.conf = conf

         self.ctx = context.make_context()

         self.controller = backup.Backup()

-        LOG.info("%s init" % self.name)

+        self.puller = node_manage.Puller(self.ctx)

+        LOG.info("%s init (node_id: %s)" % (self.name, self.puller.node_id))

 

     def run(self):

         LOG.info("%s run" % self.name)

@@ -120,7 +122,10 @@
 

         @periodics.periodic(spacing=backup_service_period, run_immediately=True)

         def backup_tasks():

-            self._update_task_queue()

+            #TODO should switch to using `with`

+            if self.puller.fetch_puller_role():

+                self._update_task_queue()

+                self.puller.renew_update_time()

             self._process_todo_tasks()

             self._process_wip_tasks()

             self._report_backup_result()

diff --git a/staffeln/db/sqlalchemy/alembic/README b/staffeln/db/sqlalchemy/alembic/README
index 42fcedd..d6405b1 100644
--- a/staffeln/db/sqlalchemy/alembic/README
+++ b/staffeln/db/sqlalchemy/alembic/README
@@ -1,5 +1,5 @@
 Please see https://alembic.readthedocs.org/en/latest/index.html for general documentation
 
-Upgrade can be performed by: 
+Upgrade can be performed by:
 $ staffeln-dbmanage upgrade
 $ staffeln-dbmanage upgrade head
diff --git a/staffeln/db/sqlalchemy/alembic/versions/003102f08f66_add_puller.py b/staffeln/db/sqlalchemy/alembic/versions/003102f08f66_add_puller.py
new file mode 100644
index 0000000..50463e0
--- /dev/null
+++ b/staffeln/db/sqlalchemy/alembic/versions/003102f08f66_add_puller.py
@@ -0,0 +1,24 @@
+"""Add puller
+
+Revision ID: 003102f08f66
+Revises: 041d9a0f1159
+Create Date: 2022-11-02 06:02:21.404596
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '003102f08f66'
+down_revision = '041d9a0f1159'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.create_table(
+        'puller',
+        sa.Column('id', sa.Integer(), nullable=False),
+        sa.Column('node_id', sa.String(length=100), nullable=True),
+        sa.Column('created_at', sa.DateTime(), nullable=True),
+        sa.Column('updated_at', sa.DateTime(), nullable=True)
+    )
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index adfa7a7..a1395ea 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -335,3 +335,26 @@
             return self._soft_delete(models.Backup_data, id)
         except:  # noqa: E722
             LOG.error("Backup Not found.")
+
+    def get_puller(self):
+        """Get puller"""
+        try:
+            return self._get(
+                context, model=models.Puller, fieldname="id", value=1
+            )
+        except:  # noqa: E722
+            LOG.debug("Puller not found.")
+            return None
+
+    def create_puller(self, values):
+        try:
+            puller = self._create(models.Puller, values)
+        except db_exc.DBDuplicateEntry:
+            LOG.error("Puller already exists.")
+        return puller
+
+    def update_puller(self, id, values):
+        try:
+            return self._update(models.Puller, id, values)
+        except:  # noqa: E722
+            LOG.error("Puller resource not found.")
diff --git a/staffeln/db/sqlalchemy/models.py b/staffeln/db/sqlalchemy/models.py
index 6c3deb7..39ac7ee 100644
--- a/staffeln/db/sqlalchemy/models.py
+++ b/staffeln/db/sqlalchemy/models.py
@@ -68,3 +68,12 @@
     instance_id = Column(String(100))
     volume_name = Column(String(100))
     instance_name = Column(String(100))
+
+
+class Puller(Base):
+    """Represent OpenStack information Puller"""
+
+    __tablename__ = "puller"
+    __table_args__ = table_args()
+    id = Column(Integer, primary_key=True)
+    node_id = Column(String(100), nullable=True)
diff --git a/staffeln/objects/puller.py b/staffeln/objects/puller.py
new file mode 100644
index 0000000..c03d70c
--- /dev/null
+++ b/staffeln/objects/puller.py
@@ -0,0 +1,56 @@
+from oslo_versionedobjects import fields as ovoo_fields
+
+from staffeln.db import api as db_api
+from staffeln.objects import base
+from staffeln.objects import fields as sfeild
+
+
+@base.StaffelnObjectRegistry.register
+class Puller(
+    base.StaffelnPersistentObject, base.StaffelnObject, base.StaffelnObjectDictCompat
+):
+    VERSION = "1.0"
+
+    dbapi = db_api.get_instance()
+
+    fields = {
+        "id": sfeild.IntegerField(),
+        "node_id": sfeild.UUIDField(),
+        "updated_at": ovoo_fields.DateTimeField(),
+    }
+
+    @base.remotable_classmethod
+    def get(cls, context):  # pylint: disable=E0213
+        """Get puller
+        :param context: Security context. NOTE: This should only
+                        be used internally by the indirection_api.
+                        Unfortunately, RPC requires context as the first
+                        argument, even though we don't use it.
+                        A context should be set when instantiating the
+                        object, e.g.: Queue(context)
+        :returns: a :class:`Puller` object.
+        """
+        db_puller = cls.dbapi.get_puller()
+        if not db_puller:
+            return None
+        puller = cls._from_db_object(cls(context), db_puller)
+        return puller
+
+    @base.remotable
+    def create(self):
+        """Create a :class:`Puller` record in the DB"""
+        values = self.obj_get_changes()
+        db_puller = self.dbapi.create_puller(values)
+        self._from_db_object(self, db_puller)
+
+    @base.remotable
+    def save(self):
+        updates = self.obj_get_changes()
+        db_obj = self.dbapi.update_puller(id=1, values=updates)
+        obj = self._from_db_object(self, db_obj, eager=False)
+        self.obj_refresh(obj)
+        self.obj_reset_changes()
+
+    def refresh(self):
+        obj = self.get()
+        self.obj_refresh(obj)