Merge pull request #12 from vexxhost/update-backup-service
Set timeout for backup cycle to take care of backup results
diff --git a/requirements.txt b/requirements.txt
index 45ae2d5..9c3aad4 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,6 +10,7 @@
oslo.db>=5.0.0
oslo.config>=8.1.0
oslo.log>=4.4.0 # Apache-2.0
+oslo_versionedobjects
openstacksdk>0.28.0
pymysql
diff --git a/staffeln/common/constants.py b/staffeln/common/constants.py
index d0ec6fe..91b2a95 100644
--- a/staffeln/common/constants.py
+++ b/staffeln/common/constants.py
@@ -3,3 +3,7 @@
BACKUP_PLANNED=0
BACKUP_ENABLED_KEY = 'true'
+BACKUP_RESULT_CHECK_INTERVAL = 60 # second
+
+# default config values
+DEFAULT_BACKUP_CYCLE_TIMEOUT="5min"
\ No newline at end of file
diff --git a/staffeln/common/time.py b/staffeln/common/time.py
index 3059085..02e40e1 100644
--- a/staffeln/common/time.py
+++ b/staffeln/common/time.py
@@ -5,7 +5,7 @@
DEFAULT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
regex = re.compile(
- r'((?P<years>\d+?)y)?((?P<months>\d+?)m)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?'
+ r'((?P<years>\d+?)y)?((?P<months>\d+?)mon)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)min)?((?P<seconds>\d+?)s)?'
)
@@ -31,16 +31,20 @@
except:
return None
+def get_current_time():
+ return datetime.now()
def get_current_strtime():
now = datetime.now()
return now.strftime(DEFAULT_TIME_FORMAT)
-def timeago(years, months, weeks, days, from_date=None):
+def timeago(years=0, months=0, weeks=0, days=0, hours=0, minutes=0, seconds=0, from_date=None):
if from_date is None:
from_date = datetime.now()
- return from_date - relativedelta(years=years, months=months, weeks=weeks, days=days)
+ return from_date - relativedelta(years=years, months=months,
+ weeks=weeks, days=days, hours=hours,
+ minutes=minutes, seconds=seconds)
## yearsago using Standard library
# def yearsago(years, from_date=None):
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index f358c8a..47a4fc0 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -1,3 +1,4 @@
+import parse
import staffeln.conf
import collections
from staffeln.common import constants
@@ -14,11 +15,11 @@
LOG = log.getLogger(__name__)
BackupMapping = collections.namedtuple(
- "BackupMapping", ["volume_id", "backup_id", "instance_id", "backup_completed"]
+ "BackupMapping", ["volume_id", "backup_id", "project_id", "instance_id", "backup_completed"]
)
QueueMapping = collections.namedtuple(
- "QueueMapping", ["volume_id", "backup_id", "instance_id", "backup_status"]
+ "QueueMapping", ["volume_id", "backup_id", "project_id", "instance_id", "backup_status"]
)
conn = auth.create_connection()
@@ -77,6 +78,7 @@
# Backup the volumes in in-use and available status
def filter_volume(self, volume_id):
try:
+ # volume = conn.block_storage.get_volume(volume_id)
volume = conn.get_volume_by_id(volume_id)
if volume == None: return False
res = volume['status'] in ("available", "in-use")
@@ -87,12 +89,47 @@
except OpenstackResourceNotFound:
return False
- def remove_volume_backup(self, backup_object):
+    # delete all backups forcibly regardless of the status
+ def hard_cancel_volume_backup(self, task):
try:
+ LOG.info(_("Cancel backup %s" % task.backup_id))
+ # backup = conn.block_storage.get_backup(
+ # project_id=task.project_id, backup_id=task.backup_id,
+ # )
+ backup = conn.get_volume_backup(task.backup_id)
+ if backup == None: return task.delete_queue()
+
+ # TODO(Alex): v3 is not supporting force delete?
+ # conn.block_storage.delete_backup(
+ # project_id=task.project_id, backup_id=task.backup_id,
+ # )
+ conn.delete_volume_backup(task.backup_id, force=True)
+ # TODO(Alex): After delete the backup generator, need to set the volume status again
+ task.delete_queue()
+
+ except OpenstackResourceNotFound:
+ task.delete_queue()
+
+ except OpenstackSDKException as e:
+            LOG.info(_("Backup %s deletion failed. "
+ "%s" % (task.backup_id, str(e))))
+ # TODO(Alex): When backup timeout and cancel failed
+ # 1. notify
+ # 2. set the volume status as in-use
+ # remove from the queue table
+ task.delete_queue()
+
+ # delete only available backups
+ def soft_remove_volume_backup(self, backup_object):
+ try:
+
+ # backup = conn.block_storage.get_backup(
+ # project_id=backup_object.project_id, backup_id=backup_object.backup_id,
+ # )
backup = conn.get_volume_backup(backup_object.backup_id)
- if backup == None: return False
+ if backup == None: return backup_object.delete_backup()
if backup["status"] in ("available"):
- conn.delete_volume_backup(backup_object.backup_id)
+ conn.delete_volume_backup(backup_object.backup_id, force=True)
backup_object.delete_backup()
elif backup["status"] in ("error", "error_restoring"):
# TODO(Alex): need to discuss
@@ -111,6 +148,40 @@
backup_object.delete_backup()
return False
+ except OpenstackSDKException as e:
+        LOG.info(_("Backup %s deletion failed. "
+ "%s" % (backup_object.backup_id, str(e))))
+ # TODO(Alex): Add it into the notification queue
+ # remove from the backup table
+ backup_object.delete_backup()
+ return False
+
+    # delete all backups forcibly regardless of the status
+ def hard_remove_volume_backup(self, backup_object):
+ try:
+
+ # backup = conn.block_storage.get_backup(
+ # project_id=backup_object.project_id, backup_id=backup_object.backup_id,
+ # )
+ backup = conn.get_volume_backup(backup_object.backup_id)
+ if backup == None: return backup_object.delete_backup()
+
+ conn.delete_volume_backup(backup_object.backup_id, force=True)
+ backup_object.delete_backup()
+
+ except OpenstackResourceNotFound:
+            LOG.info(_("Backup %s is not existing in Openstack. "
+ "Or cinder-backup is not existing in the cloud." % backup_object.backup_id))
+ # remove from the backup table
+ backup_object.delete_backup()
+
+ except OpenstackSDKException as e:
+            LOG.info(_("Backup %s deletion failed. "
+ "%s" % (backup_object.backup_id, str(e))))
+ # TODO(Alex): Add it into the notification queue
+ # remove from the backup table
+ backup_object.delete_backup()
+
def check_instance_volumes(self):
"""Get the list of all the volumes from the project using openstacksdk
Function first list all the servers in the project and get the volumes
@@ -128,6 +199,7 @@
if not self.filter_volume(volume["id"]): continue
queues_map.append(
QueueMapping(
+ project_id=project.id,
volume_id=volume["id"],
backup_id="NULL",
instance_id=server.id,
@@ -160,8 +232,13 @@
backup_id = queue.backup_id
if backup_id == "NULL":
try:
+ LOG.info(_("Backup for volume %s creating" % queue.volume_id))
+ # volume_backup = conn.block_storage.create_backup(
+ # volume_id=queue.volume_id, force=True, project_id=queue.project_id,
+ # )
+ # NOTE(Alex): no need to wait because we have a cycle time out
volume_backup = conn.create_volume_backup(
- volume_id=queue.volume_id, force=True
+ volume_id=queue.volume_id, force=True, wait=False,
)
queue.backup_id = volume_backup.id
queue.backup_status = constants.BACKUP_WIP
@@ -169,6 +246,11 @@
except OpenstackSDKException as error:
LOG.info(_("Backup creation for the volume %s failled. %s"
% (queue.volume_id, str(error))))
+ parsed = parse.parse("Error in creating volume backup {id}", str(error))
+ if parsed == None: return
+ queue.backup_id = parsed["id"]
+ queue.backup_status = constants.BACKUP_WIP
+ queue.save()
else:
pass
# TODO(Alex): remove this task from the task list
@@ -181,15 +263,20 @@
LOG.error("Backup of the volume %s failed." % task.volume_id)
# 2. TODO(Alex): remove failed backup instance from the openstack
# then set the volume status in-use
+ self.hard_cancel_volume_backup(task)
# 3. remove failed task from the task queue
task.delete_queue()
+ def process_non_existing_backup(self, task):
+ task.delete_queue()
+
def process_available_backup(self, task):
LOG.info("Backup of the volume %s is successful." % task.volume_id)
# 1. save success backup in the backup table
self._volume_backup(
BackupMapping(
volume_id=task.volume_id,
+ project_id=task.project_id,
backup_id=task.backup_id,
instance_id=task.instance_id,
backup_completed=1,
@@ -211,10 +298,13 @@
"""
# for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
try:
+ # backup_gen = conn.block_storage.get_backup(
+ # project_id=queue.project_id, backup_id=queue.backup_id,
+ # )
backup_gen = conn.get_volume_backup(queue.backup_id)
if backup_gen == None:
# TODO(Alex): need to check when it is none
- LOG.info(_("Backup status of %s is returning none."%(queue.backup_id)))
+ LOG.info(_("[Beta] Backup status of %s is returning none." % (queue.backup_id)))
return
if backup_gen.status == "error":
self.process_failed_backup(queue)
@@ -229,7 +319,7 @@
else: # "deleting", "restoring", "error_restoring" status
self.process_using_backup(queue)
except OpenstackResourceNotFound as e:
- self.process_failed_backup(queue)
+ self.process_non_existing_backup(queue)
def _volume_backup(self, task):
# matching_backups = [
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 6898348..bfda77d 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -1,5 +1,4 @@
import cotyledon
-import datetime
from futurist import periodics
from oslo_log import log
import staffeln.conf
@@ -59,16 +58,51 @@
return False
# Manage active backup generators
- # TODO(Alex): need to discuss
- # Need to wait until all backups are finished?
- # That is required to make the backup report
def _process_wip_tasks(self):
LOG.info(_("Processing WIP backup generators..."))
- queues_started = backup.Backup().get_queues(
- filters={"backup_status": constants.BACKUP_WIP}
+ # TODO(Alex): Replace this infinite loop with finite time
+ self.cycle_start_time = xtime.get_current_time()
+
+ # loop - take care of backup result while timeout
+ while(1):
+ queues_started = backup.Backup().get_queues(
+ filters={"backup_status": constants.BACKUP_WIP}
+ )
+ if len(queues_started) == 0:
+ LOG.info(_("task queue empty"))
+ break
+ if not self._backup_cycle_timeout():# time in
+ LOG.info(_("cycle timein"))
+ for queue in queues_started: backup.Backup().check_volume_backup_status(queue)
+ else: # time out
+ LOG.info(_("cycle timeout"))
+ for queue in queues_started: backup.Backup().hard_cancel_volume_backup(queue)
+ break
+ time.sleep(constants.BACKUP_RESULT_CHECK_INTERVAL)
+
+ # if the backup cycle timeout, then return True
+ def _backup_cycle_timeout(self):
+ time_delta_dict = xtime.parse_timedelta_string(CONF.conductor.backup_cycle_timout)
+
+ if time_delta_dict == None:
+ LOG.info(_("Recycle timeout format is invalid. "
+                "Follow <YEARS>y<MONTHS>mon<WEEKS>w<DAYS>d<HOURS>h<MINUTES>min<SECONDS>s."))
+ time_delta_dict = xtime.parse_timedelta_string(constants.DEFAULT_BACKUP_CYCLE_TIMEOUT)
+ rto = xtime.timeago(
+ years=time_delta_dict["years"],
+ months=time_delta_dict["months"],
+ weeks=time_delta_dict["weeks"],
+ days=time_delta_dict["days"],
+ hours=time_delta_dict["hours"],
+ minutes=time_delta_dict["minutes"],
+ seconds=time_delta_dict["seconds"],
)
- if len(queues_started) != 0:
- for queue in queues_started: backup.Backup().check_volume_backup_status(queue)
+ # print(rto.strftime(xtime.DEFAULT_TIME_FORMAT))
+ # print(self.cycle_start_time)
+ # print(self.cycle_start_time - rto)
+ if rto >= self.cycle_start_time:
+ return True
+ return False
# Create backup generators
def _process_todo_tasks(self):
@@ -92,7 +126,6 @@
self.failed_backup_list = []
notify.SendBackupResultEmail(self.success_backup_list, self.failed_backup_list)
-
@periodics.periodic(spacing=CONF.conductor.backup_service_period, run_immediately=True)
def backup_engine(self):
LOG.info("backing... %s" % str(time.time()))
@@ -101,12 +134,10 @@
if self._check_quota(): return
# NOTE(Alex): If _process_wip_tasks() waits tiil no WIP tasks
# exist, no need to repeat this function before and after queue update.
- self._process_wip_tasks()
self._update_task_queue()
self._process_todo_tasks()
self._process_wip_tasks()
- self._report_backup_result()
-
+ # self._report_backup_result()
class RotationManager(cotyledon.Service):
@@ -145,8 +176,7 @@
def remove_backups(self):
print(self.backup_list)
for retention_backup in self.backup_list:
- # 1. check the backup status and delete only available backups
- backup.Backup().remove_volume_backup(retention_backup)
+ backup.Backup().hard_remove_volume_backup(retention_backup)
@periodics.periodic(spacing=CONF.conductor.retention_service_period, run_immediately=True)
def rotation_engine(self):
@@ -160,15 +190,18 @@
# get the threshold time str
def get_threshold_strtime(self):
time_delta_dict = xtime.parse_timedelta_string(CONF.conductor.retention_time)
- if time_delta_dict == None: return None
+ if time_delta_dict == None:
+ LOG.info(_("Retention time format is invalid. "
+                "Follow <YEARS>y<MONTHS>mon<WEEKS>w<DAYS>d<HOURS>h<MINUTES>min<SECONDS>s."))
+ return None
res = xtime.timeago(
years=time_delta_dict["years"],
months=time_delta_dict["months"],
weeks=time_delta_dict["weeks"],
days=time_delta_dict["days"],
+ hours=time_delta_dict["hours"],
+ minutes=time_delta_dict["minutes"],
+ seconds=time_delta_dict["seconds"],
)
- if res == None: LOG.info(_("Retention time format is invalid. "
- "Follow <YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."))
-
return res.strftime(xtime.DEFAULT_TIME_FORMAT)
diff --git a/staffeln/conductor/notify.py b/staffeln/conductor/notify.py
index d80da4e..2f77aa9 100644
--- a/staffeln/conductor/notify.py
+++ b/staffeln/conductor/notify.py
@@ -54,4 +54,3 @@
LOG.info(_("Backup result email sent"))
except Exception as e:
LOG.error(_("Backup result email send failed. Please check email configuration. %s" % (str(e))))
-
diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index 4e2aad9..7c9c643 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -1,4 +1,5 @@
from oslo_config import cfg
+from staffeln.common import constants
from staffeln.i18n import _
conductor_group = cfg.OptGroup(
@@ -16,11 +17,18 @@
),
cfg.IntOpt(
"backup_service_period",
- default=60,
+ default=30,
min=10,
help=_("The time of bakup period, the unit is one minute."),
),
cfg.StrOpt(
+ "backup_cycle_timout",
+ regex=r'((?P<years>\d+?)y)?((?P<months>\d+?)mon)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)min)?((?P<seconds>\d+?)s)?',
+ default=constants.DEFAULT_BACKUP_CYCLE_TIMEOUT,
+        help=_("The duration while the backup cycle waits for backups. "
+ "<YEARS>y<MONTHS>mon<WEEKS>w<DAYS>d<HOURS>h<MINUTES>min<SECONDS>s."),
+ ),
+ cfg.StrOpt(
"backup_metadata_key",
default="__automated_backup",
help=_("The key string of metadata the VM, which requres back up, has"),
@@ -53,9 +61,10 @@
),
cfg.StrOpt(
"retention_time",
+ regex=r'((?P<years>\d+?)y)?((?P<months>\d+?)mon)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)min)?((?P<seconds>\d+?)s)?',
default="2w3d",
help=_("The time of retention period, the for mat is "
- "<YEARS>y<MONTHS>m<WEEKS>w<DAYS>d."),
+ "<YEARS>y<MONTHS>mon<WEEKS>w<DAYS>d<HOURS>h<MINUTES>min<SECONDS>s."),
),
]
diff --git a/staffeln/db/sqlalchemy/api.py b/staffeln/db/sqlalchemy/api.py
index 4b4fc9f..0148c3d 100644
--- a/staffeln/db/sqlalchemy/api.py
+++ b/staffeln/db/sqlalchemy/api.py
@@ -110,7 +110,14 @@
filters = {}
- plain_fields = ["volume_id", "backup_id", "backup_completed", "instance_id", "created_at"]
+ plain_fields = [
+ "volume_id",
+ "backup_id",
+ "project_id",
+ "backup_completed",
+ "instance_id",
+ "created_at"
+ ]
return self._add_filters(
query=query,
@@ -126,6 +133,7 @@
plain_fields = [
"backup_id",
+ "project_id",
"volume_id",
"instance_id",
"backup_status",
diff --git a/staffeln/db/sqlalchemy/models.py b/staffeln/db/sqlalchemy/models.py
index 715d747..93dae9f 100644
--- a/staffeln/db/sqlalchemy/models.py
+++ b/staffeln/db/sqlalchemy/models.py
@@ -62,6 +62,7 @@
)
id = Column(Integer, primary_key=True, autoincrement=True)
backup_id = Column(String(100))
+ project_id = Column(String(100))
volume_id = Column(String(100))
instance_id = Column(String(100))
backup_completed = Column(Integer())
@@ -74,6 +75,7 @@
__table_args__ = table_args()
id = Column(Integer, primary_key=True, autoincrement=True)
backup_id = Column(String(100))
+ project_id = Column(String(100))
volume_id = Column(String(100))
backup_status = Column(Integer())
instance_id = Column(String(100))
diff --git a/staffeln/objects/queue.py b/staffeln/objects/queue.py
index ac8c9e0..0137d4e 100644
--- a/staffeln/objects/queue.py
+++ b/staffeln/objects/queue.py
@@ -15,6 +15,7 @@
fields = {
"id": sfeild.IntegerField(),
"backup_id": sfeild.StringField(),
+ "project_id": sfeild.UUIDField(),
"volume_id": sfeild.UUIDField(),
"instance_id": sfeild.StringField(),
"backup_status": sfeild.IntegerField(),
diff --git a/staffeln/objects/volume.py b/staffeln/objects/volume.py
index 6655b7b..ce300af 100644
--- a/staffeln/objects/volume.py
+++ b/staffeln/objects/volume.py
@@ -15,6 +15,7 @@
"id": sfeild.IntegerField(),
"backup_id": sfeild.StringField(),
"instance_id": sfeild.StringField(),
+ "project_id": sfeild.UUIDField(),
"volume_id": sfeild.UUIDField(),
"backup_completed": sfeild.IntegerField(),
}