Skip to content

Commit

Permalink
[#30][#39] bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dcvan24 committed Jun 13, 2018
1 parent 7d664ff commit 9c2194a
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 38 deletions.
10 changes: 5 additions & 5 deletions appliance/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ async def create_appliance(self, data):
self.logger.error(err)
return status, None, err
self.logger.info(msg)
ApplianceScheduleExecutor(app.id, 3000).start()
scheduler = self._get_scheduler(app.scheduler)
ApplianceScheduleExecutor(app.id, scheduler, 3000).start()
return 201, app, None

async def delete_appliance(self, app_id):
Expand All @@ -72,8 +73,7 @@ async def delete_appliance(self, app_id):
if err and status != 404:
self.logger.error(err)
return 400, None, "Failed to deprovision appliance '%s'"%app_id
scheduler = self._get_scheduler(app.scheduler)
ApplianceDeletionChecker(app_id, scheduler).start()
ApplianceDeletionChecker(app_id).start()
return status, msg, None

async def save_appliance(self, app, upsert=True):
Expand Down Expand Up @@ -101,10 +101,10 @@ def _get_scheduler(self, scheduler_name):
try:
sched_mod = '.'.join(scheduler_name.split('.')[:-1])
sched_class = scheduler_name.split('.')[-1]
return getattr(importlib.import_module(sched_mod), sched_class)
return getattr(importlib.import_module(sched_mod), sched_class)()
except Exception as e:
self.logger.error(str(e))
return schedule.DefaultApplianceScheduler
return schedule.local.DefaultApplianceScheduler()


class ApplianceAPIManager(APIManager):
Expand Down
4 changes: 2 additions & 2 deletions commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import tornado

from abc import abstractmethod
from abc import ABCMeta, abstractmethod
from tornado.httpclient import AsyncHTTPClient, HTTPError
from tornado.ioloop import PeriodicCallback
from motor.motor_tornado import MotorClient
Expand Down Expand Up @@ -95,7 +95,7 @@ def __init__(self):
self.http_cli = AsyncHttpClientWrapper()


class AutonomousMonitor(Loggable):
class AutonomousMonitor(Loggable, metaclass=ABCMeta):

def __init__(self, interval):
self.__interval = interval
Expand Down
61 changes: 35 additions & 26 deletions schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,60 +29,69 @@ def add_containers(self, contrs):
class GlobalScheduler(Loggable, metaclass=Singleton):

async def schedule(self, contr, agents):
# generate a schedule plan
# return the schedule plan
pass

raise NotImplemented

async def reschedule(self, contrs, agents):
# collect evidence for scheduling
# generate a reschedule plan
# return the reschedule plan
pass
raise NotImplemented


class GlobalScheduleExecutor(AutonomousMonitor, metaclass=Singleton):
class GlobalScheduleExecutor(metaclass=Singleton):

def __init__(self, scheduler, interval=5000):
super(GlobalScheduleExecutor, self).__init__(interval)
def __init__(self, scheduler, interval=30000):
super(GlobalScheduleExecutor, self).__init__()
self.__scheduler = scheduler
self.__contr_mgr = container.manager.ContainerManager()
self.__cluster_mgr = cluster.manager.ClusterManager()
self.__scheduler = scheduler
self.__resched_runner = RescheduleRunner(scheduler, self, interval)

async def schedule(self, contr):
agents = await self.get_agents()
plan = await self.__scheduler.schedule(contr, agents)
for c in plan.containers:
await self._provision_container(c)
def start_rescheduler(self):
self.__resched_runner.start()

async def callback(self):
async def submit(self, contr):
agents = await self.get_agents()
contrs = await self._get_containers()
if not contrs: return
plan = await self.__scheduler.reschedule(contrs, agents)
plan = await self.__scheduler.schedule(contr, agents)
for c in plan.containers:
await self._provision_container(c)
await self.provision_container(c)

async def get_agents(self):
return await self.__cluster_mgr.get_cluster(0)

async def _get_containers(self, **kwargs):
async def get_containers(self, **kwargs):
status, contrs, err = await self.__contr_mgr.get_containers(**kwargs)
if err:
self.logger.error(err)
return contrs

async def _provision_container(self, contr):
async def provision_container(self, contr):
await self.__contr_mgr.save_container(contr)
status, contr, err = await self.__contr_mgr.provision_container(contr)
if err:
self.logger.error(err)


class RescheduleRunner(AutonomousMonitor):

def __init__(self, scheduler, executor, interval=30000):
super(RescheduleRunner, self).__init__(interval)
self.__scheduler = scheduler
self.__executor = executor

async def callback(self):
agents = await self.__executor.get_agents()
contrs = await self.__executor.get_containers()
if not contrs: return
plan = await self.__scheduler.reschedule(contrs, agents)
for c in plan.containers:
await self.__executor.provision_container(c)



class DefaultGlobalScheduler(GlobalScheduler):

async def schedule(self, contr, agents):
pass
self.logger.info('Scheduled by %s'%self.__class__.__name__)
return SchedulePlan(containers=[contr])

async def reschedule(self, contrs, agents):
pass
# self.logger.info('Rescheduled by %s'%self.__class__.__name__)
return SchedulePlan()
8 changes: 4 additions & 4 deletions schedule/local/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from abc import ABCMeta

from schedule import GlobalScheduleExecutor, GlobalScheduler, SchedulePlan
from schedule import GlobalScheduleExecutor, SchedulePlan
from commons import AutonomousMonitor, Loggable
from config import get_global_scheduler

Expand All @@ -14,7 +14,7 @@ def __init__(self, app_id, scheduler, interval=3000):
self.__app_id = app_id
self.__app_mgr = appliance.manager.ApplianceManager()
self.__local_sched = scheduler
self.__global_sched = GlobalScheduleExecutor(get_global_scheduler())
self.__global_sched_exec = GlobalScheduleExecutor(get_global_scheduler())

async def callback(self):
# get appliance
Expand All @@ -27,7 +27,7 @@ async def callback(self):
self.stop()
return
# get cluster info
agents = await self.__global_sched.get_agents()
agents = await self.__global_sched_exec.get_agents()
# contact the scheduler for new schedule
sched = await self.__local_sched.schedule(app, agents)
self.logger.debug('Containers to be scheduled: %s'%[c.id for c in sched.containers])
Expand All @@ -41,7 +41,7 @@ async def callback(self):

async def _execute(self, sched):
for c in sched.containers:
await self.__global_sched.schedule(c)
await self.__global_sched_exec.submit(c)
self.logger.info('Container %s is being scheduled'%c.id)


Expand Down
2 changes: 1 addition & 1 deletion server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def start_cluster_monitor():

def start_global_scheduler():
scheduler = GlobalScheduleExecutor(get_global_scheduler())
tornado.ioloop.IOLoop.instance().add_callback(scheduler.start)
tornado.ioloop.IOLoop.instance().add_callback(scheduler.start_rescheduler)


def start_server():
Expand Down

0 comments on commit 9c2194a

Please sign in to comment.