From c3f601a5d078170644ffba2251fe7ce4581b324f Mon Sep 17 00:00:00 2001 From: Steve Chan Date: Tue, 13 Apr 2021 15:10:44 -0700 Subject: [PATCH 1/4] Remove references to old non-class based modules. --- app.py | 46 +++++++++++++++++++--------------------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/app.py b/app.py index 9cf7552..7009fa7 100644 --- a/app.py +++ b/app.py @@ -10,13 +10,12 @@ from datetime import datetime import json import hashlib -import manage_docker -import manage_rancher +from rancher1_handler import rancher1_handler from typing import Dict, List, Optional import ipaddress import sqlite3 -VERSION = "0.9.11" +VERSION = "0.10.0" # Setup default configuration values, overriden by values from os.environ later cfg = {"docker_url": u"unix://var/run/docker.sock", # path to docker socket @@ -79,6 +78,9 @@ find_stopped_services = None stack_suffix = None +# Instance var for the container manager +container_mgr = None + def merge_env_cfg() -> None: """ @@ -200,33 +202,23 @@ def setup_app(app: flask.Flask) -> None: global naming_regex global find_stopped_services global stack_suffix + global container_mgr try: if (cfg["rancher_url"] is not None): cfg['mode'] = "rancher" - manage_rancher.setup(cfg, logger) - manage_rancher.verify_config(cfg) - check_session = manage_rancher.check_session - start = manage_rancher.start - find_image = manage_rancher.find_image - find_service = manage_rancher.find_service - find_narratives = manage_rancher.find_narratives - find_narrative_labels = manage_rancher.find_narrative_labels - reap_narrative = manage_rancher.reap_narrative + container_mgr = rancher1_handler(cfg, logger) + container_mgr.verify_config(cfg) + check_session = container_mgr.check_session + start = container_mgr.start + find_image = container_mgr.find_image + find_service = container_mgr.find_service + find_narratives = container_mgr.find_narratives + find_narrative_labels = container_mgr.find_narrative_labels + reap_narrative = container_mgr.reap_narrative naming_regex = "^{}_" - find_stopped_services = manage_rancher.find_stopped_services - stack_suffix = manage_rancher.stack_suffix - else: - cfg['mode'] = "docker" - manage_docker.setup(cfg, logger) - manage_docker.verify_config(cfg) - start = manage_docker.start - find_image = manage_docker.find_image - find_service = manage_docker.find_service - find_narratives = manage_docker.find_narratives - find_narrative_labels = manage_docker.find_narrative_labels - reap_narrative = manage_docker.reap_narrative - naming_regex = "^{}$" + find_stopped_services = container_mgr.find_stopped_services + stack_suffix = container_mgr.stack_suffix except Exception as ex: logger.critical("Failed validation of docker or rancher configuration") raise(ex) @@ -266,7 +258,7 @@ def get_prespawned() -> List[str]: """ if cfg["mode"] != "rancher": raise(NotImplementedError("prespawning only supports rancher mode, current mode={}".format(cfg['mode']))) - narratives = manage_rancher.find_narratives() + narratives = find_narratives() idle_narr = [narr for narr in narratives if cfg['container_name_prespawn'].format("") in narr] return(idle_narr) @@ -286,7 +278,7 @@ def prespawn_narrative(num: int) -> None: session = random.getrandbits(128).to_bytes(16, "big").hex() narr_id = session[0:6] try: - manage_rancher.start(session, narr_id, True) + start(session, narr_id, True) except Exception as err: logger.critical({"message": "prespawn_narrative_exception", "session": session, "container": "{} of {}".format(a, num), "exception": repr(err)}) From f96ef6ac2278cb87655e01086f62d6b91762d14c Mon Sep 17 00:00:00 2001 From: Steve Chan Date: Tue, 27 Apr 2021 14:50:16 -0700 Subject: [PATCH 2/4] Add missing flake8 configs. --- tox.ini | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 tox.ini diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..4c7e9aa --- /dev/null +++ b/tox.ini @@ -0,0 +1,15 @@ +[flake8] +max-line-length = 100 +exclude = + lib/biokbase, + submodules, + */prepare_deploy_cfg.py, + */NarrativeRunner_server_test.py, + test_scripts + temp_test* +putty-ignore = + */__init__.py : F401,E126 + *Impl.py : E265,E266 + *Server.py : E265,E266 + *Client.py : E265,E266 + \ No newline at end of file From 47260a6395df8360444a22f3d7bdbef65d4ce38b Mon Sep 17 00:00:00 2001 From: Steve Chan Date: Fri, 10 Sep 2021 10:54:53 -0700 Subject: [PATCH 3/4] Fix annoying line length warnings. --- app.py | 70 +++++++++++++++++++++++++++++++++++---------------------- tox.ini | 2 +- 2 files changed, 44 insertions(+), 28 deletions(-) diff --git a/app.py b/app.py index 24d6082..9627df1 100644 --- a/app.py +++ b/app.py @@ -22,14 +22,15 @@ "hostname": [u"localhost"], # hostname used for traefik router rules "auth2": u"https://ci.kbase.us/services/auth/api/V2/me", # url for authenticating tokens "image_name": u"kbase/narrative", # image name (omit :tag !) used for spawning narratives - "image_tag": None, # image tag; if not None, will be concatenated to image_name for new instances (otherwise latest_narr_version() will be used) + "image_tag": None, # image tag; if not None, will be concatenated to image_name + # for new instances, otherwise latest_narr_version() will be used "session_cookie": u"narrative_session", # name of cookie used for storing session id "kbase_cookie": u"kbase_session", # name of the cookie container kbase auth token "container_name": u"narrative-{}", # python string template for narrative name, userid in param - "container_name_prespawn": u"narrativepre-{}", # python string template for pre-spawned narratives, userid in param + "container_name_prespawn": u"narrativepre-{}", # string template for pre-spawned narratives, userid in param "narrative_version_url": "https://ci.kbase.us/narrative_version", # url to narrative_version endpoint - "traefik_metrics": "http://traefik:8080/metrics", # URL of traefik metrics endpoint, api + prometheus must be enabled - "dock_net": u"narrative-traefiker_default", # name of the docker network that docker containers should be bound to + "traefik_metrics": "http://traefik:8080/metrics", # URL of traefik metrics endpoint, enable api + prometheus! + "dock_net": u"narrative-traefiker_default", # docker network that docker containers should be bound to "log_level": logging.DEBUG, # loglevel - DEBUG=10, INFO=20, WARNING=30, ERROR=40, CRITICAL=50 "log_dest": None, # log destination - currently unused "log_name": u"traefiker", # python logger name @@ -37,16 +38,18 @@ "rancher_password": None, # password for rancher creds "rancher_url": None, # URL for the rancher API endpoint, including version "rancher_meta": "http://rancher-metadata/", # URL for the rancher-metadata service (unauthenticated) - "rancher_env_url": None, # rancher enviroment URL (under rancher_url) - self-configured if not set - "rancher_stack_id": None, # rancher stack ID value, used with rancher_env_url - self-configured if not set - "rancher_stack_name": None, # rancher stack name value, used with rancher_env_url - self-configured if not - # set, required if rancher_stack_id set + # The next 3 configs are self-configured if not set + "rancher_env_url": None, # rancher enviroment URL (under rancher_url) + "rancher_stack_id": None, # rancher stack ID value, used with rancher_env_url + "rancher_stack_name": None, # rancher stack name value, used with rancher_env_url + # required if rancher_stack_id set "mode": None, # What orchestation type? "rancher" or "docker" "reaper_timeout_secs": 600, # How long should a container be idle before it gets reaped? "reaper_ipnetwork": u"127.0.0.1/32", # What IP address/network is allowed to access /reaper/ ? "debug": 0, # Set debug mode - "narrenv": dict(), # Dictionary of env name/val to be passed to narratives at startup - "num_prespawn": 5, # How many prespawned narratives should be maintained? Checked at startup and reapee runs + "narrenv": dict(), # Dict of env name/val to be passed to narratives at startup + "num_prespawn": 5, # How many prespawned narratives should be maintained? + # Checked at startup and reapee runs "status_role": "KBASE_ADMIN", # auth custom role for full narratve_status privs "sqlite_reaperdb_path": "/tmp/reaper.db", # full path to SQLite3 database file "COMMIT_SHA": "not available"} # Git commit hash for this build, set via docker build env @@ -236,14 +239,18 @@ def setup_app(app: flask.Flask) -> None: else: suffix = "" narr_time = {narr+suffix: time.time() for narr in narrs if narr.startswith(prefix)} - logger.debug({"message": "Adding containers matching {} to narr_activity".format(prefix), "names": str(list(narr_time.keys()))}) + logger.debug({"message": "Adding containers matching {} to narr_activity".format(prefix), + "names": str(list(narr_time.keys()))}) narr_activity.update(narr_time) logger.info({'message': "using sqlite3 database in {}".format(cfg['sqlite_reaperdb_path'])}) if (cfg['image_tag'] is not None and cfg['image_tag'] != ''): - logger.info({'message': "image_tag specified, will use image {}:{}".format(cfg['image_name'],cfg['image_tag'])}) + logger.info({'message': "image_tag specified, will use image {}:{}".format(cfg['image_name'], + cfg['image_tag'])}) else: - logger.info({'message': "no image_tag specified, will use version from URL {} for image name {}".format(cfg['narrative_version_url'],cfg['image_name'])}) + msg = "no image_tag specified, will use version from URL {} for image name {}" + msg = msg.format(cfg['narrative_version_url'], cfg['image_name']) + logger.info({'message': msg}) try: # need this because we are not in a flask request context here with app.app_context(): @@ -419,7 +426,7 @@ def get_active_traefik_svcs(narr_activity) -> Dict[str, time.time]: if r.status_code == 200: body = r.text.split("\n") # Find all counters related to websockets - jupyter notebooks rely on websockets for communications - websock_re = re.compile('traefik_service_open_connections\{\S+protocol="websocket",service=\"(\S+)@.+ (\d+)') # noqa: W605 + websock_re = re.compile('traefik_service_open_connections\{\S+protocol="websocket",service=\"(\S+)@.+ (\d+)') # noqa: W605 E501 # Containers is a dictionary keyed on container name with the value as the # of active web sockets containers = dict() for line in body: @@ -428,7 +435,8 @@ def get_active_traefik_svcs(narr_activity) -> Dict[str, time.time]: logger.debug({"message": "websocket line: {}".format(line)}) containers[match.group(1)] = int(match.group(2)) prefix = cfg['container_name'].format('') - logger.debug({"message": "Looking for containers that with name prefix {} and image name {}".format(prefix, cfg['image_name'])}) + msg = "Looking for containers that with name prefix {} and image name {}".format(prefix, cfg['image_name']) + logger.debug({"message": msg}) # query for all of services in the environment that match cfg['image_name'] as the image name all_narr_containers = find_narratives(cfg['image_name']) # filter it down to only containers with the proper prefix @@ -441,7 +449,7 @@ def get_active_traefik_svcs(narr_activity) -> Dict[str, time.time]: suffix = "" for name in containers.keys(): logger.debug({"message": "Examining traefik metrics entry: {}".format(name)}) - # Skip any containers that don't match the container prefix, to avoid wasting time on the wrong containers + # Skip any containers that don't match the container prefix, avoid wasting time on the wrong containers if name.startswith(prefix): logger.debug({"message": "Matches prefix {}".format(prefix)}) # services from traefik has a suffix which needs to be stripped to match @@ -458,17 +466,20 @@ def get_active_traefik_svcs(narr_activity) -> Dict[str, time.time]: if (containers[name] > 0) or (name not in narr_activity): narr_activity[name] = time.time() logger.debug({"message": "Updated timestamp for {}".format(name)}) - # Delete this entry from dictionary so that we can identify any 'leftovers' not in traefik metrics + # Delete this entry from dict to identify any 'leftovers' not in traefik metrics try: narr_containers.remove(service_name) except ValueError: logger.errors({"message": "Error deleting {} from narr_containers".format(service_name)}) else: - logger.debug({"message": "Skipping because {} did not match running services".format(service_name)}) + msg = "Skipping because {} did not match running services".format(service_name) + logger.debug({"message": msg}) else: logger.debug({"message": "Skipped {} because it didn't match prefix {}".format(name, prefix)}) if len(narr_containers) > 0: - logger.debug({"message": ": {} rancher services matched prefix and image, but not in traefik metrics.".format(len(narr_containers))}) + msg = ": {} rancher services matched prefix and image, but not in traefik metrics." + msg = msg.format(len(narr_containers)) + logger.debug({"message": msg}) for name in narr_containers: full_name = name+suffix if full_name not in narr_activity: @@ -533,9 +544,9 @@ def reap_older_prespawn(version: str) -> None: def reaper() -> int: """ - Reaper function, originally intended to be called at regular intervals specified by cfg['reaper_sleep_secs']. Now being - called by /reaper/ endpoint, returning number of narratives reaped - Updates last seen timestamps for narratives, reaps any that have been idle for longer than cfg['reaper_timeout_secs'] + Called by /reaper/ endpoint, returning number of narratives reaped + Updates last seen timestamps for narratives, reaps any that have been idle for + longer than cfg['reaper_timeout_secs'] """ global narr_last_version @@ -570,7 +581,8 @@ def reaper() -> int: del narr_activity[name] reaped += 1 except Exception as e: - logger.critical({"message": "Error: Unhandled exception while trying to reap container {}: {}".format(name, repr(e))}) + msg = "Error: Unhandled exception while trying to reap container {}: {}".format(name, repr(e)) + logger.critical({"message": msg}) # Save narr_activity back to the database try: @@ -664,7 +676,8 @@ def narrative_shutdown(username=None): break resp = flask.Response("Service {} deleted".format(name), 200) except Exception as e: - logger.critical({"message": "Error: Unhandled exception while trying to reap container {}: {}".format(name, repr(e))}) + msg = "Error: Unhandled exception while trying to reap container {}: {}".format(name, repr(e)) + logger.critical({"message": msg}) resp = flask.Response("Error deleteing service {}: {}".format(name, repr(e)), 200) else: resp = flask.Response('Valid kbase authentication token required', 401) @@ -682,7 +695,8 @@ def narrative_services() -> List[dict]: try: narr_activity = get_narr_activity_from_db() except Exception as e: - logger.critical({"message": "Could not get data from database for narrative_status, faking last_seen: {}".format(repr(e))}) + msg = "Could not get data from database for narrative_status, faking last_seen: {}".format(repr(e)) + logger.critical({"message": msg}) narr_activity = None try: @@ -715,7 +729,8 @@ def narrative_services() -> List[dict]: info['last_ip'] = match.group(1) info['created'] = match.group(2) except Exception as ex: - logger.critical({"message": "Error: Unhandled exception while trying to query service {}: {}".format(name, repr(ex))}) + msg = "Error: Unhandled exception while trying to query service {}: {}".format(name, repr(ex)) + logger.critical({"message": msg}) info['session_key'] = "Error querying api" info['image'] = None info['publicEndpoints'] = None @@ -809,5 +824,6 @@ def hello(narrative): if cfg['mode'] is not None: app.run() else: - logger.critical({"message": "No container management configuration. Please set docker_url or rancher_* environment variable appropriately"}) + msg = "No container management config. Please set docker_url or rancher_* environment variable appropriately" + logger.critical({"message": msg}) raise RuntimeError("Cannot start/check containers.") diff --git a/tox.ini b/tox.ini index 4c7e9aa..74790a7 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [flake8] -max-line-length = 100 +max-line-length = 120 exclude = lib/biokbase, submodules, From 7c0b97950a83aea606c21a3cee7c3051d2976ce9 Mon Sep 17 00:00:00 2001 From: Steve Chan Date: Tue, 14 Sep 2021 15:09:15 -0700 Subject: [PATCH 4/4] Trim the files per Boris' suggestion --- manage_docker.py | 147 ------------- manage_rancher.py | 544 ---------------------------------------------- 2 files changed, 691 deletions(-) delete mode 100644 manage_docker.py delete mode 100644 manage_rancher.py diff --git a/manage_docker.py b/manage_docker.py deleted file mode 100644 index 4493f9b..0000000 --- a/manage_docker.py +++ /dev/null @@ -1,147 +0,0 @@ -import docker -import os -import logging -from typing import Dict, List, Optional - - -# Module wide docker client -client: Optional[docker.DockerClient] = None - -# Module wide logger -logger: Optional[logging.Logger] = None - -# Module wide config -cfg = {"docker_url": u"unix://var/run/docker.sock", - "hostname": u"localhost", - "image": u"kbase/narrative:latest", - "es_type": "narrative-traefiker", - "session_cookie": u"narrative_session", - "container_name": u"narrative-{}", - "dock_net": u"narrative-traefiker_default", - "reload_secs": 5, - "log_level": logging.DEBUG, - "log_dest": None, - "log_name": u"traefiker"} - - -def setup(main_cfg: dict, main_logger: logging.Logger) -> None: - global cfg - if main_cfg is not None: - cfg = main_cfg - else: - for cfg_item in cfg.keys(): - if cfg_item in os.environ: - cfg[cfg_item] = os.environ[cfg_item] - global logger - if main_logger is None: - logger = logging.getLogger() - else: - logger = main_logger - - global client - client = docker.DockerClient(base_url=cfg['docker_url']) - - -def find_narratives() -> List[str]: - """ - This query hits the docker api looking for containers that are running cfg['image'], this should - be any container running narratives - ToDo: Implement actual code - """ - return([]) - - -def find_service(service_name: str) -> dict: - """ - Given a service name, return the JSON service object from docker of that name. Throw an exception - if (exactly) one isn't found. - ToDo: Implement more than the stub - """ - return({}) - -def verify_config(cfg: dict) -> None: - """ Quickly test the docker socket, if it fails, rethrow the exception after some explanatory logging """ - try: - client.containers.list() - except Exception as ex: - logger.critical("Error trying to list containers using {} as docker socket path.".format(cfg['docker_url'])) - raise(ex) - - -def find_image(name: str) -> str: - """ - Given a service name, return the docker image that the service is running. If the service doesn't exist - then raise and exception - """ - try: - container = client.containers.get(name) - except docker.errors.NotFound: - if cfg['debug']: - print("Service {} not found (might be part of core stack or reaped already)".format(name)) - return(None) - return(container.image.attrs["RepoTags"]) - - -def reap_narrative(container_name: str) -> None: - try: - killit = client.containers.get(container_name) - killit.stop() - except docker.errors.NotFound: - print("Container not found - may have been reaped already") - except Exception as e: - raise(e) # Unhandled exception, rethrow - return - - -def check_session(userid: str) -> str: - """ - Check to see if we already have a container for this user by trying to pull the container object - for the userid - """ - try: - name = cfg['container_name'].format(userid) - container = client.containers.get(name) - session_id = container.labels['session_id'] - except docker.errors.NotFound: - session_id = None - except docker.errors.APIErrors as err: - msg = "Docker APIError thrown while searching for container name {} : {}".format(name, str(err)) - logger.error({"message": msg, "container_name": name, "exception": str(err)}) - session_id = None - return(session_id) - - -def start(session: str, userid: str) -> Dict[str, str]: - """ - Attempts to start a docker container. Takes the suggested session id and a username - Returns the final session id ( in case there was a race condition and another session was already started). - Will throw an exception if there was any issue starting the container other than the race condition we're - already trying to handle - """ - labels = dict() - labels["traefik.enable"] = u"True" - labels["session_id"] = session - cookie = u"{}={}".format(cfg['session_cookie'], session) - labels["traefik.http.routers." + userid + ".rule"] = u"Host(\"" + cfg['hostname'] + u"\") && PathPrefix(\"/narrative/\")" - labels["traefik.http.routers." + userid + ".rule"] += u" && HeadersRegexp(\"Cookie\",\"" + cookie + u"\")" - labels["traefik.http.routers." + userid + ".entrypoints"] = u"web" - # Attempt to bring up a container, if there is an unrecoverable error, clear the session variable to flag - # an error state, and overwrite the response with an error response - try: - name = cfg['container_name'].format(userid) - container = client.containers.run(cfg['image'], detach=True, labels=labels, hostname=name, - auto_remove=True, name=name, network=cfg["dock_net"]) - logger.info({"message": "new_container", "image": cfg['image'], "userid": userid, "container_name": name, - "session_id": session}) - except docker.errors.APIError as err: - # If there is a race condition because a container has already started, then this should catch it. - # Try to get the session for it, if that fails then bail with error message - session = check_session(userid) - if session is None: - raise(err) - else: - logger.info({"message": "previous_session", "userid": userid, "container_name": name, "session_id": session}) - container = client.get_container(name) - if container.status != u"created": - raise(Exception("Error starting container: container status {}".format(container.status))) - return({"session": session}) diff --git a/manage_rancher.py b/manage_rancher.py deleted file mode 100644 index 52525d6..0000000 --- a/manage_rancher.py +++ /dev/null @@ -1,544 +0,0 @@ -import requests -import os -import logging -import re -import random -import flask -from datetime import datetime -from typing import Dict, List, Optional -# to do: move latest_narr_version() so we don't need to import app -import app - -# Module wide logger -logger: Optional[logging.Logger] = None - -# Setup default configuration values, overriden by values from os.environ later -cfg = {"hostname": u"localhost", - "auth2": u"https://ci.kbase.us/services/auth/api/V2/token", - "image_name": u"kbase/narrative", - "image_tag": None, - "es_type": "narrative-traefiker", - "session_cookie": u"narrative_session", - "container_name": u"narrative-{}", - "container_name_prespawn": u"narrative_pre-{}", - "reload_secs": 5, - "log_level": logging.DEBUG, - "log_dest": None, - "log_name": u"traefiker", - "rancher_user": None, - "rancher_password": None, - "rancher_url": None, - "rancher_meta": "http://rancher-metadata/", - "rancher_env_url": None, - "rancher_stack_id": None, - "rancher_stack_name": None, - "mode": None, - "narrenv": dict()} - - -def setup(main_cfg: dict, main_logger: logging.Logger) -> None: - global cfg - global logger - - if main_logger is None: - logger = logging.getLogger() - else: - logger = main_logger - - if main_cfg is not None: - cfg = main_cfg - else: - # We pull any environment variable that matches a config key into the config dictionary - for cfg_item in cfg.keys(): - if cfg_item in os.environ: - cfg[cfg_item] = os.environ[cfg_item] - # To support injecting arbitrary environment variables into the narrative container, we - # look for any environment variable with the prefix "NARRENV_" and add it into a narrenv - # dictionary in the the config hash, using the env variable name stripped of "NARRENV_" - # prefix as the key - for k in os.environ.keys(): - match = re.match(r"^NARRENV_(\w+)", k) - if match: - cfg['narrenv'][match.group(1)] = os.environ[k] - logger.debug({"message": "Setting narrenv from environment", - "key": match.group(1), "value": os.environ[k]}) - - -def check_session(userid: str) -> str: - """ - Check to see if we already have a container for this user by trying to pull the container object - for the userid - """ - try: - name = cfg['container_name'].format(userid) - url = "{}/service?name={}".format(cfg["rancher_env_url"], name) - r = requests.get(url, auth=(cfg["rancher_user"], cfg["rancher_password"])) - if not r.ok: - msg = "Error response code from rancher API while searching for container name {} : {}".format(name, r.status_code) - logger.error({"message": msg, "status_code": r.status_code, "service_name": name, "response_body": r.text}) - raise(Exception(msg)) - res = r.json() - svcs = res['data'] - if len(svcs) == 0: - logger.debug({"message": "No previous session found", "service_name": name, "userid": userid}) - session_id = None - else: - session_id = svcs[0]['launchConfig']['labels']['session_id'] - logger.debug({"message": "Found existing session", "session_id": session_id, "userid": userid}) - if len(svcs) > 1: - uuids = [svc['uuid'] for svc in svcs] - logger.warning({"message": "Found multiple session matches against container name", "userid": userid, - "service_name": name, "rancher_uuids": uuids}) - except Exception as ex: - logger.debug({"message": "Error trying to find existing session", "exception": format(str(ex)), "userid": userid}) - raise(ex) - return(session_id) - - -def start(session: str, userid: str, prespawn: Optional[bool] = False) -> Dict[str, str]: - """ - wrapper around the start_new function that checks to see if there are waiting narratives that - can be assigned. Note that this method is subject to race conditions by competing workers, so we - have 5 retries, and try to select a random waiting narrative before just spawning a new one. Someday maybe - we can implement something to serialize selecting narratives for assignment, but that's a ToDo item. - """ - if prespawn is True: - start_new(session, userid, True) - else: - prespawned = find_prespawned() - # The number of prespawned should be pretty stable around cfg['num_prespawn'], but during a - # usage there might be spike that exhausts the pool of ready containers before replacements - # are available. - if len(prespawned) > 0: - # if we're not already over the num)prespawn setting then - # spawn a replacement and immediately rename an existing container to match the - # userid. We are replicating the prespawn container name code here, maybe cause - # issues later on if the naming scheme is changed! - if len(prespawned) <= cfg['num_prespawn']: - start_new(session, session[0:6], True) - narr_name = cfg['container_name'].format(userid) - offset = random.randint(0, len(prespawned)-1) - session = None - # Try max(5, # of prespawned) times to use an existing narrative, on success assign the session and break - for attempt in range(max(5, len(prespawned))): - candidate = prespawned[(offset+attempt) % len(prespawned)] - try: - rename_narrative(candidate, narr_name) - container = find_service(narr_name) - session = container['launchConfig']['labels']['session_id'] - logger.info({"message": "assigned_container", "userid": userid, "service_name": narr_name, "session_id": session, - "client_ip": "127.0.0.1", "attempt": attempt, "status": "success"}) - break - except Exception as ex: - logger.info({"message": "assigned_container_fail", "userid": userid, "service_name": narr_name, "session_id": session, - "client_ip": "127.0.0.1", "attempt": attempt, "status": "fail", "error": str(ex)}) - if session: - return({"session": session, "prespawned": True}) - else: - # Well, that was a bust, just spin up one explicitly for this user. Maybe we hit a race condition where all of the - # cached containers have been assigned between when we queried and when we tried to rename it. - # ToDo: need to write a pool watcher thread that wakes up periodically to make sure the number of prespawned - # narratives are still at the desired level. Shouldn't be needed since there should be a 1:1 between assigning - # and spawning replacements, but errors happen - logger.debug({"message": "could not assign prespawned container, calling start_new", "userid": userid, "session_id": session}) - return({"session": start_new(session, userid, False)}) - else: - return({"session": start_new(session, userid, False)}) - - -def start_new(session: str, userid: str, prespawn: Optional[bool] = False): - """ - Attempts to start a new container using the rancher API. Signature is identical to the start_docker - method, with the equivalent rancher exceptions. - """ - # Crazy long config needed for rancher container startup. Based on observing the traffic from rancher - # GUI to rancher REST APIs. Might be able to prune it down with some research - container_config = {u'assignServiceIpAddress': False, - u'createIndex': None, - u'created': None, - u'description': None, - u'externalId': None, - u'fqdn': None, - u'healthState': None, - u'kind': None, - u'launchConfig': { - u'blkioWeight': None, - u'capAdd': [], - u'capDrop': ["MKNOD", "NET_RAW", "SYS_CHROOT", "SETUID", "SETGID", "CHOWN", "SYS_ADMIN", - "DAC_OVERRIDE", "FOWNER", "FSETID", "SETPCAP", "AUDIT_WRITE", "SETFCAP"], - u'cgroupParent': None, - u'count': None, - u'cpuCount': None, - u'cpuPercent': None, - u'cpuPeriod': None, - u'cpuQuota': None, - u'cpuRealtimePeriod': None, - u'cpuRealtimeRuntime': None, - u'cpuSet': None, - u'cpuSetMems': None, - u'cpuShares': None, - u'createIndex': None, - u'created': None, - u'dataVolumes': [], - u'dataVolumesFrom': [], - u'dataVolumesFromLaunchConfigs': [], - u'deploymentUnitUuid': None, - u'description': None, - u'devices': [], - u'diskQuota': None, - u'dns': [], - u'dnsSearch': [], - u'domainName': None, - u'drainTimeoutMs': 0, - u'environment': { - u'env1': u'val1', - u'env2': u'val2'}, - u'externalId': None, - u'firstRunning': None, - u'healthInterval': None, - u'healthRetries': None, - u'healthState': None, - u'healthTimeout': None, - u'hostname': None, - u'imageUuid': u'docker:kbase/narrative:latest', - u'instanceTriggeredStop': u'stop', - u'ioMaximumBandwidth': None, - u'ioMaximumIOps': None, - u'ip': None, - u'ip6': None, - u'ipcMode': None, - u'isolation': None, - u'kernelMemory': None, - u'kind': u'container', - u'labels': { - u'io.rancher.container.pull_image': u'always', - u'session_id': None, - u'traefik.enable': u'True'}, - u'logConfig': {u'config': {}, u'driver': u''}, - u'memory': None, - u'memoryMb': None, - u'memoryReservation': None, - u'memorySwap': None, - u'memorySwappiness': None, - u'milliCpuReservation': None, - u'networkLaunchConfig': None, - u'networkMode': u'managed', - u'oomScoreAdj': None, - u'pidMode': None, - u'pidsLimit': None, - u'ports': [u'8888/tcp'], - u'privileged': False, - u'publishAllPorts': False, - u'readOnly': False, - u'removed': None, - u'requestedIpAddress': None, - u'restartPolicy': {u'name': u'always'}, - u'runInit': False, - u'secrets': [], - u'shmSize': None, - u'startCount': None, - u'startOnCreate': True, - u'stdinOpen': True, - u'stopSignal': None, - u'stopTimeout': None, - u'tty': True, - u'type': u'launchConfig', - u'user': None, - u'userdata': None, - u'usernsMode': None, - u'uts': None, - u'uuid': None, - u'vcpu': 1, - u'volumeDriver': None, - u'workingDir': None}, - u'name': None, - u'removed': None, - u'scale': 1, - u'secondaryLaunchConfigs': [], - u'selectorContainer': None, - u'selectorLink': None, - u'stackId': None, - u'startOnCreate': True, - u'system': False, - u'type': u'service', - u'uuid': None, - u'vip': None} - if prespawn is False: - name = cfg['container_name'].format(userid) - client_ip = flask.request.headers.get("X-Real-Ip", flask.request.headers.get("X-Forwarded-For", None)) - try: # Set client ip from request object if available - container_config['description'] = 'client-ip:{} timestamp:{}'.format(client_ip, - datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')) - except Exception: - logger.error({"message": "Error checking flask.request.headers for X-Real-Ip or X-Forwarded-For"}) - else: - name = cfg['container_name_prespawn'].format(userid) - client_ip = None - cookie = u'{}'.format(session) - labels = dict() - labels["io.rancher.container.pull_image"] = u"always" - labels["io.rancher.container.start_once"] = u"true" - labels["traefik.enable"] = u"True" - labels["session_id"] = session - # create a rule for list of hostnames that should match from cfg['hostname'] - host_rules = " || ".join([u"Host(\"{}\")".format(hostname) for hostname in cfg['hostname']]) - remaining_rule = u" && PathPrefix(\"{}\") && HeadersRegexp(\"Cookie\",`{}`)" - labels["traefik.http.routers." + userid + ".rule"] = host_rules + remaining_rule.format("/narrative/", cookie) - labels["traefik.http.routers." + userid + ".entrypoints"] = u"web" - container_config['launchConfig']['labels'] = labels - container_config['launchConfig']['name'] = name - if (cfg['image_tag'] is not None and cfg['image_tag'] != ''): - imageUuid = "{}:{}".format(cfg['image_name'], cfg['image_tag']) - else: - # to do: fix calling latest_narr_version() so we don't need to call the `app` method like this - imageUuid = "{}:{}".format(cfg['image_name'], app.latest_narr_version()) - container_config['launchConfig']['imageUuid'] = "docker:{}".format(imageUuid) - container_config['launchConfig']['environment'].update(cfg['narrenv']) - container_config['name'] = name - container_config['stackId'] = cfg['rancher_stack_id'] - - # Attempt to bring up a container, if there is an unrecoverable error, clear the session variable to flag - # an error state, and overwrite the response with an error response - try: - r = requests.post(cfg["rancher_env_url"]+"/service", json=container_config, auth=(cfg["rancher_user"], cfg["rancher_password"])) - logger.info({"message": "new_container", "image": imageUuid, "userid": userid, "service_name": name, "session_id": session, - "client_ip": client_ip}) # request.remote_addr) - if not r.ok: - msg = "Error - response code {} while creating new narrative rancher service: {}".format(r.status_code, r.text) - logger.error({"message": msg}) - raise(Exception(msg)) - except Exception as ex: - raise(ex) - return(session) - - -def find_stack() -> Dict[str, str]: - """ - Query the rancher-metadata service for the name of the stack we're running in, and then - go to the rancher_url and walk down through the stacks in the rancher environments we - have access to that find the the endpoint that matches the name - """ - r = requests.get(cfg['rancher_meta']+"2016-07-29/self/stack/environment_name") - env_name = r.text - logger.info("Found environment name: {}".format(env_name)) - r = requests.get(cfg['rancher_meta']+"2016-07-29/self/stack/name") - stack_name = r.text - logger.info("Found stack name: {}".format(stack_name)) -# set this in info instead, to set all rancher vars in verify_config -# cfg['rancher_stack_name'] = stack_name - url = cfg['rancher_url']+"projects" - logger.info("Querying {} with supplied credentials".format(url)) - r = requests.get(url, auth=(cfg['rancher_user'], cfg['rancher_password'])) - if not r.ok: - msg = "Error querying {}: {} {}".format(url, r.status_code, r.text) - logger.error(msg) - raise IOError(msg) - - resp = r.json() - x = [env['links']['self'] for env in resp['data'] if env['name'].lower() == env_name.lower()] - env_endpoint = x[0] - logger.info("Found environment endpoint: {}".format(env_endpoint)) - r = requests.get(env_endpoint+"/stacks", auth=(cfg['rancher_user'], cfg['rancher_password'])) - resp = r.json() - x = [stack['id'] for stack in resp['data'] if stack['name'].lower() == stack_name.lower()] - logger.info("Found stack id: {}".format(x[0])) - return({"url": env_endpoint, "stack_id": x[0], "stack_name": stack_name}) - - -def stack_suffix() -> str: - """ - Returns the stack suffix that traefik appends to service names. - """ - return("_{}".format(cfg['rancher_stack_name'])) - - -def find_service(traefikname: str) -> dict: - """ - Given a service name, return the JSON service object from Rancher of that name. Throw an exception - if (exactly) one isn't found. - """ - suffix = stack_suffix() - name = traefikname.replace(suffix, "") # Remove trailing _traefik suffix that traefik adds - url = "{}/service?name={}".format(cfg['rancher_env_url'], name) - r = requests.get(url, auth=(cfg['rancher_user'], cfg['rancher_password'])) - if r.ok: - results = r.json() - if len(results['data']) == 0: - # Assume that the container has already been reaped and ignore - return(None) - else: - res = results['data'][0] - if len(results['data']) > 1: - # If we have more than 1 result, then something is broken. Delete all but the newest image and - # return that one - logger.error({"message": "There can be only one...container with a name match. Deleting all but the first entry"}) - for svc in results['data'][1:]: - remove_url = svc['actions']['remove'] - r = requests.delete(remove_url, auth=(cfg['rancher_user'], cfg['rancher_password'])) - if r.ok: - logger.info({"message": "Removed duplicate narrative {} {}".format(svc['id'], svc['name'])}) - else: - raise(Exception("Problem duplicate narrative {} {} .: response code {}: {}".format(svc['id'], svc['name'], r.status_code, r.text))) - return(res) - - -def find_stopped_services() -> dict: - """ - Query rancher for services with the state "healthState=started-once" and return the names of matching services - Result can be an empty dictionary - """ - url = "{}/service?healthState=started-once".format(cfg['rancher_env_url']) - r = requests.get(url, auth=(cfg['rancher_user'], cfg['rancher_password'])) - if r.ok: - results = r.json() - names = {svc['name']: svc for svc in results['data']} - return(names) - else: - raise(Exception("Error querying for stopped services: Response code {}".format(r.status_code))) - - -def find_image(name: str) -> str: - """ - Given a service name, return the docker image that the service is running. If the service doesn't exist - then return None, as this may mean that service has been reaped already - """ - try: - container = find_service(name) - if container is not None: - src, image = container["launchConfig"]["imageUuid"].split(":", 1) - else: - logger.info("Could not find_service named {}".format(name)) - image = None - return(image) - except Exception as ex: # Just reraise any other exception - raise(ex) - - -def reap_narrative(name: str) -> None: - res = find_service(name) - # if there is a None return, the image may have been reaped already just return - if res is None: - return - remove_url = res['actions']['remove'] - r = requests.delete(remove_url, auth=(cfg['rancher_user'], cfg['rancher_password'])) - if r.ok: - return - else: - raise(Exception("Problem reaping narrative {}: response code {}: {}".format(name, r.status_code, r.text))) - - -def rename_narrative(name1: str, name2: str) -> None: - res = find_service(name1) - # if there is a None return, the image may have been reaped already just return - if res is None: - return - put_url = res['links']['self'] - # Object with updated values for the service - data = {"name": name2} - # On a rename, the request object should always exist, but just in case - client_ip = flask.request.headers.get("X-Real-Ip", flask.request.headers.get("X-Forwarded-For", None)) - data['description'] = 'client-ip:{} timestamp:{}'.format(client_ip, - datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')) - r = requests.put(put_url, auth=(cfg['rancher_user'], cfg['rancher_password']), data=data) - if r.ok: - return - else: - raise(Exception("Problem renaming narrative {} to {}: response code {}: {}".format(name1, name2, r.status_code, r.text))) - - -def find_prespawned() -> List[str]: - """ returns a list of the prespawned narratives waiting to be assigned """ - narratives = find_narratives() - idle_narr = [narr for narr in narratives if cfg['container_name_prespawn'].format("") in narr] - return(idle_narr) - - -def find_narratives(image_name: Optional[str] = None) -> List[str]: - """ - This query hits the endpoint for the stack (cfg['rancher_stack_id']), and returns a list of all the - names of services that are have an imageUuid with a match for "docker:"+image_name. If no parameter is - given (the original function signature), then the default value of cfg['image'] is used. - """ - if image_name is None: - image_name = cfg['image_name'] - query_params = {'limit': 1000} - url = "{}/stacks/{}/services".format(cfg['rancher_env_url'], cfg['rancher_stack_id']) - r = requests.get(url, auth=(cfg['rancher_user'], cfg['rancher_password']), params=query_params) - imageUuid = "docker:{}".format(image_name) - logger.debug({"message": "querying rancher for services matching {}".format(imageUuid)}) - - if not r.ok: - raise(Exception("Error querying for services at {}: Response code {}: {}".format(url, - r.status_code, r.body))) - results = r.json() - svcs = results['data'] - svc_names = [svc['name'] for svc in svcs if svc['launchConfig']['imageUuid'].startswith(imageUuid)] - return(svc_names) - - -def find_narrative_labels(svc_list: list) -> dict: - """ - Takes a list of narrative servicenames and return a dictionary keyed on servicename that - contains the label information for each service. - """ - label_dict = dict() - for svc in svc_list: - try: - svc_obj = find_service(svc) - url = svc_obj['links']['instances'] - r = requests.get(url, auth=(cfg['rancher_user'], cfg['rancher_password'])) - # The instance should be there because the reaper shouldn't delete a container during this - # functions run, throw an error - if not r.ok: - raise(Exception("Error querying for instance at {}: Response code {}: {}".format(url, - r.status_code, r.body))) - results = r.json() - label_dict[svc] = results['data'][0]['labels'] - except Exception as ex: - logger.critical("Error querying rancher instance info for {}: {}".format(svc, str(ex))) - return(label_dict) - - -def verify_config(cfg2: dict) -> None: - """ - Check that we can access the rancher api, then make sure that the endpoints for the environment and the stack_id are good. - If we have the rancher_url endpoint, but nothing else, try to figure it out using the rancher-metadata endpoint - """ - cfg.update(cfg2) - if (cfg['rancher_url'] is None): - logger.critical("rancher_url is not set, cannot operate in rancher mode") - raise(ValueError("rancher_url configuration not set")) - if (cfg['rancher_user'] is None) or (cfg['rancher_password'] is None): - logger.warning("rancher_user and/or rancher_password not set") - try: - r = requests.get(cfg['rancher_url'], auth=(cfg['rancher_user'], cfg['rancher_password'])) - if (not r.ok): - logger.critical("Error while contacting rancher_url with rancher_user and rancher_password: {}:{}".format(r.status_code, r.text)) - raise(ValueError("Cannot contact rancher service using provided configuration")) - except Exception as ex: - logger.critical("Error trying to connect to {}: {}".format(cfg['rancher_url'], str(ex))) - raise(ex) - if (cfg['rancher_stack_id'] is None or cfg['rancher_env_url'] is None or cfg['rancher_stack_name'] is None): - logger.info("rancher_stack_id, rancher_stack_name, or rancher_env_url not set - introspecting rancher-metadata service") - try: - info = find_stack() - cfg['rancher_stack_id'] = info['stack_id'] - cfg['rancher_stack_name'] = info['stack_name'] - cfg['rancher_env_url'] = info['url'] - if cfg['rancher_stack_id'] is None or cfg['rancher_env_url'] is None: - logger.critical("Failed to determine rancher_stack_id and/or rancher_env_url from metadata service") - raise(ValueError("rancher_stack_id or rancher_env_url not set")) - except Exception as ex: - logger.critical("Could not query rancher_meta({}) service: {}".format(cfg['rancher_meta'], str(ex))) - raise(ex) - # Make sure we can query the rancher environment endpoint - try: - r = requests.get(cfg['rancher_env_url'], auth=(cfg['rancher_user'], cfg['rancher_password'])) - if (not r.ok): - logger.critical("Error response from rancher_env_url {}:{}".format(r.status_code, r.text)) - raise(ValueError("Error from rancher environment endpoint")) - except Exception as ex: - logger.critical("Error trying to connect to {}: {}".format(cfg['rancher_env_url'], str(ex))) - raise(ex) - # Everything should be good at this point - return