This repository has been archived by the owner on Mar 26, 2022. It is now read-only.

Merge pull request #41 from agccie/2.0
2.0
agccie authored Mar 21, 2019
2 parents e9f8805 + 1ee8484 commit 8b2ec19
Showing 51 changed files with 1,551 additions and 646 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -20,6 +20,7 @@ cisco_aci_app_tools-1.2_min.tar.gz
*.aci
*.mp4
.tmpDocker
+ .dockerignore

# Byte-compiled / optimized / DLL files
__pycache__/
Empty file modified Service/app.wsgi
100644 → 100755
Empty file.
14 changes: 10 additions & 4 deletions Service/app/models/aci/ept/common.py
@@ -28,6 +28,8 @@
HELLO_INTERVAL = 5.0
HELLO_TIMEOUT = 60.0
WATCH_INTERVAL = 1.0
+ NOTIFY_INTERVAL = 1.0
+ NOTIFY_QUEUE_MAX_SIZE = 4096
CACHE_STATS_INTERVAL = 300.0
SEQUENCE_TIMEOUT = 100.0
MANAGER_CTRL_CHANNEL = "mctrl"
@@ -38,13 +40,13 @@
WORKER_UPDATE_INTERVAL = 15.0
RAPID_CALCULATE_INTERVAL = 15.0
MAX_SEND_MSG_LENGTH = 10240
- EPM_EVENT_HANDLER_INTERVAL = 0.01
- EPM_EVENT_HANDLER_ENABLED = True
+ BG_EVENT_HANDLER_INTERVAL = 0.01
+ BG_EVENT_HANDLER_ENABLED = True

# when the API requests msg queue length, the manager can read the full data off each queue
# and count the msgs within bulk messages for an accurate total. There is a performance hit
# to this, so the alternative is counting the number of messages in each queue, where a bulk
# message counts as one.
- ACCURATE_QUEUE_LENGTH = True
+ ACCURATE_QUEUE_LENGTH = False

# transitory timers:
# max_epm_build maximum amount of time to wait for ACK from all worker processes to indicate
@@ -544,7 +546,11 @@ def run(self):
logger.debug("starting background thread: %s", self.name)
while not self._exit:
self.count+=1
- self.func(*self.args, **self.kwargs)
+ try:
+     self.func(*self.args, **self.kwargs)
+ except Exception as e:
+     logger.debug("Traceback:\n%s", traceback.format_exc())
+     logger.error("failed to execute background process: %s", e)
if self.max_count > 0 and self.count >= self.max_count:
# stop execution when reaching max number of iterations
return
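The try/except added above keeps one failing callback from silently killing the whole background thread. A minimal, self-contained sketch of the same pattern; the constructor signature mirrors how BackgroundThread is called elsewhere in this diff, but this is an illustrative re-implementation, not the project's class:

```python
import logging
import threading
import time
import traceback

logger = logging.getLogger(__name__)

class BackgroundThread(threading.Thread):
    # illustrative re-implementation: run func every interval seconds,
    # log-and-continue on exceptions, stop after max_count iterations (0 = forever)
    def __init__(self, func, name=None, interval=1.0, count=0, args=None, kwargs=None):
        super(BackgroundThread, self).__init__(name=name)
        self.func = func
        self.interval = interval
        self.max_count = count
        self.count = 0
        self.args = args or []
        self.kwargs = kwargs or {}
        self._exit = False

    def exit(self):
        self._exit = True

    def run(self):
        logger.debug("starting background thread: %s", self.name)
        while not self._exit:
            self.count += 1
            try:
                self.func(*self.args, **self.kwargs)
            except Exception as e:
                logger.debug("Traceback:\n%s", traceback.format_exc())
                logger.error("failed to execute background process: %s", e)
            if self.max_count > 0 and self.count >= self.max_count:
                return
            time.sleep(self.interval)
```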
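The ACCURATE_QUEUE_LENGTH default flipped to False above because exact counts are expensive: the accurate path reads every entry off each redis queue and expands bulk messages, while the fast path is a single LLEN. A sketch of the trade-off under an assumed bulk framing of {"msg_type": "work_bulk", "msgs": [...]}; the project's real get_queue_length and bulk message format are not shown in this diff:

```python
import json
import redis

def get_queue_length(r, queue, accurate=False):
    # fast path: one LLEN per queue; a bulk message counts as a single entry
    if not accurate:
        return r.llen(queue)
    # accurate path: read the full queue and expand bulk messages; cost grows
    # with queue depth, which is the performance hit described above
    total = 0
    for raw in r.lrange(queue, 0, -1):
        try:
            msg = json.loads(raw)
        except ValueError:
            total += 1
            continue
        if isinstance(msg, dict) and msg.get("msg_type") == "work_bulk":
            total += len(msg.get("msgs", []))  # hypothetical bulk framing
        else:
            total += 1
    return total

# usage sketch (queue name illustrative)
# r = redis.StrictRedis(host="localhost", port=6379)
# print(get_queue_length(r, "q0_w1", accurate=True))
```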
95 changes: 95 additions & 0 deletions Service/app/models/aci/ept/dns_cache.py
@@ -0,0 +1,95 @@

from . ept_cache import hitCache
from . ept_cache import hitCacheNotFound

import dns.resolver
import re
import logging
import time
import traceback

# module level logging
logger = logging.getLogger(__name__)

class DNSEntry(object):
def __init__(self, name, query_type, answer=None, ttl=None):
self.name = name
self.query_type = query_type
self.answer = answer
self.ttl = ttl

def __repr__(self):
delta = self.ttl - time.time()
return "%s::%s [%s] [ttl: %0.3f, delta: %0.3f]" % (self.query_type, self.name, self.answer,
self.ttl, delta)

class DNSCache(object):
""" In-memory DNS Cache object that tracks record type and record result """
def __init__(self, resolver=None, max_records=10, timeout=10):
# resolver can be list of IP addresses for DNS lookup or single string
self.cache = hitCache(max_records)
self.resolver = dns.resolver.Resolver()
if resolver is not None:
if isinstance(resolver, basestring):
resolver = [resolver]
self.resolver.nameservers = resolver
self.resolver.timeout = timeout

def dns_lookup(self, name, query_type="A"):
""" perform dns lookup for A record or MX record for provided name. On success return string
DNS result. For MX lookup return only result with lowest preference. Return None if
unable to resolve DNS
"""
ts = time.time()
keystr = "%s::%s" % (query_type, name)
dns_record = self.cache.search(keystr)
if not isinstance(dns_record, hitCacheNotFound):
logger.debug("(from cache) %s", dns_record)
if ts is None or ts <= dns_record.ttl:
return dns_record.answer
else:
logger.debug("ttl expired, removing from cache")
self.cache.remove(keystr)

# create a dummy dns_entry that we will cache on success or failure
dns_entry = DNSEntry(name, query_type, ttl=ts+60)
logger.debug("performing DNS lookup for %s", keystr)
if query_type == "A":
# if an IP address was provided then we can cache the address and the result directly
if re.search("^[0-9\.]+$", name):
logger.debug("skipping DNS lookup for IPv4 address %s", name)
dns_entry.answer = name
dns_entry.ttl = ts + 31536000 # 1 year cache
else:
try:
record = self.resolver.query(name, "A")
if len(record) > 0:
dns_entry.answer = record[0].address
dns_entry.ttl = record.expiration
except Exception as e:
logger.warn("failed to resolve A record for '%s': %s", name, e)
logger.debug(traceback.format_exc())
elif query_type == "MX":
try:
preferred = None
record = self.resolver.query(name, "MX")
for subrecord in record:
if preferred is None or subrecord.preference < preferred.preference:
preferred = subrecord
if preferred is None:
logger.warn("no valid MX record found")
else:
dns_entry.answer = preferred.exchange.to_text()
dns_entry.ttl = record.expiration
except Exception as e:
logger.warn("failed to resolve MX record for domain '%s': %s", name, e)
logger.debug(traceback.format_exc())
else:
logger.error("unsupported query type '%s'", query_type)
return None

# add result to cache and return dns_entry.answer even if it is None
logger.debug("DNS result for %s: %s", keystr, dns_entry.answer)
self.cache.push(keystr, dns_entry)
return dns_entry.answer
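
A short usage sketch for the new DNSCache class (resolver address and hostnames below are placeholders; as written the class targets Python 2, since it checks isinstance(resolver, basestring), and requires dnspython):

```python
# placeholder resolver address and names
cache = DNSCache(resolver="192.0.2.53", timeout=5)

# A record: the answer is cached until the record's TTL expires
ip = cache.dns_lookup("smtp.example.com", query_type="A")

# MX record: only the exchange with the lowest preference value is returned
mx = cache.dns_lookup("example.com", query_type="MX")

# literal IPv4 addresses bypass the resolver and are cached for a year
assert cache.dns_lookup("192.0.2.10") == "192.0.2.10"

# unsupported query types log an error and return None
assert cache.dns_lookup("example.com", query_type="TXT") is None
```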

2 changes: 2 additions & 0 deletions Service/app/models/aci/ept/ept_endpoint.py
@@ -369,6 +369,8 @@ def per_node_clear_endpoint(switch):
threads.append(t)
for t in threads: t.join()
for node in process:
+ # cleanup worker fabric
+ process[node]["worker_fabric"].close()
if not process[node]["ret"]:
error_rows.append("failed to clear endpoint on node %s" % node)
return jsonify({
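The two lines added above close each per-node fabric session once the worker threads have been joined. A sketch of the surrounding pattern, with hypothetical make_worker_fabric and clear_one helpers standing in for the code elided from this hunk:

```python
import threading

def per_node_clear(nodes, make_worker_fabric, clear_one):
    # one thread per node; join all; then always close each per-node
    # fabric session (the cleanup this commit adds) and collect failures
    process = {}
    threads = []
    for node in nodes:
        process[node] = {"worker_fabric": make_worker_fabric(node), "ret": False}
        def work(n=node):
            process[n]["ret"] = clear_one(process[n]["worker_fabric"])
        t = threading.Thread(target=work)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    error_rows = []
    for node in process:
        process[node]["worker_fabric"].close()
        if not process[node]["ret"]:
            error_rows.append("failed to clear endpoint on node %s" % node)
    return error_rows
```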
46 changes: 27 additions & 19 deletions Service/app/models/aci/ept/ept_manager.py
@@ -45,6 +45,7 @@ class eptManager(object):
REQUIRED_ROLES = ["worker", "watcher"]

def __init__(self, worker_id):
+ threading.currentThread().name = "mgr-main"
logger.debug("init role manager id %s", worker_id)
register_signal_handlers()
self.worker_id = "%s" % worker_id
@@ -110,7 +111,7 @@ def _run(self):
self.redis.flushall()
wait_for_db(self.db)
self.worker_tracker = WorkerTracker(manager=self)
- self.stats_thread = BackgroundThread(func=self.update_stats, name="stats", count=0,
+ self.stats_thread = BackgroundThread(func=self.update_stats, name="mgr-stats", count=0,
interval= eptQueueStats.STATS_INTERVAL)
self.stats_thread.daemon = True
self.stats_thread.start()
@@ -122,6 +123,7 @@ def _run(self):
p = self.redis.pubsub(ignore_subscribe_messages=True)
p.subscribe(**channels)
self.subscribe_thread = p.run_in_thread(sleep_time=0.01, daemon=True)
+ self.subscribe_thread.name = "mgr-redis"
logger.debug("[%s] listening for events on channels: %s", self, channels.keys())

# wait for minimum workers to be ready
@@ -154,13 +156,13 @@
if q == MANAGER_WORK_QUEUE and msg.msg_type == MSG_TYPE.WORK:
self.increment_stats(MANAGER_WORK_QUEUE, tx=False)
if msg.addr == 0:
- # an addr of 0 is a broadcast to all workers of specified role
- # send now (note, broadcast may be out of order if received in BULK
- # with unicast updates
+ # an addr of 0 is a broadcast to all workers of specified role.
+ # Send broadcast now, not within a batch.
+ # Note, broadcast may be out of order if received in BULK with ucast msg
self.worker_tracker.broadcast(msg, qnum=msg.qnum, role=msg.role)
else:
# create hash based on address and send to specific worker
- # need to ensure that we has on ip for EPM_RS_IP_EVENT so it goes to the
+ # need to ensure that we hash on ip for EPM_RS_IP_EVENT so it goes to the
# correct worker...
if msg.wt == WORK_TYPE.EPM_RS_IP_EVENT:
_hash = sum(ord(i) for i in msg.ip)
@@ -201,7 +203,7 @@ def handle_manager_ctrl(self, msg):
# note if brief is set to False, then this request can take significant time to read
# and analyze all requests queued.
kwargs = {"seq": msg.seq, "brief": msg.data.get("brief", True)}
- tmp = threading.Thread(target=self.publish_manager_status, name="status", kwargs=kwargs)
+ tmp = threading.Thread(target=self.publish_manager_status,name="mgr-status",kwargs=kwargs)
tmp.daemon = True
tmp.start()

@@ -254,6 +256,7 @@ def publish_manager_status(self, seq=0, brief=False):
start_ts = time.time()
data = {
"manager": {
"status": "running" if self.minimum_workers_ready() else "starting",
"manager_id": self.worker_id,
"queues": [MANAGER_WORK_QUEUE],
"queue_len":[get_queue_length(self.redis,MANAGER_WORK_QUEUE,accurate=not brief)],
@@ -411,7 +414,8 @@ def stop_fabric(self, fabric, reason=None):
# having worker perform the operation which could lead to race conditions. This needs
# to happen in a different thread so it does not block hellos or other events
# received on the control thread.
- tmp = threading.Thread(target=self.worker_tracker.flush_fabric, args=(fabric,))
+ tmp = threading.Thread(name="mgr-tmp", target=self.worker_tracker.flush_fabric,
+                        args=(fabric,))
tmp.daemon = True
tmp.start()
return True
@@ -476,7 +480,7 @@ def __init__(self, manager=None):
self.active_workers = {} # list of active workers indexed by role
self.update_interval = WORKER_UPDATE_INTERVAL # interval to check for new/expired workers
self.update_thread = BackgroundThread(func=self.update_active_workers, count=0,
name="workerTracker", interval=self.update_interval)
name="mgr-tracker", interval=self.update_interval)
self.update_thread.daemon = True
self.update_thread.start()

@@ -530,17 +534,18 @@ def update_active_workers(self):
# running.
ts = time.time()
remove_workers = []
- new_workers = False
+ new_workers = []
for wid, w in self.known_workers.items():
if w.last_hello + HELLO_TIMEOUT < ts:
logger.warn("worker timeout (last hello: %.3f) %s",ts-w.last_hello, w)
remove_workers.append(w)
elif not w.active:
# new worker that needs to be added to active list
- if w.role not in self.active_workers: self.active_workers[w.role] = []
+ if w.role not in self.active_workers:
+     self.active_workers[w.role] = []
self.active_workers[w.role].append(w)
w.active = True
- new_workers = True
+ new_workers.append(w)
# sort active workers by worker_id for deterministic ordering
self.active_workers[w.role] = sorted(self.active_workers[w.role],
key=lambda w: int(re.sub("[^0-9]","",w.worker_id)))
@@ -575,16 +580,19 @@ def update_active_workers(self):
with w.queue_locks[i]:
self.redis.delete(q)

- # if a worker has died, then trigger monitor restart for all fabrics
- if len(remove_workers)>0:
-     inactive = "[%s]" % ", ".join([w.worker_id for w in remove_workers])
+ # if a worker has died or new worker comes online, then trigger monitor restart
+ stop_message = None
+ if len(new_workers)>0:
+     stop_message = "[%s]" % ", ".join([w.worker_id for w in new_workers])
+     stop_message = "new worker detected %s" % stop_message
+ elif len(remove_workers)>0:
+     stop_message = "[%s]" % ", ".join([w.worker_id for w in remove_workers])
+     stop_message = "worker heartbeat timeout %s" % stop_message
+ if stop_message is not None:
+     logger.info("total workers: %s", len(self.known_workers))
# stop fabric (manager will restart when ready)
for f in self.manager.fabrics.keys():
-     self.manager.stop_fabric(f, reason="worker heartbeat timeout %s" % inactive)
+     self.manager.stop_fabric(f, reason=stop_message)
-
- if len(remove_workers)>0 or new_workers:
-     logger.info("total workers: %s", len(self.known_workers))

# check hello from each subscriber. If any have timed out, set to inactive (manager func
# will restart any subscribers that are inactive)
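The dispatch code above hashes EPM_RS_IP_EVENT messages on msg.ip instead of msg.addr so that the event is queued to the same worker that handles the other events for that IP. A reduced sketch of the hashing (worker ids and keys are illustrative):

```python
def pick_worker(workers, addr, ip=None, rs_ip_event=False):
    # hash on ip for EPM_RS_IP_EVENT so it lands on the same worker as the
    # other events for that ip; otherwise hash on the endpoint address
    key = ip if rs_ip_event else addr
    _hash = sum(ord(c) for c in key)
    return workers[_hash % len(workers)]

workers = ["w1", "w2", "w3"]  # illustrative active worker ids
# an rs-ip event keyed on 10.1.1.1 maps to the same worker as a plain
# event whose address is 10.1.1.1
assert pick_worker(workers, "00:50:56:01:02:03", ip="10.1.1.1", rs_ip_event=True) == \
       pick_worker(workers, "10.1.1.1")
```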
10 changes: 9 additions & 1 deletion Service/app/models/aci/ept/ept_msg.py
@@ -41,6 +41,7 @@ class MSG_TYPE(Enum):
@enum_unique
class WORK_TYPE(Enum):
RAW = "raw" # raw/unparsed epm event
+ STD_MO = "std_mo" # raw/unparsed standard mo event
WATCH_NODE = "watch_node" # a new node has become active/inactive
WATCH_MOVE = "watch_move" # an endpoint move event requires watch or notify
WATCH_STALE = "watch_stale" # a stale endpoint event requires watch or notify
@@ -260,6 +261,7 @@ def from_msg_json(js):
elif wt==WORK_TYPE.EPM_MAC_EVENT: mod = eptMsgWorkEpmEvent
elif wt==WORK_TYPE.EPM_RS_IP_EVENT: mod = eptMsgWorkEpmEvent
elif wt == WORK_TYPE.RAW: mod = eptMsgWorkRaw
+ elif wt == WORK_TYPE.STD_MO: mod = eptMsgWorkStdMo
elif wt == WORK_TYPE.WATCH_MOVE: mod = eptMsgWorkWatchMove
elif wt == WORK_TYPE.WATCH_OFFSUBNET: mod = eptMsgWorkWatchOffSubnet
elif wt == WORK_TYPE.WATCH_STALE: mod = eptMsgWorkWatchStale
@@ -269,11 +271,17 @@
return mod(*args, **kwargs)

class eptMsgWorkRaw(eptMsgWork):
""" raw/unparsed epm event """
""" raw/unparsed epm or standard mo event """
def __init__(self, addr, role, data, wt, qnum=1, seq=1, fabric=1):
super(eptMsgWorkRaw, self).__init__(addr, role, data, wt, qnum=qnum, seq=seq, fabric=fabric)
self.wt = WORK_TYPE.RAW

+ class eptMsgWorkStdMo(eptMsgWork):
+     """ raw/unparsed standard mo event """
+     def __init__(self, addr, role, data, wt, qnum=1, seq=1, fabric=1):
+         super(eptMsgWorkStdMo, self).__init__(addr, role, data, wt, qnum=qnum,seq=seq,fabric=fabric)
+         self.wt = WORK_TYPE.STD_MO

class eptMsgWorkDeleteEpt(eptMsgWork):
""" fixed message type for DELETE_EPT """
def __init__(self, addr, role, data, wt, qnum=1, seq=1, fabric=1):
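from_msg_json maps the wt field back to a concrete message class via the elif chain above, with the new STD_MO branch routing standard MO events to eptMsgWorkStdMo. A table-driven sketch of the same dispatch using stand-in classes (the real eptMsgWork hierarchy carries queue, sequence, and fabric fields omitted here):

```python
import json
from enum import Enum, unique

@unique
class WORK_TYPE(Enum):
    RAW = "raw"
    STD_MO = "std_mo"

class eptMsgWorkRaw(object):
    def __init__(self, addr, role, data):
        self.addr, self.role, self.data = addr, role, data
        self.wt = WORK_TYPE.RAW

class eptMsgWorkStdMo(object):
    def __init__(self, addr, role, data):
        self.addr, self.role, self.data = addr, role, data
        self.wt = WORK_TYPE.STD_MO

# table-driven equivalent of the elif chain in from_msg_json
WT_CLASS = {
    WORK_TYPE.RAW: eptMsgWorkRaw,
    WORK_TYPE.STD_MO: eptMsgWorkStdMo,
}

def from_msg_json(js):
    d = json.loads(js)
    cls = WT_CLASS[WORK_TYPE(d["wt"])]
    return cls(d["addr"], d["role"], d["data"])

msg = from_msg_json('{"wt": "std_mo", "addr": 0, "role": "worker", "data": {}}')
assert isinstance(msg, eptMsgWorkStdMo)
```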
12 changes: 6 additions & 6 deletions Service/app/models/aci/ept/ept_node.py
@@ -41,25 +41,25 @@ class eptNode(Rest):
"type":str,
"description": "node name as seen in fabric node vector",
},
"oob_addr": {
"type": str,
"default": "0.0.0.0",
"description": "node out-of-band management address",
},
"state": {
"type": str,
"description": "fabricNode state indicating whether it is in-service or inactive",
},
"role": {
"type": str,
"values": ["controller", "leaf", "spine", "vpc"],
"values": ["unsupported", "controller", "leaf", "spine", "vpc"],
"description": "node role to differentiate between controllers, leafs, and spines",
},
"addr": {
"type": str,
"default": "0.0.0.0",
"description": "32-bit physical TEP ipv4 address of node",
},
"version": {
"type": str,
"default": "",
"description": "currently running PE version",
},
"peer": {
"type": int,
"default": 0,
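The schema change above drops oob_addr, adds a version attribute, and extends role with an "unsupported" value. An illustrative document that fits the updated attribute definitions (all values made up):

```python
node = {
    "name": "leaf-101",           # node name as seen in fabric node vector
    "state": "in-service",
    "role": "leaf",               # now one of: unsupported/controller/leaf/spine/vpc
    "addr": "10.0.32.64",         # 32-bit physical TEP ipv4 address
    "version": "n9000-14.0(1h)",  # currently running version (new attribute)
    "peer": 0,
}
assert node["role"] in ["unsupported", "controller", "leaf", "spine", "vpc"]
```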