Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DEVOPS-419 WIP: Migrate narrative-traefiker to using classes instead of modules for orchestration handlers #105

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 61 additions & 53 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
from datetime import datetime
import json
import hashlib
import manage_docker
import manage_rancher
from rancher1_handler import rancher1_handler
sychan marked this conversation as resolved.
Show resolved Hide resolved
from typing import Dict, List, Optional
import ipaddress
import sqlite3
Expand All @@ -23,31 +22,34 @@
"hostname": [u"localhost"], # hostname used for traefik router rules
"auth2": u"https://ci.kbase.us/services/auth/api/V2/me", # url for authenticating tokens
"image_name": u"kbase/narrative", # image name (omit :tag !) used for spawning narratives
"image_tag": None, # image tag; if not None, will be concatenated to image_name for new instances (otherwise latest_narr_version() will be used)
"image_tag": None, # image tag; if not None, will be concatenated to image_name
# for new instances, otherwise latest_narr_version() will be used
"session_cookie": u"narrative_session", # name of cookie used for storing session id
"kbase_cookie": u"kbase_session", # name of the cookie container kbase auth token
"container_name": u"narrative-{}", # python string template for narrative name, userid in param
"container_name_prespawn": u"narrativepre-{}", # python string template for pre-spawned narratives, userid in param
"container_name_prespawn": u"narrativepre-{}", # string template for pre-spawned narratives, userid in param
"narrative_version_url": "https://ci.kbase.us/narrative_version", # url to narrative_version endpoint
"traefik_metrics": "http://traefik:8080/metrics", # URL of traefik metrics endpoint, api + prometheus must be enabled
"dock_net": u"narrative-traefiker_default", # name of the docker network that docker containers should be bound to
"traefik_metrics": "http://traefik:8080/metrics", # URL of traefik metrics endpoint, enable api + prometheus!
"dock_net": u"narrative-traefiker_default", # docker network that docker containers should be bound to
"log_level": logging.DEBUG, # loglevel - DEBUG=10, INFO=20, WARNING=30, ERROR=40, CRITICAL=50
"log_dest": None, # log destination - currently unused
"log_name": u"traefiker", # python logger name
"rancher_user": None, # username for rancher creds
"rancher_password": None, # password for rancher creds
"rancher_url": None, # URL for the rancher API endpoint, including version
"rancher_meta": "http://rancher-metadata/", # URL for the rancher-metadata service (unauthenticated)
"rancher_env_url": None, # rancher enviroment URL (under rancher_url) - self-configured if not set
"rancher_stack_id": None, # rancher stack ID value, used with rancher_env_url - self-configured if not set
"rancher_stack_name": None, # rancher stack name value, used with rancher_env_url - self-configured if not
# set, required if rancher_stack_id set
# The next 3 configs are self-configured if not set
"rancher_env_url": None, # rancher enviroment URL (under rancher_url)
"rancher_stack_id": None, # rancher stack ID value, used with rancher_env_url
"rancher_stack_name": None, # rancher stack name value, used with rancher_env_url
# required if rancher_stack_id set
"mode": None, # What orchestation type? "rancher" or "docker"
"reaper_timeout_secs": 600, # How long should a container be idle before it gets reaped?
"reaper_ipnetwork": u"127.0.0.1/32", # What IP address/network is allowed to access /reaper/ ?
"debug": 0, # Set debug mode
"narrenv": dict(), # Dictionary of env name/val to be passed to narratives at startup
"num_prespawn": 5, # How many prespawned narratives should be maintained? Checked at startup and reapee runs
"narrenv": dict(), # Dict of env name/val to be passed to narratives at startup
"num_prespawn": 5, # How many prespawned narratives should be maintained?
# Checked at startup and reaper runs
"status_role": "KBASE_ADMIN", # auth custom role for full narratve_status privs
"sqlite_reaperdb_path": "/tmp/reaper.db", # full path to SQLite3 database file
"COMMIT_SHA": "not available"} # Git commit hash for this build, set via docker build env
Expand Down Expand Up @@ -79,6 +81,9 @@
find_stopped_services = None
stack_suffix = None

# Instance var for the container manager
container_mgr = None


