Merge branch 'main' into api
diff --git a/.gitignore b/.gitignore
index 73d1eaf..32e35b0 100755
--- a/.gitignore
+++ b/.gitignore
@@ -59,4 +59,13 @@
 
 # Files created by releasenotes build
 releasenotes/build
-.idea/
\ No newline at end of file
+.idea/
+
+# sqlite file
+*.sqlite
+
+# log
+*.log
+
+# envvar openrc file
+*openrc.sh
\ No newline at end of file
diff --git a/etc/staffeln/staffeln.conf b/etc/staffeln/staffeln.conf
index 5a735b2..5b8210c 100644
--- a/etc/staffeln/staffeln.conf
+++ b/etc/staffeln/staffeln.conf
@@ -4,7 +4,7 @@
 
 [database]
 backend = sqlalchemy
-connection = "mysql://root:password@localhost:3306/staffeln"
+connection = "mysql+pymysql://root:password@localhost:3306/staffeln"
 mysql_engine = InnoDB
 # mysql_sql_mode = TRADITIONAL
 # idle_timeout = 3600
diff --git a/requirements.txt b/requirements.txt
index b8a1585..45ae2d5 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,5 +13,5 @@
 openstacksdk>0.28.0
 pymysql
 
-email
-smtplib
\ No newline at end of file
+# email
+# smtplib
\ No newline at end of file
diff --git a/staffeln/cmd/conductor.py b/staffeln/cmd/conductor.py
index d3d0412..26cd159 100755
--- a/staffeln/cmd/conductor.py
+++ b/staffeln/cmd/conductor.py
@@ -7,7 +7,6 @@
 from staffeln.conductor import manager

 import staffeln.conf

 

-

 CONF = staffeln.conf.CONF

 

 

@@ -15,7 +14,8 @@
     service.prepare_service()

 

     sm = cotyledon.ServiceManager()

-    sm.add(manager.BackupManager, workers=CONF.conductor.backup_workers, args=(CONF,))

+    sm.add(manager.BackupManager,

+           workers=CONF.conductor.backup_workers, args=(CONF,))

     # sm.add(manager.RotationManager,

     #        workers=CONF.conductor.rotation_workers, args=(CONF,))

     oslo_config_glue.setup(sm, CONF)

diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index 5818dcd..d0ec6fe 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -1,3 +1,5 @@
-BACKUP_COMPLETED=2

-BACKUP_WIP=1

-BACKUP_PLANNED=0

+BACKUP_COMPLETED=2
+BACKUP_WIP=1
+BACKUP_PLANNED=0
+
+BACKUP_ENABLED_KEY = 'true'
diff --git a/staffeln/common/notify.py b/staffeln/common/notify.py
deleted file mode 100644
index 19f8440..0000000
--- a/staffeln/common/notify.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Email notification package

-# This should be upgraded by integrating with mail server to send batch

-import smtplib

-from email.mime.text import MIMEText

-from email.mime.multipart import MIMEMultipart

-import staffeln.conf

-

-CONF = staffeln.conf.CONF

-

-

-def sendEmail(src_email, src_pwd, dest_email, subject, content, smtp_server_domain, smtp_server_port):

-    try:

-        message = MIMEMultipart("alternative")

-        message["Subject"] = subject

-        message["From"] = src_email

-        message["To"] = dest_email

-        part = MIMEText(content, "html")

-        message.attach(part)

-

-        s = smtplib.SMTP(host=smtp_server_domain, port=smtp_server_port)

-        s.ehlo()

-        s.starttls()

-        # we can comment this auth func when use the trusted ip without authentication against the smtp server

-        s.login(src_email, src_pwd)

-        s.sendmail(src_email, dest_email, message.as_string())

-        s.close()

-        return True

-    except Exception as e:

-        print(str(e))

-        return False

-

-def SendNotification(content, receiver=None):

-    subject = "Backup result"

-

-    html = "<h3>${CONTENT}</h3>"

-    html = html.replace("${CONTENT}", content)

-

-    if receiver == None:

-        return

-    if len(receiver) == 0:

-        return

-

-    res = sendEmail(src_email=CONF.notification.sender_email,

-                        src_pwd=CONF.notification.sender_pwd,

-                        dest_email=CONF.notification.receiver,

-                        subject=subject,

-                        content=html,

-                        smtp_server_domain=CONF.notification.smtp_server_domain,

-                        smtp_server_port=CONF.notification.smtp_server_port)

