Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix various issues with large Snipe-IT (50k+ assets, 50k+ users) and JAMF installations (5k+ assets) #119

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 117 additions & 104 deletions jamf2snipe
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ validsubset = [
"configuration_profiles"
]


# Import all the things
import json
from functools import lru_cache
import requests
import time
import configparser
Expand Down Expand Up @@ -463,80 +462,79 @@ def search_snipe_asset(serial):
logging.debug('{} - {}'.format(response.status_code, response.content))
return "ERROR"

# Function to get all the asset models
def get_snipe_models():
api_url = '{}/api/v1/models'.format(snipe_base)
logging.debug('Calling against: {}'.format(api_url))
response = requests.get(api_url, headers=snipeheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler})
if response.status_code == 200:
jsonresponse = response.json()
logging.info("Got a valid response that should have {} models.".format(jsonresponse['total']))
if jsonresponse['total'] <= len(jsonresponse['rows']) :
return jsonresponse
else:
logging.info("We didn't get enough results so we need to get them again.")
api_url = '{}/api/v1/models?limit={}'.format(snipe_base, jsonresponse['total'])
newresponse = requests.get(api_url, headers=snipeheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler})
if response.status_code == 200:
newjsonresponse = newresponse.json()
if newjsonresponse['total'] == len(newjsonresponse['rows']) :
return newjsonresponse
else:
logging.error("We couldn't seem to get all of the model numbers")
raise SystemExit("Unable to get all model objects from Snipe-IT instanace")
else:
logging.error('When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}'.format(response.status_code, response.content))
raise SystemExit("Snipe models API endpoint failed.")

# Helper function to clean up the repeated codes
def api_call(endpoint, payload=None, method="GET"):
logging.debug(f"Calling {endpoint} with method {method} and payload {payload}")
api_url = f"{snipe_base}/api/v1/{endpoint}"
if method == "GET":
logging.debug(f"Calling: {api_url}")
response = requests.get(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl,
hooks={'response': request_handler})
elif method == "POST":
response = requests.post(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl,
hooks={'response': request_handler})
elif method == "PATCH":
response = requests.patch(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl,
hooks={'response': request_handler})
else:
logging.error('When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}'.format(response.status_code, response.content))
raise SystemExit("Snipe models API endpoint failed.")
logging.error(f"Unknown method {method}")
raise SystemExit("Unknown method")

# Recursive function returns all users in a Snipe Instance, 100 at a time.
def get_snipe_users(previous=[]):
user_id_url = '{}/api/v1/users'.format(snipe_base)
payload = {
'limit': 100,
'offset': len(previous)
if response.status_code != 200:
logging.error(f"Snipe-IT responded with error code:{response.text}")
logging.debug(f"{response.status_code} - {response.content}")
raise SystemExit("Snipe-IT API call failed")
logging.debug(f"Got a valid response from Snipe-IT: {response.text}")
return response.json()


# Function to get all the asset models
def get_snipe_models(current_models={}):
limits = {
'limit': 500,
'offset': len(current_models)
}
logging.debug('The payload for the snipe users GET is {}'.format(payload))
response = requests.get(user_id_url, headers=snipeheaders, params=payload, hooks={'response': request_handler})
response_json = response.json()
current = response_json['rows']
if len(previous) != 0:
current = previous + current
if response_json['total'] > len(current):
logging.debug('We have more than 100 users, get the next page - total: {} current: {}'.format(response_json['total'], len(current)))
return get_snipe_users(current)
else:
return current
response = api_call("models", method="GET", payload=limits)

# This happens if there is an error
if "total" not in response:
logging.error("Fetching models failed, enable debug to see response")
raise SystemExit("Necessary Snipe-IT API call failed")

# Quickly end if there are no rows
if "rows" not in response:
return current_models

# Add the models to the dictionary
for row in response['rows']:
if row['model_number']:
current_models[row['model_number']] = row['id']

# If we haven't gotten all the models, get more
if response['total'] > len(current_models):
logging.debug(f'Fetching more models - {len(current_models)}/{response["total"]}')
current_models.update(get_snipe_models(current_models))

return current_models


# Function to search snipe for a user
@lru_cache(maxsize=2048)
def get_snipe_user_id(username):
if username == '':
return "NotFound"
if not username:
return None
username = username.lower()
for user in snipe_users:
for value in user.values():
if str(value).lower() == username:
id = user['id']
return id
if user_args.users_no_search:
logging.debug("No matches in snipe_users for {}, not querying the API for the next closest match since we've been told not to".format(username))
return "NotFound"
logging.debug('No matches in snipe_users for {}, querying the API for the next closest match'.format(username))
user_id_url = '{}/api/v1/users'.format(snipe_base)
payload = {
'search':username,
'limit':1,
'sort':'username',
'order':'asc'
}
logging.debug('The payload for the snipe user search is: {}'.format(payload))
response = requests.get(user_id_url, headers=snipeheaders, params=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler})
try:
return response.json()['rows'][0]['id']
except:
return "NotFound"
response = api_call("users", method="GET", payload={'username': username})
if 'total' not in response or response['total'] == 0:
return None

