Add result module
diff --git a/staffeln/api/app.py b/staffeln/api/app.py
index 62865f7..de5875e 100755
--- a/staffeln/api/app.py
+++ b/staffeln/api/app.py
@@ -1,12 +1,10 @@
from flask import Flask
from flask import Response
-from flask import jsonify
from flask import request
from staffeln import objects
from staffeln.common import context
-from staffeln.common import auth
+from staffeln.common import openstack
from oslo_log import log
-from openstack import exceptions as exc
ctx = context.make_context()
@@ -14,18 +12,23 @@
LOG = log.getLogger(__name__)
-conn = auth.create_connection()
-
-@app.route("/v1/backup", methods=["GET"])
+@app.route("/v1/backup", methods=["POST"])
def backup_id():
- if "backup_id" not in request.args:
- # Return error if the backup_id argument is not provided.
- return "Error: No backup_id field provided. Please specify backup_id."
- backup_id = request.args["backup_id"]
+ current_user_id = openstack.get_user_id()
+
+ if not "user_id" in request.args or not "backup_id" in request.args:
+ # Return error if the backup_id argument is not provided.
+ return Response(
+ "Error: backup_id or user_id is missing.", status=403, mimetype="text/plain"
+ )
+
+ if current_user_id != request.args["user_id"]:
+ return Response("False", status=401, mimetype="text/plain")
+
# Retrive the backup object from backup_data table with matching backup_id.
- backup = objects.Volume.get_backup_by_backup_id(ctx, backup_id)
+ backup = objects.Volume.get_backup_by_backup_id(ctx, request.args["backup_id"])
# backup_info is None when there is no entry of the backup id in backup_table.
# So the backup should not be the automated backup.
if backup is None:
@@ -35,7 +38,7 @@
mimetype="text/plain",
)
else:
- return Response("Deny", status=401, mimetype="text/plain")
+ return Response("False", status=401, mimetype="text/plain")
def run(host, port, ssl_context):
diff --git a/staffeln/common/email.py b/staffeln/common/email.py
new file mode 100644
index 0000000..86ebdd2
--- /dev/null
+++ b/staffeln/common/email.py
@@ -0,0 +1,35 @@
+# Email notification package
+# This should be upgraded by integrating with mail server to send batch
+import smtplib
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+
+__DRY_RUN__ = True
+
+
+def send(
+ src_email,
+ src_pwd,
+ dest_email,
+ subject,
+ content,
+ smtp_server_domain,
+ smtp_server_port,
+):
+ message = MIMEMultipart("alternative")
+ message["Subject"] = subject
+ # This part is commented as it won't be able to parce the items in list.
+ # message["From"] = src_email
+ # message["To"] = dest_email
+ part = MIMEText(content, "html")
+ message.attach(part)
+ if __DRY_RUN__:
+ print(part)
+ return
+ s = smtplib.SMTP(host=smtp_server_domain, port=smtp_server_port)
+ # s.ehlo()
+ # s.starttls()
+ # we can comment this auth func when use the trusted ip without authentication against the smtp server
+ # s.login(src_email, src_pwd)
+ s.sendmail(src_email, dest_email, message.as_string())
+ s.close()
diff --git a/staffeln/common/openstack.py b/staffeln/common/openstack.py
index 02570c9..c19f28e 100644
--- a/staffeln/common/openstack.py
+++ b/staffeln/common/openstack.py
@@ -8,12 +8,19 @@
# user
def get_user_id():
user_name = conn.config.auth["username"]
- domain_id = conn.config.auth["user_domain_id"]
- user = conn.get_user(name_or_id=user_name, domain_id=domain_id)
+ if "user_domain_id" in conn.config.auth:
+ domain_id = conn.config.auth["user_domain_id"]
+ user = conn.get_user(name_or_id=user_name, domain_id=domain_id)
+ elif "user_domain_name" in conn.config.auth:
+ domain_name = conn.config.auth["user_domain_name"]
+ user = conn.get_user(name_or_id=user_name, domain_id=domain_name)
+ else:
+ user = conn.get_user(name_or_id=user_name)
return user.id
############## project
def get_projects():
+ conn.block_storage
return conn.list_projects()
@@ -23,7 +30,7 @@
############## volume
-def get_volume(uuid):
+def get_volume(uuid, project_id):
# volume = conn.block_storage.get_volume(volume_id)
return conn.get_volume_by_id(uuid)
@@ -37,7 +44,7 @@
return conn.get_volume_backup(uuid)
-def create_backup(volume_id, force=True, wait=False):
+def create_backup(volume_id, project_id, force=True, wait=False):
# return conn.block_storage.create_backup(
# volume_id=queue.volume_id, force=True, project_id=queue.project_id,
# )
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index bcdc5b8..081d06a 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -2,7 +2,7 @@
import staffeln.conf
import collections
from staffeln.common import constants
-
+from staffeln.conductor import result
from openstack.exceptions import ResourceNotFound as OpenstackResourceNotFound
from openstack.exceptions import SDKException as OpenstackSDKException
from oslo_log import log
@@ -35,9 +35,10 @@
def __init__(self):
self.ctx = context.make_context()
- self.discovered_backup_map = None
- self.queue_mapping = dict()
- self.volume_mapping = dict()
+ self.result = result.BackupResult()
+
+ def publish_backup_result(self):
+ self.result.publish()
def get_backups(self, filters=None):
return objects.Volume.list(self.ctx, filters=filters)
@@ -73,13 +74,15 @@
return metadata[CONF.conductor.backup_metadata_key].lower() == constants.BACKUP_ENABLED_KEY
# Backup the volumes in in-use and available status
- def filter_by_volume_status(self, volume_id):
+ def filter_by_volume_status(self, volume_id, project_id):
try:
- volume = openstacksdk.get_volume(volume_id)
+ volume = openstacksdk.get_volume(volume_id, project_id)
if volume == None: return False
res = volume['status'] in ("available", "in-use")
if not res:
- LOG.info(_("Volume %s is not backed because it is in %s status" % (volume_id, volume['status'])))
+ reason = _("Volume %s is not backed because it is in %s status" % (volume_id, volume['status']))
+ LOG.info(reason)
+ self.result.add_failed_backup(project_id, volume_id, reason)
return res
except OpenstackResourceNotFound:
@@ -88,23 +91,27 @@
# delete all backups forcily regardless of the status
def hard_cancel_backup_task(self, task):
try:
- LOG.info(_("Cancel backup %s" % task.backup_id))
+ reason = _("Cancel backup %s because of timeout." % task.backup_id)
+ LOG.info(reason)
backup = openstacksdk.get_backup(task.backup_id)
if backup == None: return task.delete_queue()
openstacksdk.delete_backup(task.backup_id)
task.delete_queue()
-
+ self.result.add_failed_backup(task.project_id, task.volume_id, reason)
except OpenstackResourceNotFound:
task.delete_queue()
except OpenstackSDKException as e:
- LOG.info(_("Backup %s deletion failed."
- "%s" % (task.backup_id, str(e))))
- # TODO(Alex): When backup timeout and cancel failed
+ reason = _("Backup %s deletion failed."
+ "%s" % (task.backup_id, str(e)))
+ LOG.info(reason)
+ # TODO(Alex): If backup timeout and also back cancel failed,
+ # then what to do?
# 1. notify
# 2. set the volume status as in-use
# remove from the queue table
task.delete_queue()
+ self.result.add_failed_backup(task.project_id, task.volume_id, reason)
# delete only available backups
def soft_remove_backup_task(self, backup_object):
@@ -170,11 +177,15 @@
queues_map = []
projects = openstacksdk.get_projects()
for project in projects:
+ empty_project = True
servers = openstacksdk.get_servers(project_id=project.id)
for server in servers:
if not self.filter_by_server_metadata(server.metadata): continue
+ if empty_project:
+ empty_project = False
+ self.result.add_project(project.id, project.name)
for volume in server.attached_volumes:
- if not self.filter_by_volume_status(volume["id"]): continue
+ if not self.filter_by_volume_status(volume["id"], project.id): continue
queues_map.append(
QueueMapping(
project_id=project.id,
@@ -210,15 +221,19 @@
backup_id = queue.backup_id
if backup_id == "NULL":
try:
- LOG.info(_("Backup for volume %s creating" % queue.volume_id))
+ LOG.info(_("Backup for volume %s creating in project %s"
+ % (queue.volume_id, queue.project_id)))
# NOTE(Alex): no need to wait because we have a cycle time out
- volume_backup = openstacksdk.create_backup(volume_id=queue.volume_id)
+ volume_backup = openstacksdk.create_backup(volume_id=queue.volume_id,
+ project_id=queue.project_id)
queue.backup_id = volume_backup.id
queue.backup_status = constants.BACKUP_WIP
queue.save()
except OpenstackSDKException as error:
- LOG.info(_("Backup creation for the volume %s failled. %s"
- % (queue.volume_id, str(error))))
+ reason = _("Backup creation for the volume %s failled. %s"
+ % (queue.volume_id, str(error)))
+ LOG.info(reason)
+ self.result.add_failed_backup(queue.project_id, queue.volume_id, reason)
parsed = parse.parse("Error in creating volume backup {id}", str(error))
if parsed == None: return
queue.backup_id = parsed["id"]
@@ -233,7 +248,9 @@
def process_failed_backup(self, task):
# 1. TODO(Alex): notify via email
- LOG.error("Backup of the volume %s failed." % task.volume_id)
+ reason = _("The status of backup for the volume %s is error." % task.volume_id)
+ self.result.add_failed_backup(task.project_id, task.volume_id, reason)
+ LOG.error(reason)
# 2. cancel volume backup
self.hard_cancel_backup_task(task)
# 3. remove failed task from the task queue
@@ -254,13 +271,14 @@
backup_completed=1,
)
)
+ self.result.add_success_backup(task.project_id, task.volume_id, task.backup_id)
# 2. remove from the task list
task.delete_queue()
# 3. TODO(Alex): notify via email
def process_using_backup(self, task):
- # remove from the task list
- task.delete_queue()
+ # treat same as the available backup for now
+ self.process_available_backup(task)
def check_volume_backup_status(self, queue):
"""Checks the backup status of the volume
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index e68c3fe..e77ef8e 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -9,7 +9,6 @@
from staffeln.common import context
from staffeln.common import time as xtime
from staffeln.conductor import backup
-from staffeln.conductor import notify
from staffeln.i18n import _
LOG = log.getLogger(__name__)
@@ -24,6 +23,7 @@
self._shutdown = threading.Event()
self.conf = conf
self.ctx = context.make_context()
+ self.controller = backup.Backup()
LOG.info("%s init" % self.name)
def run(self):
@@ -50,19 +50,19 @@
self.cycle_start_time = xtime.get_current_time()
# loop - take care of backup result while timeout
- while(1):
- queues_started = backup.Backup().get_queues(
+ while (1):
+ queues_started = self.controller.get_queues(
filters={"backup_status": constants.BACKUP_WIP}
)
if len(queues_started) == 0:
LOG.info(_("task queue empty"))
break
- if not self._backup_cycle_timeout():# time in
+ if not self._backup_cycle_timeout(): # time in
LOG.info(_("cycle timein"))
- for queue in queues_started: backup.Backup().check_volume_backup_status(queue)
- else: # time out
+ for queue in queues_started: self.controller.check_volume_backup_status(queue)
+ else: # time out
LOG.info(_("cycle timeout"))
- for queue in queues_started: backup.Backup().hard_cancel_backup_task(queue)
+ for queue in queues_started: self.controller.hard_cancel_backup_task(queue)
break
time.sleep(constants.BACKUP_RESULT_CHECK_INTERVAL)
@@ -93,28 +93,21 @@
# Create backup generators
def _process_todo_tasks(self):
LOG.info(_("Creating new backup generators..."))
- queues_to_start = backup.Backup().get_queues(
+ queues_to_start = self.controller.get_queues(
filters={"backup_status": constants.BACKUP_PLANNED}
)
if len(queues_to_start) != 0:
for queue in queues_to_start:
- backup.Backup().create_volume_backup(queue)
+ self.controller.create_volume_backup(queue)
# Refresh the task queue
def _update_task_queue(self):
LOG.info(_("Updating backup task queue..."))
- current_tasks = backup.Backup().get_queues()
- backup.Backup().create_queue(current_tasks)
+ current_tasks = self.controller.get_queues()
+ self.controller.create_queue(current_tasks)
def _report_backup_result(self):
- # 1. get the quota usage
- # 2. get the success backup list
- # 3. get the failed backup list
- # 4. send notification
- quota = backup.Backup().get_backup_quota()
- self.success_backup_list = []
- self.failed_backup_list = []
- notify.SendBackupResultEmail(quota, self.success_backup_list, self.failed_backup_list)
+ self.controller.publish_backup_result()
@periodics.periodic(spacing=CONF.conductor.backup_service_period, run_immediately=True)
def backup_engine(self):
@@ -134,6 +127,7 @@
super(RotationManager, self).__init__(worker_id)
self._shutdown = threading.Event()
self.conf = conf
+ self.controller = backup.Backup()
LOG.info("%s init" % self.name)
def run(self):
@@ -157,13 +151,13 @@
def get_backup_list(self):
threshold_strtime = self.get_threshold_strtime()
if threshold_strtime == None: return False
- self.backup_list = backup.Backup().get_backups(filters={"created_at__lt": threshold_strtime})
+ self.backup_list = self.controller.get_backups(filters={"created_at__lt": threshold_strtime})
return True
def remove_backups(self):
print(self.backup_list)
for retention_backup in self.backup_list:
- backup.Backup().hard_remove_volume_backup(retention_backup)
+ self.controller.hard_remove_volume_backup(retention_backup)
@periodics.periodic(spacing=CONF.conductor.retention_service_period, run_immediately=True)
def rotation_engine(self):
diff --git a/staffeln/conductor/notify.py b/staffeln/conductor/notify.py
deleted file mode 100644
index 6d76368..0000000
--- a/staffeln/conductor/notify.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# Email notification package
-# This should be upgraded by integrating with mail server to send batch
-import smtplib
-from email.mime.text import MIMEText
-from email.mime.multipart import MIMEMultipart
-from oslo_log import log
-import staffeln.conf
-from staffeln.common import time as xtime
-from staffeln.i18n import _
-
-__DEBUG__ = True
-
-
-CONF = staffeln.conf.CONF
-LOG = log.getLogger(__name__)
-
-
-def _sendEmail(src_email, src_pwd, dest_email, subject, content, smtp_server_domain, smtp_server_port):
- message = MIMEMultipart("alternative")
- message["Subject"] = subject
- message["From"] = src_email
- message["To"] = dest_email
- part = MIMEText(content, "html")
- print(part)
- message.attach(part)
- if __DEBUG__:
- return
- s = smtplib.SMTP(host=smtp_server_domain, port=smtp_server_port)
- s.ehlo()
- s.starttls()
- # we can comment this auth func when use the trusted ip without authentication against the smtp server
- s.login(src_email, src_pwd)
- s.sendmail(src_email, dest_email, message.as_string())
- s.close()
-
-
-def SendBackupResultEmail(quota, success_backup_list, failed_backup_list):
- subject = "Backup result"
-
- html = "<h3>${TIME}</h3><br>" \
- "<h3>Quota Usage</h3><br>" \
- "<h4>Limit: ${QUOTA_LIMIT}, In Use: ${QUOTA_IN_USE}, Reserved: ${QUOTA_RESERVED}</h4><br>" \
- "<h3>Success List</h3><br>" \
- "<h4>${SUCCESS_VOLUME_LIST}</h4><br>" \
- "<h3>Failed List</h3><br>" \
- "<h4>${FAILED_VOLUME_LIST}</h4><br>"
-
- success_volumes = '<br>'.join([str(elem) for elem in success_backup_list])
- failed_volumes = '<br>'.join([str(elem) for elem in failed_backup_list])
- html = html.replace("${TIME}", xtime.get_current_strtime())
- html = html.replace("${QUOTA_LIMIT}", quota["limit"])
- html = html.replace("${QUOTA_IN_USE}", quota["in_use"])
- html = html.replace("${QUOTA_RESERVED}", quota["reserved"])
- html = html.replace("${SUCCESS_VOLUME_LIST}", success_volumes)
- html = html.replace("${FAILED_VOLUME_LIST}", failed_volumes)
- try:
- _sendEmail(src_email=CONF.notification.sender_email,
- src_pwd=CONF.notification.sender_pwd,
- dest_email=CONF.notification.receiver,
- subject=subject,
- content=html,
- smtp_server_domain=CONF.notification.smtp_server_domain,
- smtp_server_port=CONF.notification.smtp_server_port)
- LOG.info(_("Backup result email sent"))
- except Exception as e:
- LOG.error(_("Backup result email send failed. Please check email configuration. %s" % (str(e))))
diff --git a/staffeln/conductor/result.py b/staffeln/conductor/result.py
index e69de29..d6e0d3f 100644
--- a/staffeln/conductor/result.py
+++ b/staffeln/conductor/result.py
@@ -0,0 +1,99 @@
+# Email notification package
+# This should be upgraded by integrating with mail server to send batch
+import smtplib
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+from oslo_log import log
+import staffeln.conf
+from staffeln.common import time as xtime
+from staffeln.common import email
+from staffeln.i18n import _
+from staffeln.conductor import backup
+from staffeln.common import openstack as openstacksdk
+
+CONF = staffeln.conf.CONF
+LOG = log.getLogger(__name__)
+
+
+class BackupResult(object):
+
+ def __init__(self):
+ self.content = ""
+ self.project_list = []
+ self.success_backup_list = {}
+ self.failed_backup_list = {}
+
+ def add_project(self, id, name):
+ if id in self.success_backup_list: return
+ self.project_list.append({
+ "name": name,
+ "id": id
+ })
+ self.success_backup_list[id] = []
+ self.failed_backup_list[id] = []
+
+ def add_success_backup(self, project_id, volume_id, backup_id):
+ if not project_id in self.success_backup_list:
+ LOG.error(_("Not registered project is reported for backup result."))
+ return
+ self.success_backup_list[project_id].append({
+ "volume_id": volume_id,
+ "backup_id": backup_id,
+ })
+
+ def add_failed_backup(self, project_id, volume_id, reason):
+ if not project_id in self.failed_backup_list:
+ LOG.error(_("Not registered project is reported for backup result."))
+ return
+ self.failed_backup_list[project_id].append({
+ "volume_id": volume_id,
+ "reason": reason,
+ })
+
+ def send_result_email(self):
+ subject = "Backup result"
+ try:
+ email.send(
+ src_email=CONF.notification.sender_email,
+ src_pwd=CONF.notification.sender_pwd,
+ dest_email=CONF.notification.receiver,
+ subject=subject,
+ content=self.content,
+ smtp_server_domain=CONF.notification.smtp_server_domain,
+ smtp_server_port=CONF.notification.smtp_server_port,
+ )
+ LOG.info(_("Backup result email sent"))
+ except Exception as e:
+ LOG.error(_("Backup result email send failed. Please check email configuration. %s" % (str(e))))
+
+ def publish(self):
+ # 1. get quota
+ self.content = "<h3>${TIME}</h3><br>"
+ self.content = self.content.replace("${TIME}", xtime.get_current_strtime())
+ html = ""
+ for project in self.project_list:
+ quota = backup.Backup().get_backup_quota(project["id"])
+
+ html += "<h3>Project: ${PROJECT}</h3><br>" \
+ "<h3>Quota Usage</h3><br>" \
+ "<h4>Limit: ${QUOTA_LIMIT}, In Use: ${QUOTA_IN_USE}, Reserved: ${QUOTA_RESERVED}</h4><br>" \
+ "<h3>Success List</h3><br>" \
+ "<h4>${SUCCESS_VOLUME_LIST}</h4><br>" \
+ "<h3>Failed List</h3><br>" \
+ "<h4>${FAILED_VOLUME_LIST}</h4><br>"
+
+ success_volumes = "<br>".join(
+ ["Volume ID: %s, Backup ID: %s" % (str(e["volume_id"]), str(e["backup_id"]))
+ for e in self.success_backup_list])
+ failed_volumes = "<br>".join(
+ ["Volume ID: %s, Reason: %s" % (str(e["volume_id"]), str(e["reason"]))
+ for e in self.failed_backup_list])
+ html = html.replace("${QUOTA_LIMIT}", str(quota["limit"]))
+ html = html.replace("${QUOTA_IN_USE}", str(quota["in_use"]))
+ html = html.replace("${QUOTA_RESERVED}", str(quota["reserved"]))
+ html = html.replace("${SUCCESS_VOLUME_LIST}", success_volumes)
+ html = html.replace("${FAILED_VOLUME_LIST}", failed_volumes)
+ html = html.replace("${PROJECT}", project["name"])
+ if html == "": return
+ self.content += html
+ self.send_result_email()
diff --git a/staffeln/conf/notify.py b/staffeln/conf/notify.py
index c292e51..13375f1 100644
--- a/staffeln/conf/notify.py
+++ b/staffeln/conf/notify.py
@@ -12,30 +12,35 @@
cfg.ListOpt(
"receiver",
default=[],
- help=_("The receivers of the bakcup result by email."
- "A list of addresses to receive backup result emails to. A bare"
- " string will be treated as a list with 1 address."),
+ help=_(
+ "The receivers of the bakcup result by email."
+ "A list of addresses to receive backup result emails to. A bare"
+ " string will be treated as a list with 1 address."
+ ),
),
cfg.StrOpt(
"sender_email",
- help=_("Log in on an SMTP server that requires authentication."
- "The user name to authenticate with."
- ),
+ help=_(
+ "Log in on an SMTP server that requires authentication."
+ "The user name to authenticate with."
+ ),
),
+ # We can remove the sender password as we are using postfix to send mail and we won't be authenticating.
cfg.StrOpt(
"sender_pwd",
- help=_("Log in on an SMTP server that requires authentication."
- "The password for the authentication."
- ),
+ help=_(
+ "Log in on an SMTP server that requires authentication."
+ "The password for the authentication."
+ ),
),
cfg.StrOpt(
"smtp_server_domain",
- default="smtp.gmail.com",
+ default="localhost",
help=_("the name of the remote host to which to connect"),
),
cfg.StrOpt(
"smtp_server_port",
- default="587",
+ default="25",
help=_("the port to which to connect"),
),
]