diff --git a/staffeln/common/time.py b/staffeln/common/time.py
new file mode 100644
index 0000000..3059085
--- /dev/null
+++ b/staffeln/common/time.py
@@ -0,0 +1,55 @@
+import re
+from datetime import datetime
+from dateutil.relativedelta import relativedelta
+
+DEFAULT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
+
+regex = re.compile(
+    r'((?P<years>\d+?)y)?((?P<months>\d+?)m)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?'
+)
+
+
+# parse_time parses timedelta string to time dict
+# input: <string> 1y2m3w5d - all values should be integer
+# output: <dict> {year: 1, month: 2, week: 3, day: 5}
+def parse_timedelta_string(time_str):
+    empty_flag = True
+    try:
+        parts = regex.match(time_str)
+        if not parts:
+            return None
+        parts = parts.groupdict()
+        time_params = {}
+        for key in parts:
+            if parts[key]:
+                time_params[key] = int(parts[key])
+                empty_flag = False
+            else:
+                time_params[key] = 0
+        if empty_flag: return None
+        return time_params
+    except:
+        return None
+
+
+def get_current_strtime():
+    now = datetime.now()
+    return now.strftime(DEFAULT_TIME_FORMAT)
+
+
+def timeago(years, months, weeks, days, from_date=None):
+    if from_date is None:
+        from_date = datetime.now()
+    return from_date - relativedelta(years=years, months=months, weeks=weeks, days=days)
+
+## yearsago using Standard library
+# def yearsago(years, from_date=None):
+#     if from_date is None:
+#         from_date = datetime.now()
+#     try:
+#         return from_date.replace(year=from_date.year - years)
+#     except ValueError:
+#         # Must be 2/29!
+#         assert from_date.month == 2 and from_date.day == 29 # can be removed
+#         return from_date.replace(month=2, day=28,
+#                                  year=from_date.year-years)
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index 0f0522a..f358c8a 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -1,221 +1,244 @@
-import staffeln.conf

-import collections

-

-from openstack.block_storage.v2 import backup

-from oslo_log import log

-from staffeln.common import auth

-from staffeln.common import context

-from staffeln import objects

-from staffeln.common import short_id

-

-CONF = staffeln.conf.CONF

-LOG = log.getLogger(__name__)

-

-

-BackupMapping = collections.namedtuple(

-    "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]

-)

-

-QueueMapping = collections.namedtuple(

-    "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]

-)

-

-conn = auth.create_connection()

-

-

-def check_vm_backup_metadata(metadata):

-    if not CONF.conductor.backup_metadata_key in metadata:

-        return False

-    return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]

-

-

-def backup_volumes_in_project(conn, project_name):

-    # conn.list_servers()

-    pass

-

-

-def get_projects_list():

-    projects = conn.list_projects()

-    return projects

-

-

-class Backup(object):

-    """Implmentations of the queue with the sql."""

-

-    def __init__(self):

-        self.ctx = context.make_context()

-        self.discovered_queue_map = None

-        self.discovered_backup_map = None

-        self.queue_mapping = dict()

-        self.volume_mapping = dict()

-        self._available_backups = None

-        self._available_backups_map = None

-        self._available_queues = None

-        self._available_queues_map = None

-

-    @property

-    def available_queues(self):

-        """Queues loaded from DB"""

-        if self._available_queues is None:

-            self._available_queues = objects.Queue.list(self.ctx)

-        return self._available_queues

-

-    @property

-    def available_queues_map(self):

-        """Mapping of backup queue loaded from DB"""

-        if self._available_queues_map is None:

-            self._available_queues_map = {

-                QueueMapping(

-                    backup_id=g.backup_id,

-                    volume_id=g.volume_id,

-                    instance_id=g.instance_id,

-                    backup_status=g.backup_status,

-                ): g

-                for g in self.available_queues

-            }

-        return self._available_queues_map

-

-    @property

-    def available_backups(self):

-        """Backups loaded from DB"""

-        if self._available_backups is None:

-            self._available_backups = objects.Volume.list(self.ctx)

-        return self._available_backups

-

-    @property

-    def available_backups_map(self):

-        """Mapping of backup loaded from DB"""

-        if self._available_backups_map is None:

-            self._available_backups_map = {

-                QueueMapping(

-                    backup_id=g.backup_id,

-                    volume_id=g.volume_id,

-                    instance_id=g.instance_id,

-                    backup_completed=g.backup_completed,

-                ): g

-                for g in self.available_queues

-            }

-        return self._available_queues_map

-

-    def get_queues(self, filters=None):

-        """Get the list of volume queue columns from the queue_data table"""

-        queues = objects.Queue.list(self.ctx, filters=filters)

-        return queues

-

-    def create_queue(self):

-        """Create the queue of all the volumes for backup"""

-        self.discovered_queue_map = self.check_instance_volumes()

-        queues_map = self.discovered_queue_map["queues"]

-        for queue_name, queue_map in queues_map.items():

-            self._volume_queue(queue_map)

-

-    def check_instance_volumes(self):

-        """Get the list of all the volumes from the project using openstacksdk

-        Function first list all the servers in the project and get the volumes

-        that are attached to the instance.

-        """

-        queues_map = {}

-        discovered_queue_map = {"queues": queues_map}

-        projects = get_projects_list()

-        for project in projects:

