|
15 | 15 | import json
|
16 | 16 | from base64 import b64encode
|
17 | 17 |
|
| 18 | + |
18 | 19 | class OAuth2:
|
19 |
| - def __init__(self, client_credentials_hash, cert_params_hash): |
20 |
| - client_key = client_credentials_hash["client_key"] |
21 |
| - client_secret_key = client_credentials_hash["client_secret_key"] |
22 |
| - self.cert = (cert_params_hash["crt"], cert_params_hash["key"]) |
23 |
| - self.base64code = 'Basic ' + bytes.decode(b64encode(('%s:%s' %(client_key,client_secret_key)).encode('latin1')).strip()) |
24 |
| - |
25 |
| - |
26 |
| - #Get Acces Token |
27 |
| - def get_access_token(self, url, code, redirect_uri): |
28 |
| - request_params = {"grant_type":"authorization_code", "code": code , "redirect_uri":redirect_uri} |
29 |
| - header_params = {'Authorization' : self.base64code} |
30 |
| - request = requests.post(url, data = request_params, headers = header_params, cert = self.cert) |
31 |
| - if str(request.status_code) == "200": |
32 |
| - res = request.json() |
33 |
| - res.update({"status": request.status_code}) |
34 |
| - return res |
35 |
| - response = {"status": request.status_code, "error": request.text} |
36 |
| - return response |
37 |
| - |
38 |
| - |
39 |
| - # Refresh token will collect back the new access token |
40 |
| - def get_refresh_token(self, url, refresh_token): |
41 |
| - request_params = {"grant_type":"refresh_token", "refresh_token": refresh_token} |
42 |
| - header_params = {"Authorization":self.base64code} |
43 |
| - request = requests.post(url, data = request_params, headers = header_params, cert = self.cert) |
44 |
| - if str(request.status_code) == "200": |
45 |
| - res = request.json() |
46 |
| - res.update({"status": request.status_code}) |
47 |
| - return res |
48 |
| - response = {"status": request.status_code, "error": request.text} |
49 |
| - return response |
50 |
| - |
| 20 | + def __init__(self, client_credentials_hash, cert_params_hash): |
| 21 | + client_key = client_credentials_hash["client_key"] |
| 22 | + client_secret_key = client_credentials_hash["client_secret_key"] |
| 23 | + self.cert = (cert_params_hash["crt"], cert_params_hash["key"]) |
| 24 | + self.base64code = "Basic " + bytes.decode( |
| 25 | + b64encode( |
| 26 | + ("%s:%s" % (client_key, client_secret_key)).encode("latin1") |
| 27 | + ).strip() |
| 28 | + ) |
51 | 29 |
|
52 |
| - |
53 |
| - |
| 30 | + # Get Acces Token |
| 31 | + def get_access_token(self, url, code, redirect_uri): |
| 32 | + request_params = { |
| 33 | + "grant_type": "authorization_code", |
| 34 | + "code": code, |
| 35 | + "redirect_uri": redirect_uri, |
| 36 | + } |
| 37 | + header_params = {"Authorization": self.base64code} |
| 38 | + request = requests.post( |
| 39 | + url, data=request_params, headers=header_params, cert=self.cert |
| 40 | + ) |
| 41 | + if str(request.status_code) == "200": |
| 42 | + res = request.json() |
| 43 | + res.update({"status": request.status_code}) |
| 44 | + return res |
| 45 | + response = {"status": request.status_code, "error": request.text} |
| 46 | + return response |
| 47 | + |
| 48 | + # Refresh token will collect back the new access token |
| 49 | + def get_refresh_token(self, url, refresh_token): |
| 50 | + request_params = {"grant_type": "refresh_token", "refresh_token": refresh_token} |
| 51 | + header_params = {"Authorization": self.base64code} |
| 52 | + request = requests.post( |
| 53 | + url, data=request_params, headers=header_params, cert=self.cert |
| 54 | + ) |
| 55 | + if str(request.status_code) == "200": |
| 56 | + res = request.json() |
| 57 | + res.update({"status": request.status_code}) |
| 58 | + return res |
| 59 | + response = {"status": request.status_code, "error": request.text} |
| 60 | + return response |
0 commit comments