Skip to content

Commit 175ae87

Browse files
committed
fix ingester issue
1 parent 5c0fff9 commit 175ae87

File tree

5 files changed

+89
-41
lines changed

5 files changed

+89
-41
lines changed

switchmap/poller/snmp/async_snmp_info.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""Async module to aggregate query results."""
22

33
import time
4-
import sys
54
from collections import defaultdict
65
from switchmap.core import log
76
import asyncio
@@ -57,10 +56,10 @@ async def everything(self):
5756
keys = ["misc", "system", "layer1", "layer2", "layer3"]
5857
for key, result in zip(keys, results):
5958
if isinstance(result, Exception):
60-
log.log2warning(f"{key} failed: {result}")
59+
log.log2warning(1004, f"{key} failed: {result}")
6160
elif result:
6261
data[key] = result
63-
print(f"final data: {data}")
62+
6463
# Return
6564
return data
6665

switchmap/poller/snmp/mib/generic/mib_if.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from switchmap.poller.snmp.base_query import Query
66
from switchmap.core import general, log
77
import asyncio
8+
from switchmap.core import general, log
9+
import asyncio
810

911

1012
def get_query():

switchmap/server/db/ingest/ingest.py

Lines changed: 29 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -78,48 +78,39 @@ def process(self):
7878
with tempfile.TemporaryDirectory(
7979
dir=self._config.ingest_directory()
8080
) as tmpdir:
81-
# Only run the ingest if there is no poller lock file
82-
# This helps to prevent ingesting files while polling is
83-
# still running. This is only effective when the poller
84-
# and ingester are running on the same machine
85-
if os.path.isfile(poller_lock_file) is False:
86-
# Copy files from cache to ingest
87-
files.move_yaml_files(cache_directory, tmpdir)
88-
89-
# Parallel process the files
90-
setup_success = setup(tmpdir, self._config)
91-
92-
if bool(setup_success) is True:
93-
# Populate the arguments
94-
arguments = [
95-
[
96-
IngestArgument(
97-
idx_zone=item.idx_zone,
98-
data=item.data,
99-
filepath=item.filepath,
100-
config=item.config,
101-
dns=self._dns,
102-
)
103-
]
104-
for item in setup_success.zones
81+
# ASYNC ARCHITECTURE: Allow ingester to run while poller is active
82+
# Copy files from cache to ingest
83+
files.move_yaml_files(cache_directory, tmpdir)
84+
85+
# Parallel process the files
86+
setup_success = setup(tmpdir, self._config)
87+
88+
if bool(setup_success) is True:
89+
# Populate the arguments
90+
arguments = [
91+
[
92+
IngestArgument(
93+
idx_zone=item.idx_zone,
94+
data=item.data,
95+
filepath=item.filepath,
96+
config=item.config,
97+
dns=self._dns,
98+
)
10599
]
100+
for item in setup_success.zones
101+
]
106102

107-
# Process the device independent zone data in the
108-
# database first
109-
if bool(arguments) is True:
110-
pairmacips = self.zone(arguments)
103+
# Process the device independent zone data in the
104+
# database first
105+
if bool(arguments) is True:
106+
pairmacips = self.zone(arguments)
111107

112-
# Process the device dependent in the database second
113-
if bool(pairmacips):
114-
self.device(arguments)
108+
# Process the device dependent in the database second
109+
if bool(pairmacips):
110+
self.device(arguments)
115111

116-
# Cleanup
117-
self.cleanup(setup_success.event)
118-
else:
119-
log_message = f"""\
120-
Poller lock file {poller_lock_file} exists. Skipping processing of cache \
121-
files. Is the poller running or did it crash unexpectedly?"""
122-
log.log2info(1077, log_message)
112+
# Cleanup
113+
self.cleanup(setup_success.event)
123114

124115
def zone(self, arguments):
125116
"""Ingest the files' zone data.

switchmap/server/db/ingest/update/device.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,34 @@
3434
)
3535

3636

37+
def _decode_mac_address(encoded_mac):
38+
"""Decode double-encoded MAC addresses from async poller.
39+
40+
Args:
41+
encoded_mac: MAC address that may be double hex-encoded
42+
43+
Returns:
44+
str: Properly formatted MAC address or original if already valid
45+
46+
"""
47+
import binascii
48+
49+
try:
50+
# Try to decode hex-encoded string to ASCII
51+
if isinstance(encoded_mac, str) and len(encoded_mac) > 12:
52+
decoded = binascii.unhexlify(encoded_mac).decode('ascii')
53+
# Check if it starts with '0x' (hex prefix)
54+
if decoded.startswith('0x'):
55+
return decoded[2:] # Return MAC without '0x' prefix
56+
57+
# If decoding fails or doesn't match pattern, return original
58+
return encoded_mac
59+
60+
except Exception:
61+
# If any decoding fails, return original
62+
return encoded_mac
63+
64+
3765
def process(data, idx_zone, dns=True):
3866
"""Process data received from a device.
3967

switchmap/server/db/ingest/update/zone.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,34 @@
1717
)
1818

1919

20+
def _decode_mac_address(encoded_mac):
21+
"""Decode double-encoded MAC addresses from async poller.
22+
23+
Args:
24+
encoded_mac: MAC address that may be double hex-encoded
25+
26+
Returns:
27+
str: Properly formatted MAC address or original if already valid
28+
29+
"""
30+
import binascii
31+
32+
try:
33+
# Try to decode hex-encoded string to ASCII
34+
if isinstance(encoded_mac, str) and len(encoded_mac) > 12:
35+
decoded = binascii.unhexlify(encoded_mac).decode('ascii')
36+
# Check if it starts with '0x' (hex prefix)
37+
if decoded.startswith('0x'):
38+
return decoded[2:] # Return MAC without '0x' prefix
39+
40+
# If decoding fails or doesn't match pattern, return original
41+
return encoded_mac
42+
43+
except Exception:
44+
# If any decoding fails, return original
45+
return encoded_mac
46+
47+
2048
def process(data, idx_zone, dns=True):
2149
"""Process data received from a device.
2250

0 commit comments

Comments
 (0)