-            servers = conn.compute.servers(

-                details=True, all_projects=True, project_id=project.id

-            )

-            for server in servers:

-                server_id = server.host_id

-                volumes = server.attached_volumes

-                for volume in volumes:

-                    queues_map["queues"] = QueueMapping(

-                        volume_id=volume["id"],

-                        backup_id="NULL",

-                        instance_id=server_id,

-                        backup_status=0,

-                    )

-        return discovered_queue_map

-

-    def _volume_queue(self, queue_map):

-        """Saves the queue data to the database."""

-        volume_id = queue_map.volume_id

-        backup_id = queue_map.backup_id

-        instance_id = queue_map.instance_id

-        backup_status = queue_map.backup_status

-        backup_mapping = dict()

-        matching_backups = [

-            g for g in self.available_queues if g.backup_id == backup_id

-        ]

-        if not matching_backups:

-            volume_queue = objects.Queue(self.ctx)

-            volume_queue.backup_id = backup_id

-            volume_queue.volume_id = volume_id

-            volume_queue.instance_id = instance_id

-            volume_queue.backup_status = backup_status

-            volume_queue.create()

-

-    def volume_backup_initiate(self, queue):

-        """Initiate the backup of the volume

-        :params: queue: Provide the map of the volume that needs

-                  backup.

-        This function will call the backupup api and change the

-        backup_status and backup_id in the queue table.

-        """

-        volume_info = conn.get_volume(queue.volume_id)

-        backup_id = queue.backup_id

-        if backup_id == "NULL":

-            volume_backup = conn.block_storage.create_backup(

-                volume_id=queue.volume_id, force=True

-            )

-            update_queue = objects.Queue.get_by_id(self.ctx, queue.id)

-            update_queue.backup_id = volume_backup.id

-            update_queue.backup_status = 1

-            update_queue.save()

-

-    def check_volume_backup_status(self, queue):

-        """Checks the backup status of the volume

-        :params: queue: Provide the map of the volume that needs backup

-                 status checked.

-        Call the backups api to see if the backup is successful.

-        """

-        for raw in conn.block_storage.backups(volume_id=queue.volume_id):

-            backup_info = raw

-            if backup_info.id == queue.backup_id:

-                if backup_info.status == "error":

-                    LOG.error("Backup of the volume %s failed." % queue.id)

-                    queue_delete = objects.Queue.get_by_id(self.ctx, queue.id)

-                    queue_delete.delete_queue()

-                elif backup_info.status == "success":

-                    backups_map = {}

-                    discovered_backup_map = {"backups": backups_map}

-                    LOG.info("Backup of the volume %s is successful." % queue.volume_id)

-                    backups_map["backups"] = BackupMapping(

-                        volume_id=queue.volume_id,

-                        backup_id=queue.backup_id,

-                        instance_id=queue.instance_id,

-                        backup_completed=1,

-                    )

-                    # Save volume backup success to backup_data table.

-                    self._volume_backup(discovered_backup_map)

-                    ## call db api to remove the queue object.

-                    queue_delete = objects.Queue.get_by_id(self.ctx, queue.id)

-                    queue_delete.delete_queue()

-                else:

-                    pass

-                    ## Wait for the backup to be completed.

-

-    def _volume_backup(self, discovered_backup_map):

-        volumes_map = discovered_backup_map["backups"]

-        for volume_name, volume_map in volumes_map.items():

-            volume_id = volume_map.volume_id

-            backup_id = volume_map.backup_id

-            instance_id = volume_map.instance_id

-            backup_completed = volume_map.backup_completed

-            backup_mapping = dict()

-            matching_backups = [

-                g for g in self.available_backups if g.backup_id == backup_id

-            ]

-            if not matching_backups:

-                volume_backup = objects.Volume(self.ctx)

-                volume_backup.backup_id = backup_id

-                volume_backup.volume_id = volume_id

-                volume_backup.instance_id = instance_id

-                volume_backup.backup_completed = backup_completed

-                volume_backup.create()

