diff --git a/appliance/manager.py b/appliance/manager.py index f3a7311..685e367 100644 --- a/appliance/manager.py +++ b/appliance/manager.py @@ -54,7 +54,8 @@ async def create_appliance(self, data): self.logger.error(err) return status, None, err self.logger.info(msg) - ApplianceScheduleExecutor(app.id, 3000).start() + scheduler = self._get_scheduler(app.scheduler) + ApplianceScheduleExecutor(app.id, scheduler, 3000).start() return 201, app, None async def delete_appliance(self, app_id): @@ -72,8 +73,7 @@ async def delete_appliance(self, app_id): if err and status != 404: self.logger.error(err) return 400, None, "Failed to deprovision appliance '%s'"%app_id - scheduler = self._get_scheduler(app.scheduler) - ApplianceDeletionChecker(app_id, scheduler).start() + ApplianceDeletionChecker(app_id).start() return status, msg, None async def save_appliance(self, app, upsert=True): @@ -101,10 +101,10 @@ def _get_scheduler(self, scheduler_name): try: sched_mod = '.'.join(scheduler_name.split('.')[:-1]) sched_class = scheduler_name.split('.')[-1] - return getattr(importlib.import_module(sched_mod), sched_class) + return getattr(importlib.import_module(sched_mod), sched_class)() except Exception as e: self.logger.error(str(e)) - return schedule.DefaultApplianceScheduler + return schedule.local.DefaultApplianceScheduler() class ApplianceAPIManager(APIManager): diff --git a/commons.py b/commons.py index 4d65700..da0c10b 100644 --- a/commons.py +++ b/commons.py @@ -3,7 +3,7 @@ import logging import tornado -from abc import abstractmethod +from abc import ABCMeta, abstractmethod from tornado.httpclient import AsyncHTTPClient, HTTPError from tornado.ioloop import PeriodicCallback from motor.motor_tornado import MotorClient @@ -95,7 +95,7 @@ def __init__(self): self.http_cli = AsyncHttpClientWrapper() -class AutonomousMonitor(Loggable): +class AutonomousMonitor(Loggable, metaclass=ABCMeta): def __init__(self, interval): self.__interval = interval diff --git a/schedule/__init__.py b/schedule/__init__.py index 9563c0f..6406f26 100644 --- a/schedule/__init__.py +++ b/schedule/__init__.py @@ -29,60 +29,69 @@ def add_containers(self, contrs): class GlobalScheduler(Loggable, metaclass=Singleton): async def schedule(self, contr, agents): - # generate a schedule plan - # return the schedule plan - pass - + raise NotImplemented async def reschedule(self, contrs, agents): - # collect evidence for scheduling - # generate a reschedule plan - # return the reschedule plan - pass + raise NotImplemented -class GlobalScheduleExecutor(AutonomousMonitor, metaclass=Singleton): +class GlobalScheduleExecutor(metaclass=Singleton): - def __init__(self, scheduler, interval=5000): - super(GlobalScheduleExecutor, self).__init__(interval) + def __init__(self, scheduler, interval=30000): + super(GlobalScheduleExecutor, self).__init__() + self.__scheduler = scheduler self.__contr_mgr = container.manager.ContainerManager() self.__cluster_mgr = cluster.manager.ClusterManager() - self.__scheduler = scheduler + self.__resched_runner = RescheduleRunner(scheduler, self, interval) - async def schedule(self, contr): - agents = await self.get_agents() - plan = await self.__scheduler.schedule(contr, agents) - for c in plan.containers: - await self._provision_container(c) + def start_rescheduler(self): + self.__resched_runner.start() - async def callback(self): + async def submit(self, contr): agents = await self.get_agents() - contrs = await self._get_containers() - if not contrs: return - plan = await self.__scheduler.reschedule(contrs, agents) + plan = await self.__scheduler.schedule(contr, agents) for c in plan.containers: - await self._provision_container(c) + await self.provision_container(c) async def get_agents(self): return await self.__cluster_mgr.get_cluster(0) - async def _get_containers(self, **kwargs): + async def get_containers(self, **kwargs): status, contrs, err = await self.__contr_mgr.get_containers(**kwargs) if err: self.logger.error(err) return contrs - async def _provision_container(self, contr): + async def provision_container(self, contr): await self.__contr_mgr.save_container(contr) status, contr, err = await self.__contr_mgr.provision_container(contr) if err: self.logger.error(err) +class RescheduleRunner(AutonomousMonitor): + + def __init__(self, scheduler, executor, interval=30000): + super(RescheduleRunner, self).__init__(interval) + self.__scheduler = scheduler + self.__executor = executor + + async def callback(self): + agents = await self.__executor.get_agents() + contrs = await self.__executor.get_containers() + if not contrs: return + plan = await self.__scheduler.reschedule(contrs, agents) + for c in plan.containers: + await self.__executor.provision_container(c) + + + class DefaultGlobalScheduler(GlobalScheduler): async def schedule(self, contr, agents): - pass + self.logger.info('Scheduled by %s'%self.__class__.__name__) + return SchedulePlan(containers=[contr]) async def reschedule(self, contrs, agents): - pass + # self.logger.info('Rescheduled by %s'%self.__class__.__name__) + return SchedulePlan() diff --git a/schedule/local/__init__.py b/schedule/local/__init__.py index cc26e09..749f9cd 100644 --- a/schedule/local/__init__.py +++ b/schedule/local/__init__.py @@ -2,7 +2,7 @@ from abc import ABCMeta -from schedule import GlobalScheduleExecutor, GlobalScheduler, SchedulePlan +from schedule import GlobalScheduleExecutor, SchedulePlan from commons import AutonomousMonitor, Loggable from config import get_global_scheduler @@ -14,7 +14,7 @@ def __init__(self, app_id, scheduler, interval=3000): self.__app_id = app_id self.__app_mgr = appliance.manager.ApplianceManager() self.__local_sched = scheduler - self.__global_sched = GlobalScheduleExecutor(get_global_scheduler()) + self.__global_sched_exec = GlobalScheduleExecutor(get_global_scheduler()) async def callback(self): # get appliance @@ -27,7 +27,7 @@ async def callback(self): self.stop() return # get cluster info - agents = await self.__global_sched.get_agents() + agents = await self.__global_sched_exec.get_agents() # contact the scheduler for new schedule sched = await self.__local_sched.schedule(app, agents) self.logger.debug('Containers to be scheduled: %s'%[c.id for c in sched.containers]) @@ -41,7 +41,7 @@ async def callback(self): async def _execute(self, sched): for c in sched.containers: - await self.__global_sched.schedule(c) + await self.__global_sched_exec.submit(c) self.logger.info('Container %s is being scheduled'%c.id) diff --git a/server.py b/server.py index d7f6715..d31fbf0 100644 --- a/server.py +++ b/server.py @@ -23,7 +23,7 @@ def start_cluster_monitor(): def start_global_scheduler(): scheduler = GlobalScheduleExecutor(get_global_scheduler()) - tornado.ioloop.IOLoop.instance().add_callback(scheduler.start) + tornado.ioloop.IOLoop.instance().add_callback(scheduler.start_rescheduler) def start_server():