if response['total'] > 1:
logging.warning(f"Found {response['total']} users with username {username}, returning none")
return None

return response['rows'][0]['id']


# Function that creates a new Snipe Model - not an asset - with a JSON payload
def create_snipe_model(payload):
Expand Down Expand Up @@ -617,10 +615,10 @@ def checkin_snipe_asset(asset_id):
def checkout_snipe_asset(user, asset_id, checked_out_user=None):
logging.debug('Asset {} is being checked out to {}'.format(user, asset_id))
user_id = get_snipe_user_id(user)
if user_id == 'NotFound':
if not user_id:
logging.info("User {} not found".format(user))
return "NotFound"
if checked_out_user == None:
if checked_out_user is None:
logging.info("Not checked out, checking out to {}".format(user))
elif checked_out_user == "NewAsset":
logging.info("First time this asset will be checked out, checking out to {}".format(user))
Expand All @@ -647,6 +645,32 @@ def checkout_snipe_asset(user, asset_id, checked_out_user=None):
logging.error('Asset checkout failed for asset {} with error {}'.format(asset_id,response.text))
return response


# Function to recursively get keys from a dictionary
def get_config_value(config_key, data):
search_keys = config_key.split(" ")

value = data
for key in search_keys:
try:
key = int(key)
except ValueError:
logging.debug(f"{key} is not an integer")
try:
value = value[key]
except (KeyError, IndexError):
logging.info(f"{key} does not exist")
logging.debug(f"Ansible value: {value}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ansible? assuming this line was copied from elsewhere? ;)

value = None
break
except TypeError:
logging.error(f"Type error when fetching data for {key}, check your config")
raise SystemExit

logging.debug(f"Got value {value} for {config_key}")
return value


### Run Testing ###
# Report if we're verifying SSL or not.
logging.info("SSL Verification is set to: {}".format(user_args.do_not_verify_ssl))
Expand Down Expand Up @@ -674,7 +698,7 @@ else:
logging.info('We were able to get a good response from your JAMFPro instance.')

# Exit if you can't contact SNIPE
if ( JAMF_UP == False ) or ( SNIPE_UP == False ):
if not JAMF_UP or not SNIPE_UP:
raise SystemExit("Error: Host could not be contacted.")

# Test that we can actually connect with the API keys by getting a bearer token.
Expand All @@ -686,14 +710,7 @@ logging.info("Finished running our tests.")
### Get Started ###
# Get a list of known models from Snipe
logging.info("Getting a list of computer models that snipe knows about.")
snipemodels = get_snipe_models()
logging.debug("Parsing the {} model results for models with model numbers.".format(len(snipemodels['rows'])))
modelnumbers = {}
for model in snipemodels['rows']:
if model['model_number'] == "":
logging.debug("The model, {}, did not have a model number. Skipping.".format(model['name']))
continue
modelnumbers[model['model_number']] = model['id']
modelnumbers = get_snipe_models()
logging.info("Our list of models has {} entries.".format(len(modelnumbers)))
logging.debug("Here's the list of the {} models and their id's that we were able to collect:\n{}".format(len(modelnumbers), modelnumbers))

Expand All @@ -714,12 +731,6 @@ jamf_types = {
'mobile_devices': jamf_mobile_list
}

# Get a list of users from Snipe if the user has specified
# they're syncing users

if user_args.users or user_args.users_force or user_args.users_inverse:
snipe_users = get_snipe_users()

TotalNumber = 0
if user_args.computers:
TotalNumber = len(jamf_types['computers']['computers'])
Expand All @@ -730,12 +741,13 @@ else:
TotalNumber += len(jamf_types[jamf_type][jamf_type])

# Make sure we have a good list.
if jamf_computer_list != None:
logging.info('Received a list of JAMF assets that had {} entries.'.format(TotalNumber))
else:
if jamf_computer_list is None:
logging.error("We were not able to retreive a list of assets from your JAMF instance. It's likely that your settings, or credentials are incorrect. Check your settings.conf and verify you can make API calls outside of this system with the credentials found in your settings.conf")
raise SystemExit("Unable to get JAMF Computers.")

logging.info('Received a list of JAMF assets that had {} entries.'.format(TotalNumber))


# After this point we start editing data, so quit if this is a dryrun
if user_args.dryrun:
raise SystemExit("Dryrun: Complete.")
Expand All @@ -759,15 +771,16 @@ for jamf_type in jamf_types:
jamf = search_jamf_asset(jamf_asset['id'])
elif jamf_type == 'mobile_devices':
jamf = search_jamf_mobile(jamf_asset['id'])
if jamf == None:
if not jamf:
continue

# If the entry doesn't contain a serial, then we need to skip this entry.
if jamf['general']['serial_number'] == 'Not Available':
if not jamf['general']['serial_number']:
Comment on lines -766 to +778
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iirc This evaluates a literal string for "Not Available" that sometimes occurs for devices that are in Jamf via MDM but haven't checked in yet. Jamf didn't leave the it as null, but returned a valid string instead. That might have changed, but this would probably cause a regression as is.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my current JAMF instance, a system that is missing info may return null or an empty string ('') which would fail this check - however that fact doesn't seem to be consistent, sometimes it is null, sometimes it is an empty string, hence why I made the modification. I have not ran across the "Not Available" string, this may be an artifact from the old API?

Copy link
Collaborator

@ParadoxGuitarist ParadoxGuitarist Jun 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could also be a school.apple.com thing. I'd recommend altering the line to check for both so if it's either that literal string or null we're covered.

logging.warning("The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now.")
continue
if jamf['general']['serial_number'] == None:
logging.warning("The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now.")
if jamf['general']['serial_number'] == 'Not Available':
logging.warning(
"The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now.")
continue

# Check that the model number exists in snipe, if not create it.
Expand Down Expand Up @@ -813,7 +826,7 @@ for jamf_type in jamf_types:
logging.verbose(jamf)
continue
#raise SystemError('No such attribute {} in the jamf payload. Please check your settings.conf file'.format(tag_split))
if jamf_asset_tag == None or jamf_asset_tag == '':
if not jamf_asset_tag:
logging.debug('No custom configuration found in settings.conf for asset tag name upon asset creation.')
if jamf_type == 'mobile_devices':
jamf_asset_tag = 'jamfid-m-{}'.format(jamf['general']['id'])
Expand Down Expand Up @@ -858,12 +871,12 @@ for jamf_type in jamf_types:
if new_snipe_asset[0] != "AssetCreated":
continue
if user_args.users or user_args.users_force or user_args.users_inverse:
jamfsplit = config['user-mapping']['jamf_api_field'].split()
if jamfsplit[1] not in jamf[jamfsplit[0]]:
logging.info("Couldn't find {} for this device in {}, not checking it out.".format(jamfsplit[1], jamfsplit[0]))
user = get_config_value(config['user-mapping']['jamf_api_field'], jamf)
if not user:
logging.info("User not found in JAMF information for this device. Skipping")
continue
logging.info('Checking out new item {} to user {}'.format(jamf['general']['name'], jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])]))
checkout_snipe_asset(jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])],new_snipe_asset[1].json()['payload']['id'], "NewAsset")
checkout_snipe_asset(jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])],
new_snipe_asset[1].json()['payload']['id'], "NewAsset")
# Log an error if there's an issue, or more than once match.
elif snipe == 'MultiMatch':
logging.warning("WARN: You need to resolve multiple assets with the same serial number in your inventory. If you can't find them in your inventory, you might need to purge your deleted records. You can find that in the Snipe Admin settings. Skipping serial number {} for now.".format(jamf['general']['serial_number']))
Expand Down Expand Up @@ -903,7 +916,7 @@ for jamf_type in jamf_types:
jamf_value = jamf_value[item]
payload = {snipekey: jamf_value}
latestvalue = jamf_value
except (KeyError, TypeError):
except (KeyError, TypeError, IndexError):
logging.debug("Skipping the payload, because the JAMF key we're mapping to doesn't exist")
continue

Expand Down Expand Up @@ -933,11 +946,11 @@ for jamf_type in jamf_types:
if ((user_args.users or user_args.users_inverse) and (snipe['rows'][0]['assigned_to'] == None) == user_args.users) or user_args.users_force:

if snipe['rows'][0]['status_label']['status_meta'] in ('deployable', 'deployed'):
jamfsplit = config['user-mapping']['jamf_api_field'].split()
if jamfsplit[1] not in jamf[jamfsplit[0]]:
logging.info("Couldn't find {} for this device in {}, not checking it out.".format(jamfsplit[1], jamfsplit[0]))
user = get_config_value(config['user-mapping']['jamf_api_field'], jamf)
if not user:
logging.info("User not found in JAMF information for this device. Skipping")
continue
checkout_snipe_asset(jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])], snipe_id, snipe['rows'][0]['assigned_to'])
checkout_snipe_asset(user, snipe_id, snipe['rows'][0]['assigned_to'])
else:
logging.info("Can't checkout {} since the status isn't set to deployable".format(jamf['general']['name']))

Expand Down