+import staffeln.conf
+import collections
+from staffeln.common import constants
+
+from openstack.exceptions import ResourceNotFound as OpenstackResourceNotFound
+from openstack.exceptions import SDKException as OpenstackSDKException
+from oslo_log import log
+from staffeln.common import auth
+from staffeln.common import context
+from staffeln import objects
+from staffeln.i18n import _
+
+CONF = staffeln.conf.CONF
+LOG = log.getLogger(__name__)
+
+BackupMapping = collections.namedtuple(
+    "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]
+)
+
+QueueMapping = collections.namedtuple(
+    "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]
+)
+
+conn = auth.create_connection()
+
+
+def check_vm_backup_metadata(metadata):
+    if not CONF.conductor.backup_metadata_key in metadata:
+        return False
+    return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
+
+
+def get_projects_list():
+    projects = conn.list_projects()
+    return projects
+
+
+class Backup(object):
+    """Implmentations of the queue with the sql."""
+
+    def __init__(self):
+        self.ctx = context.make_context()
+        self.discovered_backup_map = None
+        self.queue_mapping = dict()
+        self.volume_mapping = dict()
+
+    def get_backups(self, filters=None):
+        return objects.Volume.list(self.ctx, filters=filters)
+
+    def get_queues(self, filters=None):
+        """Get the list of volume queue columns from the queue_data table"""
+        queues = objects.Queue.list(self.ctx, filters=filters)
+        return queues
+
+    def create_queue(self, old_tasks):
+        """Create the queue of all the volumes for backup"""
+        # 1. get the old task list, not finished in the last cycle
+        #  and keep till now
+        old_task_volume_list = []
+        for old_task in old_tasks:
+            old_task_volume_list.append(old_task.volume_id)
+
+        # 2. add new tasks in the queue which are not existing in the old task list
+        queue_list = self.check_instance_volumes()
+        for queue in queue_list:
+            if not queue.volume_id in old_task_volume_list:
+                self._volume_queue(queue)
+
+    # Backup the volumes attached to which has a specific metadata
+    def filter_server(self, metadata):
+
+        if not CONF.conductor.backup_metadata_key in metadata:
+            return False
+
+        return metadata[CONF.conductor.backup_metadata_key].lower() == constants.BACKUP_ENABLED_KEY
+
+    # Backup the volumes in in-use and available status
+    def filter_volume(self, volume_id):
+        try:
+            volume = conn.get_volume_by_id(volume_id)
+            if volume == None: return False
+            res = volume['status'] in ("available", "in-use")
+            if not res:
+                LOG.info(_("Volume %s is not backed because it is in %s status" % (volume_id, volume['status'])))
+            return res
+
+        except OpenstackResourceNotFound:
+            return False
+
+    def remove_volume_backup(self, backup_object):
+        try:
+            backup = conn.get_volume_backup(backup_object.backup_id)
+            if backup == None: return False
+            if backup["status"] in ("available"):
+                conn.delete_volume_backup(backup_object.backup_id)
+                backup_object.delete_backup()
+            elif backup["status"] in ("error", "error_restoring"):
+                # TODO(Alex): need to discuss
+                #  now if backup is in error status, then retention service
+                #  does not remove it from openstack but removes it from the
+                #  backup table so user can delete it on Horizon.
+                backup_object.delete_backup()
+            else:  # "deleting", "restoring"
+                LOG.info(_("Rotation for the backup %s is skipped in this cycle "
+                           "because it is in %s status") % (backup_object.backup_id, backup["status"]))
+
+        except OpenstackResourceNotFound:
+            LOG.info(_("Backup %s is not existing in Openstack."
+                       "Or cinder-backup is not existing in the cloud." % backup_object.backup_id))
+            # remove from the backup table
+            backup_object.delete_backup()
+            return False
+
+    def check_instance_volumes(self):
+        """Get the list of all the volumes from the project using openstacksdk
+        Function first list all the servers in the project and get the volumes
+        that are attached to the instance.
+        """
+        queues_map = []
+        projects = get_projects_list()
+        for project in projects:
+            servers = conn.compute.servers(
+                details=True, all_projects=True, project_id=project.id
+            )
+            for server in servers:
+                if not self.filter_server(server.metadata): continue
+                for volume in server.attached_volumes:
+                    if not self.filter_volume(volume["id"]): continue
+                    queues_map.append(
+                        QueueMapping(
+                            volume_id=volume["id"],
+                            backup_id="NULL",
+                            instance_id=server.id,
+                            backup_status=constants.BACKUP_PLANNED,
+                        )
+                    )
+        return queues_map
+
+    def _volume_queue(self, task):
+        """Saves the queue data to the database."""
+
+        # TODO(Alex): Need to escalate discussion
+        # When create the task list, need to check the WIP backup generators
+        # which are created in the past backup cycle.
+        # Then skip to create new tasks for the volumes whose backup is WIP
+        volume_queue = objects.Queue(self.ctx)
+        volume_queue.backup_id = task.backup_id
+        volume_queue.volume_id = task.volume_id
+        volume_queue.instance_id = task.instance_id
+        volume_queue.backup_status = task.backup_status
+        volume_queue.create()
+
+    def create_volume_backup(self, queue):
+        """Initiate the backup of the volume
+        :params: queue: Provide the map of the volume that needs
+                  backup.
+        This function will call the backupup api and change the
+        backup_status and backup_id in the queue table.
+        """
+        backup_id = queue.backup_id
+        if backup_id == "NULL":
+            try:
+                volume_backup = conn.create_volume_backup(
+                    volume_id=queue.volume_id, force=True
+                )
+                queue.backup_id = volume_backup.id
+                queue.backup_status = constants.BACKUP_WIP
+                queue.save()
+            except OpenstackSDKException as error:
+                LOG.info(_("Backup creation for the volume %s failled. %s"
+                           % (queue.volume_id, str(error))))
+        else:
+            pass
+            # TODO(Alex): remove this task from the task list
+            #  Backup planned task cannot have backup_id in the same cycle
+            #  Reserve for now because it is related to the WIP backup genenrators which
+            #  are not finished in the current cycle
+
+    def process_failed_backup(self, task):
+        # 1. TODO(Alex): notify via email
+        LOG.error("Backup of the volume %s failed." % task.volume_id)
+        # 2. TODO(Alex): remove failed backup instance from the openstack
+        #     then set the volume status in-use
+        # 3. remove failed task from the task queue
+        task.delete_queue()
+
+    def process_available_backup(self, task):
+        LOG.info("Backup of the volume %s is successful." % task.volume_id)
+        # 1. save success backup in the backup table
+        self._volume_backup(
+            BackupMapping(
+                volume_id=task.volume_id,
+                backup_id=task.backup_id,
+                instance_id=task.instance_id,
+                backup_completed=1,
+            )
+        )
+        # 2. remove from the task list
+        task.delete_queue()
+        # 3. TODO(Alex): notify via email
+
+    def process_using_backup(self, task):
+        # remove from the task list
+        task.delete_queue()
+
+    def check_volume_backup_status(self, queue):
+        """Checks the backup status of the volume
+        :params: queue: Provide the map of the volume that needs backup
+                 status checked.
+        Call the backups api to see if the backup is successful.
+        """
+        # for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
+        try:
+            backup_gen = conn.get_volume_backup(queue.backup_id)
+            if backup_gen == None:
+                # TODO(Alex): need to check when it is none
+                LOG.info(_("Backup status of %s is returning none."%(queue.backup_id)))
+                return
+            if backup_gen.status == "error":
+                self.process_failed_backup(queue)
+            elif backup_gen.status == "available":
+                self.process_available_backup(queue)
+            elif backup_gen.status == "creating":
+                # TODO(Alex): Need to escalate discussion
+                # How to proceed WIP bakcup generators?
+                # To make things worse, the last backup generator is in progress till
+                # the new backup cycle
+                LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
+            else:  # "deleting", "restoring", "error_restoring" status
+                self.process_using_backup(queue)
+        except OpenstackResourceNotFound as e:
+            self.process_failed_backup(queue)
+
+    def _volume_backup(self, task):
+        # matching_backups = [
+        #     g for g in self.available_backups if g.backup_id == task.backup_id
+        # ]
+        # if not matching_backups:
+        volume_backup = objects.Volume(self.ctx)
+        volume_backup.backup_id = task.backup_id
+        volume_backup.volume_id = task.volume_id
+        volume_backup.instance_id = task.instance_id
+        volume_backup.backup_completed = task.backup_completed
+        volume_backup.create()
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 7f12246..6898348 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -1,99 +1,174 @@
-import cotyledon
-from futurist import periodics
-from oslo_log import log
-import staffeln.conf
-import sys
-import threading
-import time
-
-from staffeln.common import constants
-from staffeln.conductor import backup
-from staffeln.common import context
-
-
-LOG = log.getLogger(__name__)
-CONF = staffeln.conf.CONF
-
-
-class BackupManager(cotyledon.Service):
-    name = "Staffeln conductor backup controller"
-
-    def __init__(self, worker_id, conf):
-        super(BackupManager, self).__init__(worker_id)
-        self._shutdown = threading.Event()
-        self.conf = conf
-        self.ctx = context.make_context()
-        LOG.info("%s init" % self.name)
-
-    def run(self):
-        LOG.info("%s run" % self.name)
-        periodic_callables = [
-            (self.backup_engine, (), {}),
-        ]
-        periodic_worker = periodics.PeriodicWorker(periodic_callables)
-        periodic_thread = threading.Thread(target=periodic_worker.start)
-        periodic_thread.daemon = True
-        periodic_thread.start()
-
-    def terminate(self):
-        LOG.info("%s terminate" % self.name)
-        super(BackupManager, self).terminate()
-
-    def reload(self):
-        LOG.info("%s reload" % self.name)
-
-    @periodics.periodic(spacing=CONF.conductor.backup_period, run_immediately=True)
-    def backup_engine(self):
-        LOG.info("backing... %s" % str(time.time()))
-        LOG.info("%s periodics" % self.name)
-        queue = backup.Backup().get_queues()
-        queues_to_start = backup.Backup().get_queues(
-            filters={"backup_status": constants.BACKUP_PLANNED}
-        )
-        queues_started = backup.Backup().get_queues(
-            filters={"backup_status": constants.BACKUP_WIP}
-        )
-        if len(queue) == 0:
-            create_queue = backup.Backup().create_queue()
-        elif len(queues_started) != 0:
-            for queue in queues_started:
-                LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
-                backup_volume = backup.Backup().check_volume_backup_status(queue)
-        elif len(queues_to_start) != 0:
-            for queue in queues_to_start:
-                LOG.info("Started backup process for %s" % queue.volume_id)
-                backup_volume = backup.Backup().volume_backup_initiate(queue)
-
-
-class RotationManager(cotyledon.Service):
-    name = "Staffeln conductor rotation controller"
-
-    def __init__(self, worker_id, conf):
-        super(RotationManager, self).__init__(worker_id)
-        self._shutdown = threading.Event()
-        self.conf = conf
-        LOG.info("%s init" % self.name)
-
-    def run(self):
-        LOG.info("%s run" % self.name)
-        interval = CONF.conductor.rotation_period
-
-        periodic_callables = [
-            (self.rotation_engine, (), {}),
-        ]
-        periodic_worker = periodics.PeriodicWorker(periodic_callables)
-        periodic_thread = threading.Thread(target=periodic_worker.start)
-        periodic_thread.daemon = True
-        periodic_thread.start()
-
-    def terminate(self):
-        LOG.info("%s terminate" % self.name)
-        super(RotationManager, self).terminate()
-
-    def reload(self):
-        LOG.info("%s reload" % self.name)
-
-    @periodics.periodic(spacing=CONF.conductor.rotation_period, run_immediately=True)
-    def rotation_engine(self):
-        print("rotating... %s" % str(time.time()))
-        LOG.info("%s rotation_engine" % self.name)
+import cotyledon