def merge_env_cfg() -> None:
"""
Expand Down Expand Up @@ -200,33 +205,23 @@ def setup_app(app: flask.Flask) -> None:
global naming_regex
global find_stopped_services
global stack_suffix
global container_mgr

try:
if (cfg["rancher_url"] is not None):
cfg['mode'] = "rancher"
manage_rancher.setup(cfg, logger)
manage_rancher.verify_config(cfg)
check_session = manage_rancher.check_session
start = manage_rancher.start
find_image = manage_rancher.find_image
find_service = manage_rancher.find_service
find_narratives = manage_rancher.find_narratives
find_narrative_labels = manage_rancher.find_narrative_labels
reap_narrative = manage_rancher.reap_narrative
container_mgr = rancher1_handler(cfg, logger)
container_mgr.verify_config(cfg)
check_session = container_mgr.check_session
start = container_mgr.start
find_image = container_mgr.find_image
find_service = container_mgr.find_service
find_narratives = container_mgr.find_narratives
find_narrative_labels = container_mgr.find_narrative_labels
reap_narrative = container_mgr.reap_narrative
naming_regex = "^{}_"
find_stopped_services = manage_rancher.find_stopped_services
stack_suffix = manage_rancher.stack_suffix
else:
cfg['mode'] = "docker"
manage_docker.setup(cfg, logger)
manage_docker.verify_config(cfg)
start = manage_docker.start
find_image = manage_docker.find_image
find_service = manage_docker.find_service
find_narratives = manage_docker.find_narratives
find_narrative_labels = manage_docker.find_narrative_labels
reap_narrative = manage_docker.reap_narrative
naming_regex = "^{}$"
find_stopped_services = container_mgr.find_stopped_services
stack_suffix = container_mgr.stack_suffix
except Exception as ex:
logger.critical("Failed validation of docker or rancher configuration")
raise(ex)
Expand All @@ -244,14 +239,18 @@ def setup_app(app: flask.Flask) -> None:
else:
suffix = ""
narr_time = {narr+suffix: time.time() for narr in narrs if narr.startswith(prefix)}
logger.debug({"message": "Adding containers matching {} to narr_activity".format(prefix), "names": str(list(narr_time.keys()))})
logger.debug({"message": "Adding containers matching {} to narr_activity".format(prefix),
"names": str(list(narr_time.keys()))})
narr_activity.update(narr_time)

logger.info({'message': "using sqlite3 database in {}".format(cfg['sqlite_reaperdb_path'])})
if (cfg['image_tag'] is not None and cfg['image_tag'] != ''):
logger.info({'message': "image_tag specified, will use image {}:{}".format(cfg['image_name'],cfg['image_tag'])})
logger.info({'message': "image_tag specified, will use image {}:{}".format(cfg['image_name'],
cfg['image_tag'])})
else:
logger.info({'message': "no image_tag specified, will use version from URL {} for image name {}".format(cfg['narrative_version_url'],cfg['image_name'])})
msg = "no image_tag specified, will use version from URL {} for image name {}"
msg = msg.format(cfg['narrative_version_url'], cfg['image_name'])
logger.info({'message': msg})
try:
# need this because we are not in a flask request context here
with app.app_context():
Expand All @@ -270,7 +269,7 @@ def get_prespawned() -> List[str]:
"""
if cfg["mode"] != "rancher":
raise(NotImplementedError("prespawning only supports rancher mode, current mode={}".format(cfg['mode'])))
narratives = manage_rancher.find_narratives()
narratives = find_narratives()
idle_narr = [narr for narr in narratives if cfg['container_name_prespawn'].format("") in narr]
return(idle_narr)

