Skip to content

Commit

Permalink
WIP on #30 and #39; Changes related to #19
Browse files Browse the repository at this point in the history
  • Loading branch information
dcvan24 committed Jun 13, 2018
1 parent 8dff450 commit 7d664ff
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 123 deletions.
16 changes: 15 additions & 1 deletion appliance/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ def parse(cls, data):
app = Appliance(data['id'], containers)
return 200, app, None

def __init__(self, id, containers=[], **kwargs):
def __init__(self, id, containers=[],
scheduler='schedule.local.DefaultApplianceScheduler', **kwargs):
self.__id = id
self.__containers = list(containers)
self.__scheduler = scheduler

@property
@swagger.property
Expand Down Expand Up @@ -75,6 +77,18 @@ def containers(self):
"""
return self.__containers

@property
@swagger.property
def scheduler(self):
"""
Appliance-level scheduler for the appliance
---
type: str
"""
return self.__scheduler

@containers.setter
def containers(self, contrs):
self.__containers = list(contrs)
Expand Down
20 changes: 16 additions & 4 deletions appliance/manager.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import importlib
import schedule

from config import config
from commons import MongoClient, AutonomousMonitor
from commons import Manager, APIManager
from appliance.base import Appliance
from container.manager import ContainerManager
from schedule import ApplianceScheduleNegotiator
from schedule.local import ApplianceScheduleExecutor


class ApplianceManager(Manager):
Expand All @@ -18,7 +21,7 @@ async def get_appliance(self, app_id):
if status != 200:
return status, app, err
app = Appliance(**app)
status, app.containers, err = await self.__contr_mgr.get_containers(app_id)
status, app.containers, err = await self.__contr_mgr.get_containers(appliance=app_id)
if not app.containers:
self.logger.info("Empty appliance '%s', deleting"%app_id)
status, msg, err = await self.delete_appliance(app_id)
Expand Down Expand Up @@ -51,7 +54,7 @@ async def create_appliance(self, data):
self.logger.error(err)
return status, None, err
self.logger.info(msg)
ApplianceScheduleNegotiator(app.id, 3000).start()
ApplianceScheduleExecutor(app.id, 3000).start()
return 201, app, None

async def delete_appliance(self, app_id):
Expand All @@ -69,7 +72,8 @@ async def delete_appliance(self, app_id):
if err and status != 404:
self.logger.error(err)
return 400, None, "Failed to deprovision appliance '%s'"%app_id
ApplianceDeletionChecker(app_id).start()
scheduler = self._get_scheduler(app.scheduler)
ApplianceDeletionChecker(app_id, scheduler).start()
return status, msg, None

async def save_appliance(self, app, upsert=True):
Expand All @@ -93,6 +97,14 @@ def validate_dependencies(contrs):

return validate_dependencies(app.containers)

def _get_scheduler(self, scheduler_name):
try:
sched_mod = '.'.join(scheduler_name.split('.')[:-1])
sched_class = scheduler_name.split('.')[-1]
return getattr(importlib.import_module(sched_mod), sched_class)
except Exception as e:
self.logger.error(str(e))
return schedule.DefaultApplianceScheduler


class ApplianceAPIManager(APIManager):
Expand Down
4 changes: 2 additions & 2 deletions commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import tornado

from abc import ABCMeta, abstractmethod
from abc import abstractmethod
from tornado.httpclient import AsyncHTTPClient, HTTPError
from tornado.ioloop import PeriodicCallback
from motor.motor_tornado import MotorClient
Expand Down Expand Up @@ -95,7 +95,7 @@ def __init__(self):
self.http_cli = AsyncHttpClientWrapper()


class AutonomousMonitor(Loggable, metaclass=ABCMeta):
class AutonomousMonitor(Loggable):

def __init__(self, interval):
self.__interval = interval
Expand Down
15 changes: 13 additions & 2 deletions config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
import yaml
import importlib

from util import dirname

Expand Down Expand Up @@ -70,8 +71,7 @@ def __init__(self, port=0, *args, **kwargs):
class GeneralConfig:

def __init__(self, master, port=9090, n_parallel=1,
scheduler='scheduler.appliance.DefaultApplianceScheduler',
ha=False, *args, **kwargs):
scheduler='schedule.DefaultGlobalScheduler', ha=False, *args, **kwargs):
self.__master = master
self.__port = port
self.__n_parallel = n_parallel
Expand Down Expand Up @@ -188,3 +188,14 @@ def irods(self):


config = Configuration.read_config('%s/config.yml'%dirname(__file__))


def get_global_scheduler():
try:
sched_mod = '.'.join(config.pivot.scheduler.split('.')[:-1])
sched_class = config.pivot.scheduler.split('.')[-1]
return getattr(importlib.import_module(sched_mod), sched_class)()
except Exception as e:
sys.stderr.write(str(e) + '\n')
from schedule import DefaultGlobalScheduler
return DefaultGlobalScheduler()
1 change: 0 additions & 1 deletion config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ pivot:
master: m1.dcos
port: 9090
n_parallel: 8
scheduler: schedule.plugin.location_aware.LocationAwareApplianceScheduler
ha: false
db:
host: localhost
Expand Down
6 changes: 4 additions & 2 deletions container/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ async def get(self, app_id):
application/json:
schema: Error
"""
status, services, err = await self.__contr_mgr.get_containers(app_id, type='service')
status, services, err = await self.__contr_mgr.get_containers(appliance=app_id,
type='service')
self.set_status(status)
self.write(json.dumps([s.to_render() for s in services]
if status == 200 else error(err)))
Expand Down Expand Up @@ -76,7 +77,8 @@ async def get(self, app_id):
application/json:
schema: Error
"""
status, services, err = await self.__contr_mgr.get_containers(app_id, type='job')
status, services, err = await self.__contr_mgr.get_containers(appliance=app_id,
type='job')
self.set_status(status)
self.write(json.dumps([s.to_render() for s in services]
if status == 200 else error(err)))
Expand Down
24 changes: 13 additions & 11 deletions container/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,54 +12,53 @@

class ContainerManager(Manager):

def __init__(self, contr_info_ttl=timedelta(seconds=3)):
def __init__(self):
self.__service_api = ServiceAPIManager()
self.__job_api = JobAPIManager()
self.__contr_db = ContainerDBManager()
self.__cluster_db = AgentDBManager()
self.__contr_info_ttl = contr_info_ttl

async def get_container(self, app_id, contr_id):
async def get_container(self, app_id, contr_id, ttl=0):
status, contr, err = await self.__contr_db.get_container(app_id, contr_id)
if status == 404:
return status, contr, err
if not contr.last_update or \
datetime.datetime.now(tz=None) - contr.last_update > self.__contr_info_ttl:
datetime.datetime.now(tz=None) - contr.last_update > timedelta(seconds=ttl):
status, contr, err = await self._get_updated_container(contr)
if status == 404 and contr.state != ContainerState.SUBMITTED:
self.logger.info("Deleted ghost container: %s"%contr)
await self.__contr_db.delete_container(contr)
return 404, None, err
if status == 200:
contr.last_update = datetime.datetime.now(tz=None)
await self.__contr_db.save_container(contr, False)
await self.save_container(contr)
elif status != 404:
self.logger.error("Failed to update container '%s'"%contr)
self.logger.error(err)
return 200, contr, None

async def get_containers(self, app_id, **kwargs):
contrs = await self.__contr_db.get_containers(appliance=app_id, **kwargs)
async def get_containers(self, ttl=0, **kwargs):
contrs = await self.__contr_db.get_containers(**kwargs)
contrs_to_del, contrs_to_update = [], [],
cur_time = datetime.datetime.now(tz=None)
for c in contrs:
if c.last_update and cur_time - c.last_update <= self.__contr_info_ttl:
if c.last_update and cur_time - c.last_update <= timedelta(seconds=ttl):
continue
status, c, err = await self._get_updated_container(c)
if status == 404 and c.state != ContainerState.SUBMITTED:
contrs_to_del.append(c)
if status == 200:
contrs_to_update.append(c)
if contrs_to_del:
filters = dict(id={'$in': [c.id for c in contrs_to_del]}, appliance=app_id)
filters = dict(id={'$in': [c.id for c in contrs_to_del]})
status, msg, err = await self.__contr_db.delete_containers(**filters)
if err:
self.logger.error(err)
else:
self.logger.info(msg)
for c in contrs_to_update:
c.last_update = datetime.datetime.now(tz=None)
await self.__contr_db.save_container(c, upsert=False)
await self.save_container(c)
return 200, contrs, None

async def create_container(self, data):
Expand All @@ -69,7 +68,7 @@ async def create_container(self, data):
status, contr, err = Container.parse(data)
if err:
return status, None, err
await self.__contr_db.save_container(contr, True)
await self.save_container(contr, True)
return 201, contr, None

async def delete_container(self, app_id, contr_id):
Expand Down Expand Up @@ -121,6 +120,9 @@ async def provision_container(self, contr):
return status, None, err
return status, contr, None

async def save_container(self, contr, upsert=False):
await self.__contr_db.save_container(contr, upsert=upsert)

async def _get_updated_container(self, contr):
assert isinstance(contr, Container)
self.logger.debug('Update container info: %s'%contr)
Expand Down
130 changes: 60 additions & 70 deletions schedule/__init__.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,10 @@
import sys
import importlib

import container
import cluster
import appliance

from abc import ABCMeta

from commons import AutonomousMonitor, Singleton, Loggable
from container.manager import ContainerManager
from config import config


def get_scheduler():
try:
sched_mod = '.'.join(config.pivot.scheduler.split('.')[:-1])
sched_class = config.pivot.scheduler.split('.')[-1]
return getattr(importlib.import_module(sched_mod), sched_class)
except Exception as e:
sys.stderr.write(str(e) + '\n')
from schedule.default import DefaultApplianceScheduler
return DefaultApplianceScheduler


class ApplianceScheduleNegotiator(AutonomousMonitor):

def __init__(self, app_id, interval=3000):
super(ApplianceScheduleNegotiator, self).__init__(interval)
self.__app_id = app_id
self.__executor = ApplianceScheduleExecutor()
self.__scheduler = get_scheduler()()
self.__cluster_mgr = cluster.manager.ClusterManager()
self.__app_mgr = appliance.manager.ApplianceManager()

async def callback(self):
# get appliance
status, app, err = await self.__app_mgr.get_appliance(self.__app_id)
if not app:
if status == 404:
self.logger.info('Appliance %s no longer exists'%self.__app_id)
else:
self.logger.error(err)
self.stop()
return
# get cluster info
agents = await self.__cluster_mgr.get_cluster(ttl=0)
# contact the scheduler for new schedule
sched = await self.__scheduler.schedule(app, agents)
self.logger.debug('Containers to be scheduled: %s'%[c.id for c in sched.containers])
# if the scheduling is done
if sched.done:
self.logger.info('Scheduling is done for appliance %s'%self.__app_id)
self.stop()
return
# execute the new schedule
await self.__executor.execute(sched)


class ApplianceScheduleExecutor(Loggable, metaclass=Singleton):

def __init__(self):
self.__contr_mgr = ContainerManager()

async def execute(self, sched):
for c in sched.containers:
_, msg, err = await self.__contr_mgr.provision_container(c)
if err:
self.logger.error(err)
self.logger.info('Container %s is being provisioned'%c.id)


class Schedule:
class SchedulePlan:

def __init__(self, done=False, containers=[]):
self.__done = done
Expand All @@ -91,8 +26,63 @@ def add_containers(self, contrs):
self.__containers += list(contrs)


class ApplianceScheduler(Loggable, metaclass=ABCMeta):
class GlobalScheduler(Loggable, metaclass=Singleton):

async def schedule(self, contr, agents):
# generate a schedule plan
# return the schedule plan
pass


async def reschedule(self, contrs, agents):
# collect evidence for scheduling
# generate a reschedule plan
# return the reschedule plan
pass


class GlobalScheduleExecutor(AutonomousMonitor, metaclass=Singleton):

def __init__(self, scheduler, interval=5000):
super(GlobalScheduleExecutor, self).__init__(interval)
self.__contr_mgr = container.manager.ContainerManager()
self.__cluster_mgr = cluster.manager.ClusterManager()
self.__scheduler = scheduler

async def schedule(self, contr):
agents = await self.get_agents()
plan = await self.__scheduler.schedule(contr, agents)
for c in plan.containers:
await self._provision_container(c)

async def callback(self):
agents = await self.get_agents()
contrs = await self._get_containers()
if not contrs: return
plan = await self.__scheduler.reschedule(contrs, agents)
for c in plan.containers:
await self._provision_container(c)

async def get_agents(self):
return await self.__cluster_mgr.get_cluster(0)

async def _get_containers(self, **kwargs):
status, contrs, err = await self.__contr_mgr.get_containers(**kwargs)
if err:
self.logger.error(err)
return contrs

async def _provision_container(self, contr):
await self.__contr_mgr.save_container(contr)
status, contr, err = await self.__contr_mgr.provision_container(contr)
if err:
self.logger.error(err)


class DefaultGlobalScheduler(GlobalScheduler):

async def schedule(self, app, agents):
raise NotImplemented
async def schedule(self, contr, agents):
pass

async def reschedule(self, contrs, agents):
pass
Loading

0 comments on commit 7d664ff

Please sign in to comment.