+import datetime

+from futurist import periodics

+from oslo_log import log

+import staffeln.conf

+import threading

+import time

+

+from staffeln.common import constants

+from staffeln.common import context

+from staffeln.common import time as xtime

+from staffeln.conductor import backup

+from staffeln.conductor import notify

+from staffeln.i18n import _

+

+LOG = log.getLogger(__name__)

+CONF = staffeln.conf.CONF

+

+

+class BackupManager(cotyledon.Service):

+    name = "Staffeln conductor backup controller"

+

+    def __init__(self, worker_id, conf):

+        super(BackupManager, self).__init__(worker_id)

+        self._shutdown = threading.Event()

+        self.conf = conf

+        self.ctx = context.make_context()

+        LOG.info("%s init" % self.name)

+

+    def run(self):

+        LOG.info("%s run" % self.name)

+        periodic_callables = [

+            (self.backup_engine, (), {}),

+        ]

+        periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")

+        periodic_thread = threading.Thread(target=periodic_worker.start)

+        periodic_thread.daemon = True

+        periodic_thread.start()

+

+    def terminate(self):

+        LOG.info("%s terminate" % self.name)

+        super(BackupManager, self).terminate()

+

+    def reload(self):

+        LOG.info("%s reload" % self.name)

+

+    # Check if the backup count is over the limit