Expand All @@ -290,7 +289,7 @@ def prespawn_narrative(num: int) -> None:
session = random.getrandbits(128).to_bytes(16, "big").hex()
narr_id = session[0:6]
try:
manage_rancher.start(session, narr_id, True)
start(session, narr_id, True)
except Exception as err:
logger.critical({"message": "prespawn_narrative_exception", "session": session,
"container": "{} of {}".format(a, num), "exception": repr(err)})
Expand Down Expand Up @@ -427,7 +426,7 @@ def get_active_traefik_svcs(narr_activity) -> Dict[str, time.time]:
if r.status_code == 200:
body = r.text.split("\n")
# Find all counters related to websockets - jupyter notebooks rely on websockets for communications
websock_re = re.compile('traefik_service_open_connections\{\S+protocol="websocket",service=\"(\S+)@.+ (\d+)') # noqa: W605
websock_re = re.compile('traefik_service_open_connections\{\S+protocol="websocket",service=\"(\S+)@.+ (\d+)') # noqa: W605 E501
# Containers is a dictionary keyed on container name with the value as the # of active web sockets
containers = dict()
for line in body:
Expand All @@ -436,7 +435,8 @@ def get_active_traefik_svcs(narr_activity) -> Dict[str, time.time]:
logger.debug({"message": "websocket line: {}".format(line)})
containers[match.group(1)] = int(match.group(2))
prefix = cfg['container_name'].format('')
logger.debug({"message": "Looking for containers that with name prefix {} and image name {}".format(prefix, cfg['image_name'])})
msg = "Looking for containers that with name prefix {} and image name {}".format(prefix, cfg['image_name'])
logger.debug({"message": msg})
# query for all of services in the environment that match cfg['image_name'] as the image name
all_narr_containers = find_narratives(cfg['image_name'])
# filter it down to only containers with the proper prefix
Expand All @@ -449,7 +449,7 @@ def get_active_traefik_svcs(narr_activity) -> Dict[str, time.time]:
suffix = ""
for name in containers.keys():
logger.debug({"message": "Examining traefik metrics entry: {}".format(name)})
# Skip any containers that don't match the container prefix, to avoid wasting time on the wrong containers
# Skip any containers that don't match the container prefix, avoid wasting time on the wrong containers
if name.startswith(prefix):
logger.debug({"message": "Matches prefix {}".format(prefix)})
# services from traefik has a suffix which needs to be stripped to match
Expand All @@ -466,17 +466,20 @@ def get_active_traefik_svcs(narr_activity) -> Dict[str, time.time]:
if (containers[name] > 0) or (name not in narr_activity):
narr_activity[name] = time.time()
logger.debug({"message": "Updated timestamp for {}".format(name)})
# Delete this entry from dictionary so that we can identify any 'leftovers' not in traefik metrics
# Delete this entry from dict to identify any 'leftovers' not in traefik metrics
try:
narr_containers.remove(service_name)
except ValueError:
logger.errors({"message": "Error deleting {} from narr_containers".format(service_name)})
else:
logger.debug({"message": "Skipping because {} did not match running services".format(service_name)})
msg = "Skipping because {} did not match running services".format(service_name)
logger.debug({"message": msg})
else:
logger.debug({"message": "Skipped {} because it didn't match prefix {}".format(name, prefix)})
if len(narr_containers) > 0:
logger.debug({"message": ": {} rancher services matched prefix and image, but not in traefik metrics.".format(len(narr_containers))})
msg = ": {} rancher services matched prefix and image, but not in traefik metrics."
msg = msg.format(len(narr_containers))
logger.debug({"message": msg})
for name in narr_containers:
full_name = name+suffix
if full_name not in narr_activity:
Expand Down Expand Up @@ -541,9 +544,9 @@ def reap_older_prespawn(version: str) -> None:

def reaper() -> int:
"""
Reaper function, originally intended to be called at regular intervals specified by cfg['reaper_sleep_secs']. Now being
called by /reaper/ endpoint, returning number of narratives reaped
Updates last seen timestamps for narratives, reaps any that have been idle for longer than cfg['reaper_timeout_secs']
Called by /reaper/ endpoint, returning number of narratives reaped
Updates last seen timestamps for narratives, reaps any that have been idle for
longer than cfg['reaper_timeout_secs']
"""
global narr_last_version

Expand Down Expand Up @@ -578,7 +581,8 @@ def reaper() -> int:
del narr_activity[name]
reaped += 1
except Exception as e:
logger.critical({"message": "Error: Unhandled exception while trying to reap container {}: {}".format(name, repr(e))})
msg = "Error: Unhandled exception while trying to reap container {}: {}".format(name, repr(e))
logger.critical({"message": msg})

# Save narr_activity back to the database
try:
Expand Down Expand Up @@ -672,7 +676,8 @@ def narrative_shutdown(username=None):
break
resp = flask.Response("Service {} deleted".format(name), 200)
except Exception as e:
logger.critical({"message": "Error: Unhandled exception while trying to reap container {}: {}".format(name, repr(e))})
msg = "Error: Unhandled exception while trying to reap container {}: {}".format(name, repr(e))
logger.critical({"message": msg})
resp = flask.Response("Error deleteing service {}: {}".format(name, repr(e)), 200)
else:
resp = flask.Response('Valid kbase authentication token required', 401)
Expand All @@ -690,7 +695,8 @@ def narrative_services() -> List[dict]:
try:
narr_activity = get_narr_activity_from_db()
except Exception as e:
logger.critical({"message": "Could not get data from database for narrative_status, faking last_seen: {}".format(repr(e))})
msg = "Could not get data from database for narrative_status, faking last_seen: {}".format(repr(e))
logger.critical({"message": msg})
narr_activity = None

try:
Expand Down Expand Up @@ -723,7 +729,8 @@ def narrative_services() -> List[dict]:
info['last_ip'] = match.group(1)
info['created'] = match.group(2)
except Exception as ex:
logger.critical({"message": "Error: Unhandled exception while trying to query service {}: {}".format(name, repr(ex))})
msg = "Error: Unhandled exception while trying to query service {}: {}".format(name, repr(ex))
logger.critical({"message": msg})
info['session_key'] = "Error querying api"
info['image'] = None
info['publicEndpoints'] = None
Expand Down Expand Up @@ -817,5 +824,6 @@ def hello(narrative):
if cfg['mode'] is not None:
app.run()
else:
logger.critical({"message": "No container management configuration. Please set docker_url or rancher_* environment variable appropriately"})
msg = "No container management config. Please set docker_url or rancher_* environment variable appropriately"
logger.critical({"message": msg})
raise RuntimeError("Cannot start/check containers.")
Loading