Merge branch 'main' into api
diff --git a/.gitignore b/.gitignore
index 73d1eaf..32e35b0 100755
--- a/.gitignore
+++ b/.gitignore
@@ -59,4 +59,13 @@
# Files created by releasenotes build
releasenotes/build
-.idea/
\ No newline at end of file
+.idea/
+
+# sqlite file
+*.sqlite
+
+# log
+*.log
+
+# envvar openrc file
+*openrc.sh
\ No newline at end of file
diff --git a/etc/staffeln/staffeln.conf b/etc/staffeln/staffeln.conf
index 5a735b2..5b8210c 100644
--- a/etc/staffeln/staffeln.conf
+++ b/etc/staffeln/staffeln.conf
@@ -4,7 +4,7 @@
[database]
backend = sqlalchemy
-connection = "mysql://root:password@localhost:3306/staffeln"
+connection = "mysql+pymysql://root:password@localhost:3306/staffeln"
mysql_engine = InnoDB
# mysql_sql_mode = TRADITIONAL
# idle_timeout = 3600
diff --git a/requirements.txt b/requirements.txt
index b8a1585..45ae2d5 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,5 +13,5 @@
openstacksdk>0.28.0
pymysql
-email
-smtplib
\ No newline at end of file
+# email
+# smtplib
\ No newline at end of file
diff --git a/staffeln/cmd/conductor.py b/staffeln/cmd/conductor.py
index d3d0412..26cd159 100755
--- a/staffeln/cmd/conductor.py
+++ b/staffeln/cmd/conductor.py
@@ -7,7 +7,6 @@
from staffeln.conductor import manager
import staffeln.conf
-
CONF = staffeln.conf.CONF
@@ -15,7 +14,8 @@
service.prepare_service()
sm = cotyledon.ServiceManager()
- sm.add(manager.BackupManager, workers=CONF.conductor.backup_workers, args=(CONF,))
+ sm.add(manager.BackupManager,
+ workers=CONF.conductor.backup_workers, args=(CONF,))
# sm.add(manager.RotationManager,
# workers=CONF.conductor.rotation_workers, args=(CONF,))
oslo_config_glue.setup(sm, CONF)
diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index 5818dcd..d0ec6fe 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -1,3 +1,5 @@
-BACKUP_COMPLETED=2
-BACKUP_WIP=1
-BACKUP_PLANNED=0
+BACKUP_COMPLETED=2
+BACKUP_WIP=1
+BACKUP_PLANNED=0
+
+BACKUP_ENABLED_KEY = 'true'
diff --git a/staffeln/common/notify.py b/staffeln/common/notify.py
deleted file mode 100644
index 19f8440..0000000
--- a/staffeln/common/notify.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Email notification package
-# This should be upgraded by integrating with mail server to send batch
-import smtplib
-from email.mime.text import MIMEText
-from email.mime.multipart import MIMEMultipart
-import staffeln.conf
-
-CONF = staffeln.conf.CONF
-
-
-def sendEmail(src_email, src_pwd, dest_email, subject, content, smtp_server_domain, smtp_server_port):
- try:
- message = MIMEMultipart("alternative")
- message["Subject"] = subject
- message["From"] = src_email
- message["To"] = dest_email
- part = MIMEText(content, "html")
- message.attach(part)
-
- s = smtplib.SMTP(host=smtp_server_domain, port=smtp_server_port)
- s.ehlo()
- s.starttls()
- # we can comment this auth func when use the trusted ip without authentication against the smtp server
- s.login(src_email, src_pwd)
- s.sendmail(src_email, dest_email, message.as_string())
- s.close()
- return True
- except Exception as e:
- print(str(e))
- return False
-
-def SendNotification(content, receiver=None):
- subject = "Backup result"
-
- html = "<h3>${CONTENT}</h3>"
- html = html.replace("${CONTENT}", content)
-
- if receiver == None:
- return
- if len(receiver) == 0:
- return
-
- res = sendEmail(src_email=CONF.notification.sender_email,
- src_pwd=CONF.notification.sender_pwd,
- dest_email=CONF.notification.receiver,
- subject=subject,
- content=html,
- smtp_server_domain=CONF.notification.smtp_server_domain,
- smtp_server_port=CONF.notification.smtp_server_port)
diff --git a/staffeln/common/time.py b/staffeln/common/time.py
new file mode 100644
index 0000000..3059085
--- /dev/null
+++ b/staffeln/common/time.py
@@ -0,0 +1,55 @@
+import re
+from datetime import datetime
+from dateutil.relativedelta import relativedelta
+
+DEFAULT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
+
+regex = re.compile(
+ r'((?P<years>\d+?)y)?((?P<months>\d+?)m)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?'
+)
+
+
+# parse_time parses timedelta string to time dict
+# input: <string> 1y2m3w5d - all values should be integer
+# output: <dict> {year: 1, month: 2, week: 3, day: 5}
+def parse_timedelta_string(time_str):
+ empty_flag = True
+ try:
+ parts = regex.match(time_str)
+ if not parts:
+ return None
+ parts = parts.groupdict()
+ time_params = {}
+ for key in parts:
+ if parts[key]:
+ time_params[key] = int(parts[key])
+ empty_flag = False
+ else:
+ time_params[key] = 0
+ if empty_flag: return None
+ return time_params
+ except:
+ return None
+
+
+def get_current_strtime():
+ now = datetime.now()
+ return now.strftime(DEFAULT_TIME_FORMAT)
+
+
+def timeago(years, months, weeks, days, from_date=None):
+ if from_date is None:
+ from_date = datetime.now()
+ return from_date - relativedelta(years=years, months=months, weeks=weeks, days=days)
+
+## yearsago using Standard library
+# def yearsago(years, from_date=None):
+# if from_date is None:
+# from_date = datetime.now()
+# try:
+# return from_date.replace(year=from_date.year - years)
+# except ValueError:
+# # Must be 2/29!
+# assert from_date.month == 2 and from_date.day == 29 # can be removed
+# return from_date.replace(month=2, day=28,
+# year=from_date.year-years)
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index 0f0522a..f358c8a 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -1,221 +1,244 @@
-import staffeln.conf
-import collections
-
-from openstack.block_storage.v2 import backup
-from oslo_log import log
-from staffeln.common import auth
-from staffeln.common import context
-from staffeln import objects
-from staffeln.common import short_id
-
-CONF = staffeln.conf.CONF
-LOG = log.getLogger(__name__)
-
-
-BackupMapping = collections.namedtuple(
- "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]
-)
-
-QueueMapping = collections.namedtuple(
- "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]
-)
-
-conn = auth.create_connection()
-
-
-def check_vm_backup_metadata(metadata):
- if not CONF.conductor.backup_metadata_key in metadata:
- return False
- return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
-
-
-def backup_volumes_in_project(conn, project_name):
- # conn.list_servers()
- pass
-
-
-def get_projects_list():
- projects = conn.list_projects()
- return projects
-
-
-class Backup(object):
- """Implmentations of the queue with the sql."""
-
- def __init__(self):
- self.ctx = context.make_context()
- self.discovered_queue_map = None
- self.discovered_backup_map = None
- self.queue_mapping = dict()
- self.volume_mapping = dict()
- self._available_backups = None
- self._available_backups_map = None
- self._available_queues = None
- self._available_queues_map = None
-
- @property
- def available_queues(self):
- """Queues loaded from DB"""
- if self._available_queues is None:
- self._available_queues = objects.Queue.list(self.ctx)
- return self._available_queues
-
- @property
- def available_queues_map(self):
- """Mapping of backup queue loaded from DB"""
- if self._available_queues_map is None:
- self._available_queues_map = {
- QueueMapping(
- backup_id=g.backup_id,
- volume_id=g.volume_id,
- instance_id=g.instance_id,
- backup_status=g.backup_status,
- ): g
- for g in self.available_queues
- }
- return self._available_queues_map
-
- @property
- def available_backups(self):
- """Backups loaded from DB"""
- if self._available_backups is None:
- self._available_backups = objects.Volume.list(self.ctx)
- return self._available_backups
-
- @property
- def available_backups_map(self):
- """Mapping of backup loaded from DB"""
- if self._available_backups_map is None:
- self._available_backups_map = {
- QueueMapping(
- backup_id=g.backup_id,
- volume_id=g.volume_id,
- instance_id=g.instance_id,
- backup_completed=g.backup_completed,
- ): g
- for g in self.available_queues
- }
- return self._available_queues_map
-
- def get_queues(self, filters=None):
- """Get the list of volume queue columns from the queue_data table"""
- queues = objects.Queue.list(self.ctx, filters=filters)
- return queues
-
- def create_queue(self):
- """Create the queue of all the volumes for backup"""
- self.discovered_queue_map = self.check_instance_volumes()
- queues_map = self.discovered_queue_map["queues"]
- for queue_name, queue_map in queues_map.items():
- self._volume_queue(queue_map)
-
- def check_instance_volumes(self):
- """Get the list of all the volumes from the project using openstacksdk
- Function first list all the servers in the project and get the volumes
- that are attached to the instance.
- """
- queues_map = {}
- discovered_queue_map = {"queues": queues_map}
- projects = get_projects_list()
- for project in projects:
- servers = conn.compute.servers(
- details=True, all_projects=True, project_id=project.id
- )
- for server in servers:
- server_id = server.host_id
- volumes = server.attached_volumes
- for volume in volumes:
- queues_map["queues"] = QueueMapping(
- volume_id=volume["id"],
- backup_id="NULL",
- instance_id=server_id,
- backup_status=0,
- )
- return discovered_queue_map
-
- def _volume_queue(self, queue_map):
- """Saves the queue data to the database."""
- volume_id = queue_map.volume_id
- backup_id = queue_map.backup_id
- instance_id = queue_map.instance_id
- backup_status = queue_map.backup_status
- backup_mapping = dict()
- matching_backups = [
- g for g in self.available_queues if g.backup_id == backup_id
- ]
- if not matching_backups:
- volume_queue = objects.Queue(self.ctx)
- volume_queue.backup_id = backup_id
- volume_queue.volume_id = volume_id
- volume_queue.instance_id = instance_id
- volume_queue.backup_status = backup_status
- volume_queue.create()
-
- def volume_backup_initiate(self, queue):
- """Initiate the backup of the volume
- :params: queue: Provide the map of the volume that needs
- backup.
- This function will call the backupup api and change the
- backup_status and backup_id in the queue table.
- """
- volume_info = conn.get_volume(queue.volume_id)
- backup_id = queue.backup_id
- if backup_id == "NULL":
- volume_backup = conn.block_storage.create_backup(
- volume_id=queue.volume_id, force=True
- )
- update_queue = objects.Queue.get_by_id(self.ctx, queue.id)
- update_queue.backup_id = volume_backup.id
- update_queue.backup_status = 1
- update_queue.save()
-
- def check_volume_backup_status(self, queue):
- """Checks the backup status of the volume
- :params: queue: Provide the map of the volume that needs backup
- status checked.
- Call the backups api to see if the backup is successful.
- """
- for raw in conn.block_storage.backups(volume_id=queue.volume_id):
- backup_info = raw
- if backup_info.id == queue.backup_id:
- if backup_info.status == "error":
- LOG.error("Backup of the volume %s failed." % queue.id)
- queue_delete = objects.Queue.get_by_id(self.ctx, queue.id)
- queue_delete.delete_queue()
- elif backup_info.status == "success":
- backups_map = {}
- discovered_backup_map = {"backups": backups_map}
- LOG.info("Backup of the volume %s is successful." % queue.volume_id)
- backups_map["backups"] = BackupMapping(
- volume_id=queue.volume_id,
- backup_id=queue.backup_id,
- instance_id=queue.instance_id,
- backup_completed=1,
- )
- # Save volume backup success to backup_data table.
- self._volume_backup(discovered_backup_map)
- ## call db api to remove the queue object.
- queue_delete = objects.Queue.get_by_id(self.ctx, queue.id)
- queue_delete.delete_queue()
- else:
- pass
- ## Wait for the backup to be completed.
-
- def _volume_backup(self, discovered_backup_map):
- volumes_map = discovered_backup_map["backups"]
- for volume_name, volume_map in volumes_map.items():
- volume_id = volume_map.volume_id
- backup_id = volume_map.backup_id
- instance_id = volume_map.instance_id
- backup_completed = volume_map.backup_completed
- backup_mapping = dict()
- matching_backups = [
- g for g in self.available_backups if g.backup_id == backup_id
- ]
- if not matching_backups:
- volume_backup = objects.Volume(self.ctx)
- volume_backup.backup_id = backup_id
- volume_backup.volume_id = volume_id
- volume_backup.instance_id = instance_id
- volume_backup.backup_completed = backup_completed
- volume_backup.create()
+import staffeln.conf
+import collections
+from staffeln.common import constants
+
+from openstack.exceptions import ResourceNotFound as OpenstackResourceNotFound
+from openstack.exceptions import SDKException as OpenstackSDKException
+from oslo_log import log
+from staffeln.common import auth
+from staffeln.common import context
+from staffeln import objects
+from staffeln.i18n import _
+
+CONF = staffeln.conf.CONF
+LOG = log.getLogger(__name__)
+
+BackupMapping = collections.namedtuple(
+ "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]
+)
+
+QueueMapping = collections.namedtuple(
+ "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]
+)
+
+conn = auth.create_connection()
+
+
+def check_vm_backup_metadata(metadata):
+ if not CONF.conductor.backup_metadata_key in metadata:
+ return False
+ return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
+
+
+def get_projects_list():
+ projects = conn.list_projects()
+ return projects
+
+
+class Backup(object):
+ """Implmentations of the queue with the sql."""
+
+ def __init__(self):
+ self.ctx = context.make_context()
+ self.discovered_backup_map = None
+ self.queue_mapping = dict()
+ self.volume_mapping = dict()
+
+ def get_backups(self, filters=None):
+ return objects.Volume.list(self.ctx, filters=filters)
+
+ def get_queues(self, filters=None):
+ """Get the list of volume queue columns from the queue_data table"""
+ queues = objects.Queue.list(self.ctx, filters=filters)
+ return queues
+
+ def create_queue(self, old_tasks):
+ """Create the queue of all the volumes for backup"""
+ # 1. get the old task list, not finished in the last cycle
+ # and keep till now
+ old_task_volume_list = []
+ for old_task in old_tasks:
+ old_task_volume_list.append(old_task.volume_id)
+
+ # 2. add new tasks in the queue which are not existing in the old task list
+ queue_list = self.check_instance_volumes()
+ for queue in queue_list:
+ if not queue.volume_id in old_task_volume_list:
+ self._volume_queue(queue)
+
+ # Backup the volumes attached to which has a specific metadata
+ def filter_server(self, metadata):
+
+ if not CONF.conductor.backup_metadata_key in metadata:
+ return False
+
+ return metadata[CONF.conductor.backup_metadata_key].lower() == constants.BACKUP_ENABLED_KEY
+
+ # Backup the volumes in in-use and available status
+ def filter_volume(self, volume_id):
+ try:
+ volume = conn.get_volume_by_id(volume_id)
+ if volume == None: return False
+ res = volume['status'] in ("available", "in-use")
+ if not res:
+ LOG.info(_("Volume %s is not backed because it is in %s status" % (volume_id, volume['status'])))
+ return res
+
+ except OpenstackResourceNotFound:
+ return False
+
+ def remove_volume_backup(self, backup_object):
+ try:
+ backup = conn.get_volume_backup(backup_object.backup_id)
+ if backup == None: return False
+ if backup["status"] in ("available"):
+ conn.delete_volume_backup(backup_object.backup_id)
+ backup_object.delete_backup()
+ elif backup["status"] in ("error", "error_restoring"):
+ # TODO(Alex): need to discuss
+ # now if backup is in error status, then retention service
+ # does not remove it from openstack but removes it from the
+ # backup table so user can delete it on Horizon.
+ backup_object.delete_backup()
+ else: # "deleting", "restoring"
+ LOG.info(_("Rotation for the backup %s is skipped in this cycle "
+ "because it is in %s status") % (backup_object.backup_id, backup["status"]))
+
+ except OpenstackResourceNotFound:
+ LOG.info(_("Backup %s is not existing in Openstack."
+ "Or cinder-backup is not existing in the cloud." % backup_object.backup_id))
+ # remove from the backup table
+ backup_object.delete_backup()
+ return False
+
+ def check_instance_volumes(self):
+ """Get the list of all the volumes from the project using openstacksdk
+ Function first list all the servers in the project and get the volumes
+ that are attached to the instance.
+ """
+ queues_map = []
+ projects = get_projects_list()
+ for project in projects:
+ servers = conn.compute.servers(
+ details=True, all_projects=True, project_id=project.id
+ )
+ for server in servers:
+ if not self.filter_server(server.metadata): continue
+ for volume in server.attached_volumes:
+ if not self.filter_volume(volume["id"]): continue
+ queues_map.append(
+ QueueMapping(
+ volume_id=volume["id"],
+ backup_id="NULL",
+ instance_id=server.id,
+ backup_status=constants.BACKUP_PLANNED,
+ )
+ )
+ return queues_map
+
+ def _volume_queue(self, task):
+ """Saves the queue data to the database."""
+
+ # TODO(Alex): Need to escalate discussion
+ # When create the task list, need to check the WIP backup generators
+ # which are created in the past backup cycle.
+ # Then skip to create new tasks for the volumes whose backup is WIP
+ volume_queue = objects.Queue(self.ctx)
+ volume_queue.backup_id = task.backup_id
+ volume_queue.volume_id = task.volume_id
+ volume_queue.instance_id = task.instance_id
+ volume_queue.backup_status = task.backup_status
+ volume_queue.create()
+
+ def create_volume_backup(self, queue):
+ """Initiate the backup of the volume
+ :params: queue: Provide the map of the volume that needs
+ backup.
+ This function will call the backupup api and change the
+ backup_status and backup_id in the queue table.
+ """
+ backup_id = queue.backup_id
+ if backup_id == "NULL":
+ try:
+ volume_backup = conn.create_volume_backup(
+ volume_id=queue.volume_id, force=True
+ )
+ queue.backup_id = volume_backup.id
+ queue.backup_status = constants.BACKUP_WIP
+ queue.save()
+ except OpenstackSDKException as error:
+ LOG.info(_("Backup creation for the volume %s failled. %s"
+ % (queue.volume_id, str(error))))
+ else:
+ pass
+ # TODO(Alex): remove this task from the task list
+ # Backup planned task cannot have backup_id in the same cycle
+ # Reserve for now because it is related to the WIP backup genenrators which
+ # are not finished in the current cycle
+
+ def process_failed_backup(self, task):
+ # 1. TODO(Alex): notify via email
+ LOG.error("Backup of the volume %s failed." % task.volume_id)
+ # 2. TODO(Alex): remove failed backup instance from the openstack
+ # then set the volume status in-use
+ # 3. remove failed task from the task queue
+ task.delete_queue()
+
+ def process_available_backup(self, task):
+ LOG.info("Backup of the volume %s is successful." % task.volume_id)
+ # 1. save success backup in the backup table
+ self._volume_backup(
+ BackupMapping(
+ volume_id=task.volume_id,
+ backup_id=task.backup_id,
+ instance_id=task.instance_id,
+ backup_completed=1,
+ )
+ )
+ # 2. remove from the task list
+ task.delete_queue()
+ # 3. TODO(Alex): notify via email
+
+ def process_using_backup(self, task):
+ # remove from the task list
+ task.delete_queue()
+
+ def check_volume_backup_status(self, queue):
+ """Checks the backup status of the volume
+ :params: queue: Provide the map of the volume that needs backup
+ status checked.
+ Call the backups api to see if the backup is successful.
+ """
+ # for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
+ try:
+ backup_gen = conn.get_volume_backup(queue.backup_id)
+ if backup_gen == None:
+ # TODO(Alex): need to check when it is none
+ LOG.info(_("Backup status of %s is returning none."%(queue.backup_id)))
+ return
+ if backup_gen.status == "error":
+ self.process_failed_backup(queue)
+ elif backup_gen.status == "available":
+ self.process_available_backup(queue)
+ elif backup_gen.status == "creating":
+ # TODO(Alex): Need to escalate discussion
+ # How to proceed WIP bakcup generators?
+ # To make things worse, the last backup generator is in progress till
+ # the new backup cycle
+ LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
+ else: # "deleting", "restoring", "error_restoring" status
+ self.process_using_backup(queue)
+ except OpenstackResourceNotFound as e:
+ self.process_failed_backup(queue)
+
+ def _volume_backup(self, task):
+ # matching_backups = [
+ # g for g in self.available_backups if g.backup_id == task.backup_id
+ # ]
+ # if not matching_backups:
+ volume_backup = objects.Volume(self.ctx)
+ volume_backup.backup_id = task.backup_id
+ volume_backup.volume_id = task.volume_id
+ volume_backup.instance_id = task.instance_id
+ volume_backup.backup_completed = task.backup_completed
+ volume_backup.create()
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 7f12246..6898348 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -1,99 +1,174 @@
-import cotyledon
-from futurist import periodics
-from oslo_log import log
-import staffeln.conf
-import sys
-import threading
-import time
-
-from staffeln.common import constants
-from staffeln.conductor import backup
-from staffeln.common import context
-
-
-LOG = log.getLogger(__name__)
-CONF = staffeln.conf.CONF
-
-
-class BackupManager(cotyledon.Service):
- name = "Staffeln conductor backup controller"
-
- def __init__(self, worker_id, conf):
- super(BackupManager, self).__init__(worker_id)
- self._shutdown = threading.Event()
- self.conf = conf
- self.ctx = context.make_context()
- LOG.info("%s init" % self.name)
-
- def run(self):
- LOG.info("%s run" % self.name)
- periodic_callables = [
- (self.backup_engine, (), {}),
- ]
- periodic_worker = periodics.PeriodicWorker(periodic_callables)
- periodic_thread = threading.Thread(target=periodic_worker.start)
- periodic_thread.daemon = True
- periodic_thread.start()
-
- def terminate(self):
- LOG.info("%s terminate" % self.name)
- super(BackupManager, self).terminate()
-
- def reload(self):
- LOG.info("%s reload" % self.name)
-
- @periodics.periodic(spacing=CONF.conductor.backup_period, run_immediately=True)
- def backup_engine(self):
- LOG.info("backing... %s" % str(time.time()))
- LOG.info("%s periodics" % self.name)
- queue = backup.Backup().get_queues()
- queues_to_start = backup.Backup().get_queues(
- filters={"backup_status": constants.BACKUP_PLANNED}
- )
- queues_started = backup.Backup().get_queues(
- filters={"backup_status": constants.BACKUP_WIP}
- )
- if len(queue) == 0:
- create_queue = backup.Backup().create_queue()
- elif len(queues_started) != 0:
- for queue in queues_started:
- LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
- backup_volume = backup.Backup().check_volume_backup_status(queue)
- elif len(queues_to_start) != 0:
- for queue in queues_to_start:
- LOG.info("Started backup process for %s" % queue.volume_id)
- backup_volume = backup.Backup().volume_backup_initiate(queue)
-
-
-class RotationManager(cotyledon.Service):
- name = "Staffeln conductor rotation controller"
-
- def __init__(self, worker_id, conf):
- super(RotationManager, self).__init__(worker_id)
- self._shutdown = threading.Event()
- self.conf = conf
- LOG.info("%s init" % self.name)
-
- def run(self):
- LOG.info("%s run" % self.name)
- interval = CONF.conductor.rotation_period
-
- periodic_callables = [
- (self.rotation_engine, (), {}),
- ]
- periodic_worker = periodics.PeriodicWorker(periodic_callables)
- periodic_thread = threading.Thread(target=periodic_worker.start)
- periodic_thread.daemon = True
- periodic_thread.start()
-
- def terminate(self):
- LOG.info("%s terminate" % self.name)
- super(RotationManager, self).terminate()
-
- def reload(self):
- LOG.info("%s reload" % self.name)
-
- @periodics.periodic(spacing=CONF.conductor.rotation_period, run_immediately=True)
- def rotation_engine(self):
- print("rotating... %s" % str(time.time()))
- LOG.info("%s rotation_engine" % self.name)
+import cotyledon
+import datetime
+from futurist import periodics
+from oslo_log import log
+import staffeln.conf
+import threading
+import time
+
+from staffeln.common import constants
+from staffeln.common import context
+from staffeln.common import time as xtime
+from staffeln.conductor import backup
+from staffeln.conductor import notify
+from staffeln.i18n import _
+
+LOG = log.getLogger(__name__)
+CONF = staffeln.conf.CONF
+
+
+class BackupManager(cotyledon.Service):
+ name = "Staffeln conductor backup controller"
+
+ def __init__(self, worker_id, conf):
+ super(BackupManager, self).__init__(worker_id)
+ self._shutdown = threading.Event()
+ self.conf = conf
+ self.ctx = context.make_context()
+ LOG.info("%s init" % self.name)
+
+ def run(self):
+ LOG.info("%s run" % self.name)
+ periodic_callables = [
+ (self.backup_engine, (), {}),
+ ]
+ periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")
+ periodic_thread = threading.Thread(target=periodic_worker.start)
+ periodic_thread.daemon = True
+ periodic_thread.start()
+
+ def terminate(self):
+ LOG.info("%s terminate" % self.name)
+ super(BackupManager, self).terminate()
+
+ def reload(self):
+ LOG.info("%s reload" % self.name)
+
+ # Check if the backup count is over the limit
+ # TODO(Alex): how to count the backup number
+ # only available backups are calculated?
+ def _check_quota(self):
+ LOG.info(_("Checking the backup limitation..."))
+ max_count = CONF.conductor.max_backup_count
+ current_count = len(backup.Backup().get_backups())
+ if max_count <= current_count:
+ # TODO(Alex): Send notification
+ LOG.info(_("The backup limit is over."))
+ return True
+ LOG.info(_("The max limit is %s, and current backup count is %s" % (max_count, current_count)))
+ return False
+
+ # Manage active backup generators
+ # TODO(Alex): need to discuss
+ # Need to wait until all backups are finished?
+ # That is required to make the backup report
+ def _process_wip_tasks(self):
+ LOG.info(_("Processing WIP backup generators..."))
+ queues_started = backup.Backup().get_queues(
+ filters={"backup_status": constants.BACKUP_WIP}
+ )
+ if len(queues_started) != 0:
+ for queue in queues_started: backup.Backup().check_volume_backup_status(queue)
+
+ # Create backup generators
+ def _process_todo_tasks(self):
+ LOG.info(_("Creating new backup generators..."))
+ queues_to_start = backup.Backup().get_queues(
+ filters={"backup_status": constants.BACKUP_PLANNED}
+ )
+ if len(queues_to_start) != 0:
+ for queue in queues_to_start:
+ backup.Backup().create_volume_backup(queue)
+
+ # Refresh the task queue
+ def _update_task_queue(self):
+ LOG.info(_("Updating backup task queue..."))
+ current_tasks = backup.Backup().get_queues()
+ backup.Backup().create_queue(current_tasks)
+
+ def _report_backup_result(self):
+ # TODO(Alex): Need to update these list
+ self.success_backup_list = []
+ self.failed_backup_list = []
+ notify.SendBackupResultEmail(self.success_backup_list, self.failed_backup_list)
+
+
+ @periodics.periodic(spacing=CONF.conductor.backup_service_period, run_immediately=True)
+ def backup_engine(self):
+ LOG.info("backing... %s" % str(time.time()))
+ LOG.info("%s periodics" % self.name)
+
+ if self._check_quota(): return
+ # NOTE(Alex): If _process_wip_tasks() waits tiil no WIP tasks
+ # exist, no need to repeat this function before and after queue update.
+ self._process_wip_tasks()
+ self._update_task_queue()
+ self._process_todo_tasks()
+ self._process_wip_tasks()
+ self._report_backup_result()
+
+
+
+class RotationManager(cotyledon.Service):
+ name = "Staffeln conductor rotation controller"
+
+ def __init__(self, worker_id, conf):
+ super(RotationManager, self).__init__(worker_id)
+ self._shutdown = threading.Event()
+ self.conf = conf
+ LOG.info("%s init" % self.name)
+
+ def run(self):
+ LOG.info("%s run" % self.name)
+
+ periodic_callables = [
+ (self.rotation_engine, (), {}),
+ ]
+ periodic_worker = periodics.PeriodicWorker(periodic_callables, schedule_strategy="last_finished")
+ periodic_thread = threading.Thread(target=periodic_worker.start)
+ periodic_thread.daemon = True
+ periodic_thread.start()
+
+ def terminate(self):
+ LOG.info("%s terminate" % self.name)
+ super(RotationManager, self).terminate()
+
+ def reload(self):
+ LOG.info("%s reload" % self.name)
+
+ def get_backup_list(self):
+ threshold_strtime = self.get_threshold_strtime()
+ if threshold_strtime == None: return False
+ self.backup_list = backup.Backup().get_backups(filters={"created_at__lt": threshold_strtime})
+ return True
+
+ def remove_backups(self):
+ print(self.backup_list)
+ for retention_backup in self.backup_list:
+ # 1. check the backup status and delete only available backups
+ backup.Backup().remove_volume_backup(retention_backup)
+
+ @periodics.periodic(spacing=CONF.conductor.retention_service_period, run_immediately=True)
+ def rotation_engine(self):
+ LOG.info("%s rotation_engine" % self.name)
+ # 1. get the list of backups to remove based on the retention time
+ if not self.get_backup_list(): return
+
+ # 2. remove the backups
+ self.remove_backups()
+
+ # get the threshold time str
+ def get_threshold_strtime(self):
+ time_delta_dict = xtime.parse_timedelta_string(CONF.conductor.retention_time)
+ if time_delta_dict == None: return None
+
+ res = xtime.timeago(
+ years=time_delta_dict["years"],
+ months=time_delta_dict["months"],
+ weeks=time_delta_dict["weeks"],
+ days=time_delta_dict["days"],
+ )
+ if res == None: LOG.info(_("Retention time format is invalid. "
+ "Follow <YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."))
+
+ return res.strftime(xtime.DEFAULT_TIME_FORMAT)
diff --git a/staffeln/conductor/notify.py b/staffeln/conductor/notify.py
new file mode 100644
index 0000000..d80da4e
--- /dev/null
+++ b/staffeln/conductor/notify.py
@@ -0,0 +1,57 @@
+# Email notification package
+# This should be upgraded by integrating with mail server to send batch
+import smtplib
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+from oslo_log import log
+import staffeln.conf
+from staffeln.common import time as xtime
+from staffeln.i18n import _
+
+CONF = staffeln.conf.CONF
+LOG = log.getLogger(__name__)
+
+
+def _sendEmail(src_email, src_pwd, dest_email, subject, content, smtp_server_domain, smtp_server_port):
+ message = MIMEMultipart("alternative")
+ message["Subject"] = subject
+ message["From"] = src_email
+ message["To"] = dest_email
+ part = MIMEText(content, "html")
+ message.attach(part)
+
+ s = smtplib.SMTP(host=smtp_server_domain, port=smtp_server_port)
+ s.ehlo()
+ s.starttls()
+ # we can comment this auth func when use the trusted ip without authentication against the smtp server
+ s.login(src_email, src_pwd)
+ s.sendmail(src_email, dest_email, message.as_string())
+ s.close()
+
+
+def SendBackupResultEmail(success_backup_list, failed_backup_list):
+ subject = "Backup result"
+
+ html = "<h3>${TIME}</h3>" \
+ "<h3>Success List</h3>" \
+ "<h4>${SUCCESS_VOLUME_LIST}</h4>" \
+ "<h3>Failed List</h3>" \
+ "<h4>${FAILED_VOLUME_LIST}</h4>"
+
+ success_volumes = '<br>'.join([str(elem) for elem in success_backup_list])
+ failed_volumes = '<br>'.join([str(elem) for elem in failed_backup_list])
+ html = html.replace("${TIME}", xtime.get_current_strtime())
+ html = html.replace("${SUCCESS_VOLUME_LIST}", success_volumes)
+ html = html.replace("${FAILED_VOLUME_LIST}", failed_volumes)
+ try:
+ _sendEmail(src_email=CONF.notification.sender_email,
+ src_pwd=CONF.notification.sender_pwd,
+ dest_email=CONF.notification.receiver,
+ subject=subject,
+ content=html,
+ smtp_server_domain=CONF.notification.smtp_server_domain,
+ smtp_server_port=CONF.notification.smtp_server_port)
+ LOG.info(_("Backup result email sent"))
+ except Exception as e:
+ LOG.error(_("Backup result email send failed. Please check email configuration. %s" % (str(e))))
+
diff --git a/staffeln/conf/api.py b/staffeln/conf/api.py
index 846f959..e405d6a 100755
--- a/staffeln/conf/api.py
+++ b/staffeln/conf/api.py
@@ -1,7 +1,6 @@
from oslo_config import cfg
from staffeln.i18n import _
-
api_group = cfg.OptGroup(
"api",
title="API options",
diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index 9906004..4e2aad9 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -1,7 +1,6 @@
from oslo_config import cfg
from staffeln.i18n import _
-
conductor_group = cfg.OptGroup(
"conductor",
title="Conductor Options",
@@ -13,17 +12,22 @@
"backup_workers",
default=1,
help=_("The maximum number of backup processes to "
- "fork and run. Default to number of CPUs on the host."),
+ "fork and run. Default to number of CPUs on the host."),
),
cfg.IntOpt(
- "backup_period",
- default=10,
- min=1,
+ "backup_service_period",
+ default=60,
+ min=10,
help=_("The time of bakup period, the unit is one minute."),
),
cfg.StrOpt(
"backup_metadata_key",
- default="test",
+ default="__automated_backup",
+ help=_("The key string of metadata the VM, which requres back up, has"),
+ ),
+ cfg.IntOpt(
+ "max_backup_count",
+ default=10,
help=_("The key string of metadata the VM, which requres back up, has"),
),
]
@@ -33,13 +37,25 @@
"rotation_workers",
default=1,
help=_("The maximum number of rotation processes to "
- "fork and run. Default to number of CPUs on the host."),
+ "fork and run. Default to number of CPUs on the host."),
),
cfg.IntOpt(
- "rotation_period",
+ "retention_service_period",
+ default=20,
+ min=10,
+ help=_("The period of the retention service, the unit is one second."),
+ ),
+ cfg.IntOpt(
+ "rotation_workers",
default=1,
- min=1,
- help=_("The time of rotation period, the unit is one day."),
+ help=_("The maximum number of rotation processes to "
+ "fork and run. Default to number of CPUs on the host."),
+ ),
+ cfg.StrOpt(
+ "retention_time",
+ default="2w3d",
+ help=_("The time of retention period, the for mat is "
+ "<YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."),
),
]
diff --git a/staffeln/conf/notify.py b/staffeln/conf/notify.py
index 20f1ede..c292e51 100644
--- a/staffeln/conf/notify.py
+++ b/staffeln/conf/notify.py
@@ -9,11 +9,6 @@
)
email_opts = [
- cfg.StrOpt(
- "template",
- default="<h3>${CONTENT}</h3>",
- help=_("This html template is used to email the backup result."),
- ),
cfg.ListOpt(
"receiver",
default=[],
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index 41cd483..4b4fc9f 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -109,13 +109,8 @@
if filters is None:
filters = {}
- plain_fields = [
- "volume_id",
- "backup_id",
- "backup_completed",
- "instance_id",
- "created_at",
- ]
+
+ plain_fields = ["volume_id", "backup_id", "backup_completed", "instance_id", "created_at"]
return self._add_filters(
query=query,
@@ -314,6 +309,7 @@
except:
LOG.error("Queue Not found.")
+
def get_backup_by_backup_id(self, context, backup_id):
"""Get the column from the backup_data with matching backup_id"""
@@ -332,6 +328,7 @@
except:
LOG.error("Backup resource not found.")
+
def soft_delete_backup(self, id):
try:
return self._soft_delete(models.Backup_data, id)
diff --git a/staffeln/objects/__init__.py b/staffeln/objects/__init__.py
index 8003db1..4f7ca56 100755
--- a/staffeln/objects/__init__.py
+++ b/staffeln/objects/__init__.py
@@ -1,4 +1,5 @@
from .queue import Queue
+from .volume import Volume
# from volume import Volume
def register_all():
__import__('staffeln.objects.volume')
diff --git a/staffeln/objects/volume.py b/staffeln/objects/volume.py
index 1a6c09d..6655b7b 100644
--- a/staffeln/objects/volume.py
+++ b/staffeln/objects/volume.py
@@ -82,3 +82,4 @@
else:
backup = cls._from_db_object(cls(context), db_backup)
return backup
+
\ No newline at end of file