+    # TODO(Alex): how to count the backup number

+    #  only available backups are calculated?

+    def _check_quota(self):

+        LOG.info(_("Checking the backup limitation..."))

+        max_count = CONF.conductor.max_backup_count

+        current_count = len(backup.Backup().get_backups())

+        if max_count <= current_count:

+            # TODO(Alex): Send notification

+            LOG.info(_("The backup limit is over."))

+            return True

+        LOG.info(_("The max limit is %s, and current backup count is %s" % (max_count, current_count)))

+        return False

+

+    # Manage active backup generators

+    # TODO(Alex): need to discuss

+    #  Need to wait until all backups are finished?

+    #  That is required to make the backup report

+    def _process_wip_tasks(self):

+        LOG.info(_("Processing WIP backup generators..."))

+        queues_started = backup.Backup().get_queues(

+            filters={"backup_status": constants.BACKUP_WIP}

+        )

+        if len(queues_started) != 0:

+            for queue in queues_started: backup.Backup().check_volume_backup_status(queue)

+

+    # Create backup generators

+    def _process_todo_tasks(self):

+        LOG.info(_("Creating new backup generators..."))

+        queues_to_start = backup.Backup().get_queues(

+            filters={"backup_status": constants.BACKUP_PLANNED}

+        )

+        if len(queues_to_start) != 0:

+            for queue in queues_to_start:

+                backup.Backup().create_volume_backup(queue)

+

+    # Refresh the task queue

+    def _update_task_queue(self):

+        LOG.info(_("Updating backup task queue..."))

+        current_tasks = backup.Backup().get_queues()

+        backup.Backup().create_queue(current_tasks)

+

+    def _report_backup_result(self):

+        # TODO(Alex): Need to update these list

+        self.success_backup_list = []

+        self.failed_backup_list = []

+        notify.SendBackupResultEmail(self.success_backup_list, self.failed_backup_list)

+

+

+    @periodics.periodic(spacing=CONF.conductor.backup_service_period, run_immediately=True)

+    def backup_engine(self):

+        LOG.info("backing... %s" % str(time.time()))

+        LOG.info("%s periodics" % self.name)

+

