-
Notifications
You must be signed in to change notification settings - Fork 1
/
so4t_scim_client.py
170 lines (136 loc) · 6.51 KB
/
so4t_scim_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# Open source libraries
import requests
class ScimClient:
def __init__(self, token, url, proxy=None):
self.base_url = url
self.token = token
self.headers = {
'Authorization': f"Bearer {self.token}"
}
if "stackoverflowteams.com" in self.base_url: # For Basic and Business tiers
self.soe = False
self.scim_url = f"{self.base_url}/auth/scim/v2/users"
else: # For Enterprise tier
self.soe = True
self.scim_url = f"{self.base_url}/api/scim/v2/users"
self.proxies = {'https': proxy} if proxy else {'https': None}
def get_user(self, account_id):
# Get a single user by account ID
scim_url = f"{self.scim_url}/{account_id}"
response = requests.get(scim_url, headers=self.headers, proxies=self.proxies)
if response.status_code == 404:
print(f"User with account ID {account_id} not found.")
return None
elif response.status_code != 200:
print(f"API call failed with status code: {response.status_code}.")
print(response.text)
return None
else:
print(f"Retrieved user with account ID {account_id}")
return response.json()
def get_all_users(self):
'''
Get all users via SCIM API
SCIM API returns a max of 100 items per call, so we need to paginate through the results
Example response for 1 user:
{
'active': False,
'name': {'givenName': 'Bruce', 'familyName': 'Banner'},
'emails': [
{'primary': True, 'value': '[email protected]'}
],
'schemas': ['urn:ietf:params:scim:schemas:core:2.0:User'],
'meta': {
'created': '2017-05-30T21:45:30.740Z',
'location': 'https://demo.stackenterprise.co/api/scim/v2/users/8'},
'id': '8',
'externalId': '00ufmj2tmtWu2X',
'userType': 'Admin',
'profileUrl': 'https://demo.stackenterprise.co/accounts/8',
'displayName': 'Bruce Banner',
'userName': '[email protected]'
}
'''
params = {
"count": 100,
"startIndex": 1,
}
items = []
while True: # Keep performing API calls until all items are received
print(f"Getting 100 results from {self.scim_url} with startIndex of {params['startIndex']}")
response = requests.get(self.scim_url, headers=self.headers, params=params,
proxies=self.proxies)
if response.status_code != 200:
print(f"API call failed with status code: {response.status_code}.")
print(response.text)
break
items_data = response.json().get('Resources')
items += items_data
params['startIndex'] += params['count']
if params['startIndex'] > response.json().get('totalResults'):
break
return items
def update_user(self, account_id, active=None, role=None):
# Update a user's active status or role via SCIM API
# Role changes require an admin setting to allow SCIM to modify user roles
# `role` can be one of the following: Registered, Moderator, Admin
# if another role is passed, it will be ignored (and still report HTTP 200)
# `active` can be True or False
valid_roles = ["Registered", "Moderator", "Admin"]
scim_url = f"{self.scim_url}/{account_id}"
payload = {}
if active is not None:
payload['active'] = active
if role is not None:
if role in valid_roles:
payload['userType'] = role
else:
print(f"Invalid role: {role}. Valid roles are: {valid_roles}")
return
response = requests.put(scim_url, headers=self.headers, json=payload, proxies=self.proxies)
if response.status_code == 404:
print(f"User with account ID {account_id} not found.")
elif response.status_code != 200:
print(f"API call failed with status code: {response.status_code}.")
print(response.text)
else:
print(f"Updated user with account ID {account_id}")
def delete_user(self, account_id, retries=0):
# By default, deleting users via SCIM is disabled. To enable it, open a support ticket.
# Deleting a user via SCIM requires the user's account ID (not user ID)
scim_url = f"{self.scim_url}/{account_id}"
print(f"Sending DELETE request to {scim_url}")
response = requests.delete(scim_url, headers=self.headers, proxies=self.proxies)
if response.status_code == 400:
# 400 Error responses:
# {"ErrorMessage":"You cannot delete or destroy System Accounts."}
print(f"Failed to delete user with account ID {account_id}")
print(response.json().get('ErrorMessage'))
elif response.status_code == 404:
print(f"User with account ID {account_id} not found.")
elif response.status_code == 403:
### NEED TO TEST THIS TO BE SURE
print(f"Deleting users via SCIM is disabled. To enable it, open a support ticket.")
elif response.status_code == 500:
# 500 Error responses:
# {"ErrorMessage":"Moderators cannot be deleted - tried to delete <UserName>.
# Adjust role to User."}
# "SCIM user modification failed - unknown user"
if "Adjust role to User" in response.json().get('ErrorMessage'):
print(f"User with account ID {account_id} cannot be deleted because they're "
"a moderator or admin.")
print("Reducing their role to User...")
self.update_user(account_id, role="Registered")
if retries < 3:
print("Retrying delete...")
self.delete_user(account_id, retries+1)
else:
print("Max retries reached. Aborting deletion.")
else:
print(f"Failed to delete user with account ID {account_id}")
print(response.json().get('ErrorMessage'))
elif response.status_code != 204:
print(f"API call failed with status code: {response.status_code}.")
print(response.text)
else:
print(f"Deleted user with account ID {account_id}")