-
-
Notifications
You must be signed in to change notification settings - Fork 48
[GSoC'25]: Replace bottleneck multiprocessing with async for device polling & fix failing tests in CI #334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
aashimawadhwa
merged 50 commits into
PalisadoesFoundation:develop-abhishek
from
abbi4code:async_device_poll
Sep 26, 2025
Merged
Changes from 31 commits
Commits
Show all changes
50 commits
Select commit
Hold shift + click to select a range
728ec3c
feat: migration to async_snmp_manager initialize
abbi4code 91389bd
feat: configure test script for local debugging
abbi4code 1742c20
feat: added SNMP value conversion and result formatting helpers
abbi4code 7b4376e
chore: fixed few minor bugs && added get hostname method
abbi4code 7a18393
feat: added partial test for manager & added walk method for oid prefix
abbi4code 7a148cd
feat: initialize polling method for misc & system lv data
abbi4code b6d5329
feat: added poll instance for each device & added methods to poll sys…
abbi4code 9b2bdab
Chore: added more validation on output result & fix walkv2 stuck whil…
abbi4code 54e2319
feat: polled layer lvl data from devices
abbi4code 42b6fcb
Fix minor bugs
abbi4code 17ef72d
feat: migrated all layer1 mibs to async, with concurrent polling
abbi4code 40ce2cc
reformatted files with black
abbi4code c32c53b
chore: concurrent poll for mib_entity oids
abbi4code 765a666
feat: layer2 & layer3 async migration completed
abbi4code 64284e8
chore: replace current multiprocessing-based device polling
abbi4code 810260e
lint fixed
abbi4code 85ed8b7
chore: clean up, add logs & make polling concurrent on layer lv
abbi4code 8921862
chore: lint resolved & add test script for polling
abbi4code a033279
chore: fixed minor bugs
abbi4code 689ac98
chore: fix minor bugs && better err handling
abbi4code 97058e6
review changes
abbi4code 34e8510
chore: integrate poller daemon to use our async poller
abbi4code 97514a2
chore: enhanced poller to reduce the load on device
abbi4code 3ce8dfe
fix formatting
abbi4code 213ed90
docstring fixed, add missing args
abbi4code 3d48d06
chore: sync files cleanup
abbi4code eff3f8e
chore: made all tha changes
abbi4code cb60d2c
feat: replaced sequential server posting to async
abbi4code fbfd33c
lint & format fix
abbi4code d8dbc7d
docstring ci fix
abbi4code 0d31de7
chore: cleanup & refactor logs
abbi4code 04fd3c5
chore: fixed all issues happened during rebase
abbi4code cb2a304
removed duplicated part cause during rebase
abbi4code d37ec3d
add decoder in ingester for parsing double encoded mac data
abbi4code 1c4ba55
linted
abbi4code ccde00f
fixed docstring
abbi4code 9a685ce
fix black lint
abbi4code 51cfca6
fixed oui empty mac ingestion bug
abbi4code c6acb86
fix: handled the loophole for diff error
abbi4code ae8ae15
minor fixes
abbi4code aec55c7
fixed all duplicate codes & added support for new parameter in the up…
abbi4code 1dc7092
fixed ci
abbi4code d19331e
fix all the failing tests & renamed poller files
abbi4code 22dc83e
minor fix
abbi4code 628004d
fix docstring violations
abbi4code facd32e
transformed all sync failing test compatible with our async poller
abbi4code 6af38b6
minor fixes
abbi4code ac396c5
removed unused vars
abbi4code 16f4ff9
fixes bugs
abbi4code 2cb1b05
lint fixed
abbi4code File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,292 @@ | ||
"""Async Switchmap-NG poll module.""" | ||
|
||
import asyncio | ||
from collections import namedtuple | ||
from pprint import pprint | ||
import os | ||
import time | ||
import aiohttp | ||
|
||
# Import app libraries | ||
from switchmap import API_POLLER_POST_URI,API_PREFIX | ||
from switchmap.poller.snmp import async_poller | ||
from switchmap.poller.update import device as udevice | ||
from switchmap.poller.configuration import ConfigPoller | ||
from switchmap.core import log, rest, files | ||
from switchmap import AGENT_POLLER | ||
|
||
_META = namedtuple("_META", "zone hostname config") | ||
|
||
|
||
async def devices(max_concurrent_devices=None): | ||
"""Poll all devices asynchronously. | ||
|
||
Args: | ||
max_concurrent_devices: Maximum number of devices to poll concurrently. | ||
If None, uses config.agent_subprocesses() | ||
|
||
Returns: | ||
None | ||
""" | ||
# Initialize key variables | ||
arguments = [] | ||
|
||
# Get configuration | ||
config = ConfigPoller() | ||
|
||
# Use config value if not provided | ||
if not isinstance(max_concurrent_devices,int) or max_concurrent_devices < 1: | ||
log.log2warning(1401, f"Invalid concurrency={max_concurrent_devices}; defaulting to 1") | ||
max_concurrent_devices = 1 | ||
|
||
# Create a list of polling objects | ||
zones = sorted(config.zones(), key=lambda z: z.name) | ||
|
||
for zone in zones: | ||
if not zone.hostnames: | ||
continue | ||
arguments.extend( | ||
_META(zone=zone.name, hostname=_, config=config) | ||
for _ in zone.hostnames | ||
) | ||
|
||
if not arguments: | ||
log_message = "No devices found in configuration" | ||
log.log2info(1400, log_message) | ||
return | ||
|
||
log_message = ( | ||
f"Starting async polling of {len(arguments)} devices " | ||
f"with max concurrency: {max_concurrent_devices}" | ||
) | ||
log.log2info(1401, log_message) | ||
|
||
# Semaphore to limit concurrent devices | ||
device_semaphore = asyncio.Semaphore(max_concurrent_devices) | ||
|
||
timeout = aiohttp.ClientTimeout(total=30) | ||
async with aiohttp.ClientSession(timeout=timeout) as session: | ||
tasks = [ | ||
device(argument, device_semaphore, session, post=True) | ||
for argument in arguments | ||
] | ||
# Execute all devices concurrently | ||
start_time = time.time() | ||
results = await asyncio.gather(*tasks, return_exceptions=True) | ||
end_time = time.time() | ||
|
||
# Process results and log summary | ||
success_count = sum(1 for r in results if r is True) | ||
error_count = sum(1 for r in results if isinstance(r, Exception)) | ||
failed_count = len(results) - success_count - error_count | ||
|
||
log_message = ( | ||
f"Polling completed in {end_time - start_time:.2f}s: " | ||
f"{success_count} succeeded, {failed_count} failed, " | ||
f"{error_count} errors" | ||
) | ||
log.log2info(1402, log_message) | ||
# Log specific errors | ||
for i, result in enumerate(results): | ||
if isinstance(result, Exception): | ||
hostname = arguments[i].hostname | ||
log_message = f"Device {hostname} polling error: {result}" | ||
log.log2warning(1403, log_message) | ||
|
||
|
||
async def device(poll_meta, device_semaphore, session, post=True): | ||
"""Poll each device asynchronously. | ||
|
||
Args: | ||
poll_meta: _META object containing zone, hostname, config | ||
device_semaphore: Semaphore to limit concurrent devices | ||
session: aiohttp ClientSession for HTTP requests | ||
post: Post the data if True, else just print it | ||
|
||
Returns: | ||
bool: True if successful, False otherwise | ||
""" | ||
async with device_semaphore: | ||
# Initialize key variables | ||
hostname = poll_meta.hostname | ||
zone = poll_meta.zone | ||
config = poll_meta.config | ||
|
||
# Do nothing if the skip file exists | ||
skip_file = files.skip_file(AGENT_POLLER, config) | ||
if os.path.isfile(skip_file): | ||
log_message = ( | ||
f"Skip file {skip_file} found. Aborting poll for " | ||
f"{hostname} in zone '{zone}'" | ||
) | ||
log.log2debug(1404, log_message) | ||
return False | ||
|
||
# Poll data for obviously valid hostname | ||
if ( | ||
not hostname | ||
or not isinstance(hostname, str) | ||
or hostname.lower() == "none" | ||
): | ||
log_message = f"Invalid hostname: {hostname}" | ||
log.log2debug(1405, log_message) | ||
return False | ||
|
||
try: | ||
poll = async_poller.Poll(hostname) | ||
|
||
# Initialize SNMP connection | ||
if not await poll.initialize_snmp(): | ||
log_message = f"Failed to initialize SNMP for {hostname}" | ||
log.log2debug(1406, log_message) | ||
return False | ||
|
||
# Query device data asynchronously | ||
snmp_data = await poll.query() | ||
|
||
# Process if we get valid data | ||
if bool(snmp_data) and isinstance(snmp_data, dict): | ||
# Process device data | ||
_device = udevice.Device(snmp_data) | ||
data = _device.process() | ||
data["misc"]["zone"] = zone | ||
|
||
if post: | ||
try: | ||
# Construct full URL for posting | ||
url = f"{config.server_url_root()}{API_PREFIX}{API_POLLER_POST_URI}" | ||
log_message = f"Posting data for {hostname} to {url}" | ||
log.log2debug(1416, log_message) | ||
|
||
async with session.post( | ||
url, json=data | ||
) as res: | ||
if res.status == 200: | ||
log_message = ( | ||
f"Successfully polled and posted data " | ||
f"for {hostname}" | ||
) | ||
log.log2debug(1407, log_message) | ||
else: | ||
log_message = ( | ||
f"Failed to post data for {hostname}, " | ||
f"status={res.status}" | ||
) | ||
log.log2warning(1414, log_message) | ||
except aiohttp.ClientError as e: | ||
log_message = ( | ||
f"HTTP error posting data for {hostname}: {e}" | ||
) | ||
log.log2warning(1415, log_message) | ||
return False | ||
|
||
else: | ||
pprint(data) | ||
|
||
return True | ||
else: | ||
log_message = ( | ||
f"Device {hostname} returns no data. Check " | ||
f"connectivity/SNMP configuration" | ||
) | ||
log.log2debug(1408, log_message) | ||
return False | ||
|
||
except (asyncio.TimeoutError, KeyError, ValueError) as e: | ||
log_message = f"Recoverable error polling device {hostname}: {e}" | ||
log.log2warning(1409, log_message) | ||
return False | ||
except Exception as e: | ||
log_message = f"Unexpected error polling device {hostname}: {e}" | ||
log.log2warning(1409, log_message) | ||
return False | ||
abbi4code marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
|
||
async def cli_device(hostname): | ||
"""Poll single device for data - CLI interface. | ||
|
||
Args: | ||
hostname: Host to poll | ||
|
||
Returns: | ||
None | ||
""" | ||
# Initialize key variables | ||
arguments = [] | ||
|
||
# Get configuration | ||
config = ConfigPoller() | ||
|
||
# Create a list of polling objects | ||
zones = sorted(config.zones()) | ||
|
||
abbi4code marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
# Create a list of arguments | ||
for zone in zones: | ||
if not zone.hostnames: | ||
continue | ||
for next_hostname in zone.hostnames: | ||
if next_hostname == hostname: | ||
arguments.append( | ||
_META(zone=zone.name, hostname=hostname, config=config) | ||
) | ||
|
||
if arguments: | ||
log_message = ( | ||
f"Found {hostname} in {len(arguments)} zone(s), starting async poll" | ||
) | ||
log.log2info(1410, log_message) | ||
|
||
# Poll each zone occurrence | ||
semaphore = asyncio.Semaphore(1) | ||
async with aiohttp.ClientSession() as session: | ||
tasks = [ | ||
device(argument, semaphore, session, post=False) | ||
for argument in arguments | ||
] | ||
results = await asyncio.gather(*tasks, return_exceptions=True) | ||
|
||
# Check results | ||
success_count = sum(1 for r in results if r is True) | ||
if success_count > 0: | ||
log_message = ( | ||
f"Successfully polled {hostname} from " | ||
f"{success_count}/{len(results)} zone(s)" | ||
) | ||
log.log2info(1411, log_message) | ||
else: | ||
log_message = f"Failed to poll {hostname} from any configured zone" | ||
log.log2warning(1412, log_message) | ||
|
||
else: | ||
log_message = f"No hostname {hostname} found in configuration" | ||
log.log2see(1413, log_message) | ||
|
||
|
||
def run_devices(max_concurrent_devices=None): | ||
"""Run device polling - main entry point. | ||
|
||
Args: | ||
max_concurrent_devices (int, optional): Maximum number of devices to | ||
poll concurrently. If None, uses config.agent_subprocesses(). | ||
|
||
Returns: | ||
None | ||
""" | ||
# Use config if not specified | ||
if max_concurrent_devices is None: | ||
config = ConfigPoller() | ||
max_concurrent_devices = config.agent_subprocesses() | ||
|
||
asyncio.run(devices(max_concurrent_devices)) | ||
|
||
|
||
def run_cli_device(hostname): | ||
"""Run CLI device polling - main entry point. | ||
|
||
Args: | ||
hostname (str): The hostname of the device to poll. | ||
|
||
Returns: | ||
None | ||
""" | ||
asyncio.run(cli_device(hostname)) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.