+        if self._check_quota(): return

+        # NOTE(Alex): If _process_wip_tasks() waits tiil no WIP tasks

+        # exist, no need to repeat this function before and after queue update.

+        self._process_wip_tasks()

+        self._update_task_queue()

+        self._process_todo_tasks()

+        self._process_wip_tasks()

+        self._report_backup_result()

+

+

+

+class RotationManager(cotyledon.Service):

+    name = "Staffeln conductor rotation controller"

+

+    def __init__(self, worker_id, conf):

+        super(RotationManager, self).__init__(worker_id)

+        self._shutdown = threading.Event()

+        self.conf = conf

+        LOG.info("%s init" % self.name)

+

+    def run(self):

+        LOG.info("%s run" % self.name)

+

+        periodic_callables = [

+            (self.rotation_engine, (), {}),

+        ]

+        periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")

+        periodic_thread = threading.Thread(target=periodic_worker.start)

+        periodic_thread.daemon = True

+        periodic_thread.start()

+

+    def terminate(self):

+        LOG.info("%s terminate" % self.name)

+        super(RotationManager, self).terminate()

+

+    def reload(self):

+        LOG.info("%s reload" % self.name)

+

+    def get_backup_list(self):

+        threshold_strtime = self.get_threshold_strtime()

+        if threshold_strtime == None: return False

+        self.backup_list = backup.Backup().get_backups(filters={"created_at__lt": threshold_strtime})

+        return True

+

+    def remove_backups(self):

+        print(self.backup_list)

+        for retention_backup in self.backup_list:

+            # 1. check the backup status and delete only available backups

+            backup.Backup().remove_volume_backup(retention_backup)

+

+    @periodics.periodic(spacing=CONF.conductor.retention_service_period, run_immediately=True)

+    def rotation_engine(self):

+        LOG.info("%s rotation_engine" % self.name)

+        # 1. get the list of backups to remove based on the retention time

+        if not self.get_backup_list(): return

+

+        # 2. remove the backups

+        self.remove_backups()

+

+    # get the threshold time str

+    def get_threshold_strtime(self):

+        time_delta_dict = xtime.parse_timedelta_string(CONF.conductor.retention_time)

+        if time_delta_dict == None: return None

+

+        res = xtime.timeago(

+            years=time_delta_dict["years"],

+            months=time_delta_dict["months"],

+            weeks=time_delta_dict["weeks"],

+            days=time_delta_dict["days"],

+        )

+        if res == None: LOG.info(_("Retention time format is invalid. "

+                                   "Follow <YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."))

+

+        return res.strftime(xtime.DEFAULT_TIME_FORMAT)

diff --git a/staffeln/conductor/notify.py b/staffeln/conductor/notify.py
new file mode 100644
index 0000000..d80da4e
--- /dev/null
+++ b/staffeln/conductor/notify.py
@@ -0,0 +1,57 @@
+# Email notification package

+# This should be upgraded by integrating with mail server to send batch

+import smtplib

+from email.mime.text import MIMEText

+from email.mime.multipart import MIMEMultipart

+from oslo_log import log

+import staffeln.conf

+from staffeln.common import time as xtime

+from staffeln.i18n import _

+

+CONF = staffeln.conf.CONF

+LOG = log.getLogger(__name__)

+

+

+def _sendEmail(src_email, src_pwd, dest_email, subject, content, smtp_server_domain, smtp_server_port):

+    message = MIMEMultipart("alternative")

+    message["Subject"] = subject

+    message["From"] = src_email

+    message["To"] = dest_email

+    part = MIMEText(content, "html")

+    message.attach(part)

+

+    s = smtplib.SMTP(host=smtp_server_domain, port=smtp_server_port)

+    s.ehlo()

+    s.starttls()

+    # we can comment this auth func when use the trusted ip without authentication against the smtp server

+    s.login(src_email, src_pwd)

+    s.sendmail(src_email, dest_email, message.as_string())

+    s.close()

+

+

+def SendBackupResultEmail(success_backup_list, failed_backup_list):

+    subject = "Backup result"

+

+    html = "<h3>${TIME}</h3>" \

+           "<h3>Success List</h3>" \

+           "<h4>${SUCCESS_VOLUME_LIST}</h4>" \

+           "<h3>Failed List</h3>" \

+           "<h4>${FAILED_VOLUME_LIST}</h4>"

+

+    success_volumes = '<br>'.join([str(elem) for elem in success_backup_list])

+    failed_volumes = '<br>'.join([str(elem) for elem in failed_backup_list])

+    html = html.replace("${TIME}", xtime.get_current_strtime())

+    html = html.replace("${SUCCESS_VOLUME_LIST}", success_volumes)

+    html = html.replace("${FAILED_VOLUME_LIST}", failed_volumes)

+    try:

