[WIP] Add retention service
diff --git a/staffeln/common/time.py b/staffeln/common/time.py
new file mode 100644
index 0000000..6ebdee7
--- /dev/null
+++ b/staffeln/common/time.py
@@ -0,0 +1,48 @@
+import re
+from datetime import datetime
+from dateutil.relativedelta import relativedelta
+
+regex = re.compile(
+ r'((?P<years>\d+?)y)?((?P<months>\d+?)m)?((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?'
+)
+
+
+# parse_timedelta_string parses a timedelta string into a time dict
+# input: <string> 1y2m3w5d - all values should be integer
+# output: <dict> {years: 1, months: 2, weeks: 3, days: 5}
+def parse_timedelta_string(time_str):
+ empty_flag = True
+ try:
+ parts = regex.match(time_str)
+ if not parts:
+ return None
+ parts = parts.groupdict()
+ time_params = {}
+ for key in parts:
+ if parts[key]:
+ time_params[key] = int(parts[key])
+ empty_flag = False
+ else:
+ time_params[key] = 0
+ if empty_flag: return None
+ return time_params
+ except:
+ return None
+
+
+def timeago(years, months, weeks, days, from_date=None):
+ if from_date is None:
+ from_date = datetime.now()
+ return from_date - relativedelta(years=years, months=months, weeks=weeks, days=days)
+
+## yearsago using Standard library
+# def yearsago(years, from_date=None):
+# if from_date is None:
+# from_date = datetime.now()
+# try:
+# return from_date.replace(year=from_date.year - years)
+# except ValueError:
+# # Must be 2/29!
+# assert from_date.month == 2 and from_date.day == 29 # can be removed
+# return from_date.replace(month=2, day=28,
+# year=from_date.year-years)
diff --git a/staffeln/conductor/backup.py b/staffeln/conductor/backup.py
index f9cde7b..69c48f4 100755
--- a/staffeln/conductor/backup.py
+++ b/staffeln/conductor/backup.py
@@ -3,11 +3,12 @@
from staffeln.common import constants
from openstack import exceptions
+from openstack.block_storage.v2 import backup
from oslo_log import log
from staffeln.common import auth
from staffeln.common import context
from staffeln import objects
-from staffeln.i18n import _
+from staffeln.common import short_id
CONF = staffeln.conf.CONF
LOG = log.getLogger(__name__)
@@ -28,6 +29,12 @@
return False
return metadata[CONF.conductor.backup_metadata_key].lower() in ["true"]
+
+def backup_volumes_in_project(conn, project_name):
+ # conn.list_servers()
+ pass
+
+
def get_projects_list():
projects = conn.list_projects()
return projects
@@ -42,9 +49,6 @@
self.queue_mapping = dict()
self.volume_mapping = dict()
- def get_backups(self):
- return objects.Volume.list(self.ctx)
-
def get_queues(self, filters=None):
"""Get the list of volume queue columns from the queue_data table"""
queues = objects.Queue.list(self.ctx, filters=filters)
@@ -67,9 +71,7 @@
# Backup the volumes in in-use and available status
def filter_volume(self, volume_id):
volume = conn.get_volume_by_id(volume_id)
- res = volume['status'] in ("available", "in-use")
- if not res:
- LOG.info(_("Volume %s is not backed because it is in %s status" % (volume_id, volume['status'])))
+ return volume['status'] in ("available", "in-use")
def check_instance_volumes(self):
"""Get the list of all the volumes from the project using openstacksdk
@@ -84,13 +86,15 @@
)
for server in servers:
if not self.filter_server(server.metadata): continue
- for volume in server.attached_volumes:
+ server_id = server.id
+ volumes = server.attached_volumes
+ for volume in volumes:
if not self.filter_volume(volume["id"]): continue
queues_map.append(
QueueMapping(
volume_id=volume["id"],
backup_id="NULL",
- instance_id=server.id,
+ instance_id=server_id,
backup_status=constants.BACKUP_PLANNED,
)
)
@@ -123,6 +127,7 @@
volume_backup = conn.block_storage.create_backup(
volume_id=queue.volume_id, force=True
)
+ print(volume_backup)
queue.backup_id = volume_backup.id
queue.backup_status = constants.BACKUP_WIP
queue.save()
@@ -135,15 +140,15 @@
# Reserve for now because it is related to the WIP backup genenrators which
# are not finished in the current cycle
- def process_failed_backup(self, task):
+ def process_failed_task(self, task):
+ LOG.error("Backup of the volume %s failed." % task.id)
# 1. TODO(Alex): notify via email
- LOG.error("Backup of the volume %s failed." % task.volume_id)
# 2. TODO(Alex): remove failed backup instance from the openstack
- # then set the volume status in-use
# 3. remove failed task from the task queue
- task.delete_queue()
+ queue_delete = objects.Queue.get_by_id(self.ctx, task.id)
+ queue_delete.delete_queue()
- def process_available_backup(self, task):
+ def process_success_backup(self, task):
LOG.info("Backup of the volume %s is successful." % task.volume_id)
# 1. save success backup in the backup table
self._volume_backup(
@@ -155,13 +160,10 @@
)
)
# 2. remove from the task list
- task.delete_queue()
+ queue_delete = objects.Queue.get_by_id(self.ctx, task.id)
+ queue_delete.delete_queue()
# 3. TODO(Alex): notify via email
- def process_using_backup(self, task):
- # remove from the task list
- task.delete_queue()
-
def check_volume_backup_status(self, queue):
"""Checks the backup status of the volume
:params: queue: Provide the map of the volume that needs backup
@@ -171,17 +173,15 @@
for backup_gen in conn.block_storage.backups(volume_id=queue.volume_id):
if backup_gen.id == queue.backup_id:
if backup_gen.status == "error":
- self.process_failed_backup(queue)
- elif backup_gen.status == "available":
- self.process_available_backup(queue)
- elif backup_gen.status == "creating":
+ self.process_failed_task(queue)
+ elif backup_gen.status == "success":
+ self.process_success_backup(queue)
+ else:
# TODO(Alex): Need to escalate discussion
# How to proceed WIP bakcup generators?
# To make things worse, the last backup generator is in progress till
# the new backup cycle
LOG.info("Waiting for backup of %s to be completed" % queue.volume_id)
- else: # "deleting", "restoring", "error_restoring" status
- self.process_using_backup(queue)
def _volume_backup(self, task):
# matching_backups = [
diff --git a/staffeln/conductor/manager.py b/staffeln/conductor/manager.py
index 47a3ec3..78869e0 100755
--- a/staffeln/conductor/manager.py
+++ b/staffeln/conductor/manager.py
@@ -2,6 +2,7 @@
from futurist import periodics
from oslo_log import log
import staffeln.conf
+import sys
import threading
import time
@@ -41,60 +42,44 @@
def reload(self):
LOG.info("%s reload" % self.name)
- # Check if the backup count is over the limit
- # TODO(Alex): how to count the backup number
- # only available backups are calculated?
- def _over_limitation(self):
- LOG.info(_("Checking the backup limitation..."))
- max_count = CONF.conductor.max_backup_count
- current_count = len(backup.Backup().get_backups())
- if max_count <= current_count:
- # TODO(Alex): Send notification
- LOG.info(_("The backup limit is over."))
- return True
- LOG.info(_("The max limit is %s, and current backup count is %s" % (max_count, current_count)))
- return False
-
- # Manage active backup generators
- def _process_wip_tasks(self):
- LOG.info(_("Processing WIP backup generators..."))
- queues_started = backup.Backup().get_queues(
+ # return the list of tasks (queues) whose backups are in progress
+ def get_wip_tasks(self):
+ return backup.Backup().get_queues(
filters={"backup_status": constants.BACKUP_WIP}
)
- if len(queues_started) != 0:
- for queue in queues_started: backup.Backup().check_volume_backup_status(queue)
- # Create backup generators
- def _process_new_tasks(self):
- LOG.info(_("Creating new backup generators..."))
- queues_to_start = backup.Backup().get_queues(
+ # return the list of tasks (queues) that still need to be done
+ def get_todo_tasks(self):
+ return backup.Backup().get_queues(
filters={"backup_status": constants.BACKUP_PLANNED}
)
- if len(queues_to_start) != 0:
- for queue in queues_to_start:
- backup.Backup().volume_backup_initiate(queue)
- # Refresh the task queue
- # TODO(Alex): need to escalate discussion
- # how to manage last backups not finished yet
- def _update_task_queue(self):
- LOG.info(_("Updating backup task queue..."))
- all_tasks = backup.Backup().get_queues()
- if len(all_tasks) == 0:
- backup.Backup().create_queue()
- else:
- LOG.info(_("The last backup cycle is not finished yet."
- "So the new backup cycle is skipped."))
+ # return the full list of tasks (queues), regardless of status
+ def get_all_tasks(self):
+ return backup.Backup().get_queues()
@periodics.periodic(spacing=CONF.conductor.backup_period, run_immediately=True)
def backup_engine(self):
LOG.info("backing... %s" % str(time.time()))
LOG.info("%s periodics" % self.name)
- if self._over_limitation(): return
- self._update_task_queue()
- self._process_wip_tasks()
- self._process_new_tasks()
+ # TODO(Alex): need to escalate discussion
+ # how to manage last backups not finished yet
+ if len(self.get_all_tasks()) == 0:
+ backup.Backup().create_queue()
+ else:
+ LOG.info(_("The last backup cycle is not finished yet."
+ " So the new backup cycle is skipped."))
+
+ queues_started = self.get_wip_tasks()
+ if len(queues_started) != 0:
+ for queue in queues_started: backup.Backup().check_volume_backup_status(queue)
+
+ queues_to_start = self.get_todo_tasks()
+ print(queues_to_start)
+ if len(queues_to_start) != 0:
+ for queue in queues_to_start:
+ backup.Backup().volume_backup_initiate(queue)
class RotationManager(cotyledon.Service):
@@ -108,6 +93,7 @@
def run(self):
LOG.info("%s run" % self.name)
+ interval = CONF.conductor.rotation_period
periodic_callables = [
(self.rotation_engine, (), {}),
@@ -126,5 +112,5 @@
@periodics.periodic(spacing=CONF.conductor.rotation_period, run_immediately=True)
def rotation_engine(self):
+ print("rotating... %s" % str(time.time()))
LOG.info("%s rotation_engine" % self.name)
-
diff --git a/staffeln/conf/conductor.py b/staffeln/conf/conductor.py
index 06d433b..7d96579 100755
--- a/staffeln/conf/conductor.py
+++ b/staffeln/conf/conductor.py
@@ -26,11 +26,6 @@
default="__automated_backup",
help=_("The key string of metadata the VM, which requres back up, has"),
),
- cfg.IntOpt(
- "max_backup_count",
- default=10,
- help=_("The key string of metadata the VM, which requres back up, has"),
- ),
]
rotation_opts = [
diff --git a/staffeln/objects/__init__.py b/staffeln/objects/__init__.py
index 4f7ca56..8003db1 100755
--- a/staffeln/objects/__init__.py
+++ b/staffeln/objects/__init__.py
@@ -1,5 +1,4 @@
from .queue import Queue
-from .volume import Volume
# from volume import Volume
def register_all():
__import__('staffeln.objects.volume')