+        _sendEmail(src_email=CONF.notification.sender_email,

+                   src_pwd=CONF.notification.sender_pwd,

+                   dest_email=CONF.notification.receiver,

+                   subject=subject,

+                   content=html,

+                   smtp_server_domain=CONF.notification.smtp_server_domain,

+                   smtp_server_port=CONF.notification.smtp_server_port)

+        LOG.info(_("Backup result email sent"))

+    except Exception as e:

+        LOG.error(_("Backup result email send failed. Please check email configuration. %s" % (str(e))))

+

diff --git a/staffeln/conf/api.py b/staffeln/conf/api.py
index 846f959..e405d6a 100755
--- a/staffeln/conf/api.py
+++ b/staffeln/conf/api.py
@@ -1,7 +1,6 @@
 from oslo_config import cfg
 from staffeln.i18n import _
 
-
 api_group = cfg.OptGroup(
     "api",
     title="API options",
diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index 9906004..4e2aad9 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -1,7 +1,6 @@
 from oslo_config import cfg
 from staffeln.i18n import _
 
-
 conductor_group = cfg.OptGroup(
     "conductor",
     title="Conductor Options",
@@ -13,17 +12,22 @@
         "backup_workers",
         default=1,
         help=_("The maximum number of backup processes to "
-        "fork and run. Default to number of CPUs on the host."),
+               "fork and run. Default to number of CPUs on the host."),
     ),
     cfg.IntOpt(
-        "backup_period",
-        default=10,
-        min=1,
+        "backup_service_period",
+        default=60,
+        min=10,
         help=_("The time of bakup period, the unit is one minute."),
     ),
     cfg.StrOpt(
         "backup_metadata_key",
-        default="test",
+        default="__automated_backup",
+        help=_("The key string of metadata the VM, which requres back up, has"),
+    ),
+    cfg.IntOpt(
+        "max_backup_count",
+        default=10,
         help=_("The key string of metadata the VM, which requres back up, has"),
     ),
 ]
@@ -33,13 +37,25 @@
         "rotation_workers",
         default=1,
         help=_("The maximum number of rotation processes to "
-        "fork and run. Default to number of CPUs on the host."),
+               "fork and run. Default to number of CPUs on the host."),
     ),
     cfg.IntOpt(
-        "rotation_period",
+        "retention_service_period",
+        default=20,
+        min=10,
+        help=_("The period of the retention service, the unit is one second."),
+    ),
+    cfg.IntOpt(
+        "rotation_workers",
         default=1,
-        min=1,
-        help=_("The time of rotation period, the unit is one day."),
+        help=_("The maximum number of rotation processes to "
+               "fork and run. Default to number of CPUs on the host."),
+    ),
+    cfg.StrOpt(
+        "retention_time",
+        default="2w3d",
+        help=_("The time of retention period, the for mat is "
+               "<YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."),
     ),
 ]
 
diff --git a/staffeln/conf/notify.py b/staffeln/conf/notify.py
index 20f1ede..c292e51 100644
--- a/staffeln/conf/notify.py
+++ b/staffeln/conf/notify.py
@@ -9,11 +9,6 @@
 )
 
 email_opts = [
-    cfg.StrOpt(
-        "template",
-        default="<h3>${CONTENT}</h3>",
-        help=_("This html template is used to email the backup result."),
-    ),
     cfg.ListOpt(
         "receiver",
         default=[],
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index 41cd483..4b4fc9f 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -109,13 +109,8 @@
         if filters is None:
             filters = {}
 
-        plain_fields = [
-            "volume_id",
-            "backup_id",
-            "backup_completed",
-            "instance_id",
-            "created_at",
-        ]
+
+        plain_fields = ["volume_id", "backup_id", "backup_completed", "instance_id", "created_at"]
 
         return self._add_filters(
             query=query,
@@ -314,6 +309,7 @@
         except:
             LOG.error("Queue Not found.")
 
+
     def get_backup_by_backup_id(self, context, backup_id):
         """Get the column from the backup_data with matching backup_id"""
 
@@ -332,6 +328,7 @@
         except:
             LOG.error("Backup resource not found.")
 
+
     def soft_delete_backup(self, id):
         try:
             return self._soft_delete(models.Backup_data, id)
diff --git a/staffeln/objects/__init__.py b/staffeln/objects/__init__.py
index 8003db1..4f7ca56 100755
--- a/staffeln/objects/__init__.py
+++ b/staffeln/objects/__init__.py
@@ -1,4 +1,5 @@
 from .queue import Queue
+from .volume import Volume
 # from volume import Volume
 def register_all():
     __import__('staffeln.objects.volume')
diff --git a/staffeln/objects/volume.py b/staffeln/objects/volume.py
index 1a6c09d..6655b7b 100644
--- a/staffeln/objects/volume.py
+++ b/staffeln/objects/volume.py
@@ -82,3 +82,4 @@
         else:
             backup = cls._from_db_object(cls(context), db_backup)
             return backup
+          
\ No newline at end of file