|
| 1 | +from datetime import datetime, timedelta |
| 2 | +from ansible.module_utils.basic import AnsibleModule |
| 3 | +import requests |
| 4 | + |
| 5 | + |
| 6 | +def check_existing_token(base_url, username, password): |
| 7 | + """Check existing tokens for user and return token + warning if expiring.""" |
| 8 | + headers = {"Accept": "application/json"} |
| 9 | + tokens_url = f"{base_url}/api/users/tokens/" |
| 10 | + |
| 11 | + try: |
| 12 | + response = requests.get(tokens_url, headers=headers, auth=(username, password)) |
| 13 | + response.raise_for_status() |
| 14 | + except requests.exceptions.RequestException as e: |
| 15 | + return None, f"Failed to fetch tokens: {e}" |
| 16 | + |
| 17 | + data = response.json() |
| 18 | + tokens = data.get("results", []) |
| 19 | + |
| 20 | + if not tokens: |
| 21 | + return None, "No tokens found" |
| 22 | + |
| 23 | + if len(tokens) > 1: |
| 24 | + return None, "Multiple tokens found, expected exactly 1" |
| 25 | + |
| 26 | + token = tokens[0] |
| 27 | + expires = token.get("expires") |
| 28 | + |
| 29 | + if expires: |
| 30 | + try: |
| 31 | + expire_date = datetime.fromisoformat(expires.replace("Z", "+00:00")) |
| 32 | + tomorrow = datetime.now(tz=expire_date.tzinfo) + timedelta(days=1) |
| 33 | + if expire_date <= tomorrow: |
| 34 | + return token, "Token expiring within 1 day" |
| 35 | + except ValueError: |
| 36 | + return token, f"Invalid expiration date format: {expires}" |
| 37 | + |
| 38 | + return token, None |
| 39 | + |
| 40 | + |
| 41 | +def create_new_token( |
| 42 | + base_url, username, password, user_token, description="ansible-created-token" |
| 43 | +): |
| 44 | + """Create a new Nautobot token using Basic Auth.""" |
| 45 | + tokens_url = f"{base_url}/api/users/tokens/" |
| 46 | + headers = {"Content-Type": "application/json", "Accept": "application/json"} |
| 47 | + payload = {"key": user_token, "description": description, "write_enabled": True} |
| 48 | + |
| 49 | + try: |
| 50 | + response = requests.post( |
| 51 | + tokens_url, headers=headers, json=payload, auth=(username, password) |
| 52 | + ) |
| 53 | + response.raise_for_status() |
| 54 | + except requests.exceptions.RequestException as e: |
| 55 | + return None, f"Failed to create new token: {e}" |
| 56 | + |
| 57 | + return response.json(), None |
| 58 | + |
| 59 | + |
| 60 | +def run_module(): |
| 61 | + module_args = dict( |
| 62 | + base_url=dict(type="str", required=True), |
| 63 | + username=dict(type="str", required=True), |
| 64 | + password=dict(type="str", required=True, no_log=True), |
| 65 | + token=dict(type="str", required=True, no_log=True), |
| 66 | + replace_if_expiring=dict(type="bool", default=True), |
| 67 | + create_if_notfound=dict(type="bool", default=True), |
| 68 | + token_description=dict(type="str", default="ansible-created-token"), |
| 69 | + ) |
| 70 | + |
| 71 | + module = AnsibleModule(argument_spec=module_args, supports_check_mode=True) |
| 72 | + result = dict(changed=False, token=None, message="") |
| 73 | + |
| 74 | + base_url = module.params["base_url"].rstrip("/") |
| 75 | + username = module.params["username"] |
| 76 | + password = module.params["password"] |
| 77 | + user_token = module.params["token"] |
| 78 | + replace_if_expiring = module.params["replace_if_expiring"] |
| 79 | + create_if_notfound = module.params["create_if_notfound"] |
| 80 | + token_description = module.params["token_description"] |
| 81 | + |
| 82 | + if module.check_mode: |
| 83 | + module.exit_json(**result) |
| 84 | + |
| 85 | + # Check existing token |
| 86 | + token, warning = check_existing_token(base_url, username, password) |
| 87 | + |
| 88 | + if token: |
| 89 | + # If token is expiring and replace_if_expiring=True → create new token |
| 90 | + if warning and replace_if_expiring: |
| 91 | + new_token, err = create_new_token( |
| 92 | + base_url, username, password, user_token, token_description |
| 93 | + ) |
| 94 | + if err: |
| 95 | + module.fail_json(msg=err) |
| 96 | + result.update( |
| 97 | + changed=True, |
| 98 | + message=f"Old token expiring, created new token for {username}", |
| 99 | + ) |
| 100 | + module.exit_json(**result) |
| 101 | + |
| 102 | + # Token is valid → return metadata only |
| 103 | + result.update( |
| 104 | + changed=False, |
| 105 | + message=f"Found valid token for {username}", |
| 106 | + token=dict( |
| 107 | + id=str(token.get("id")), |
| 108 | + display=str(token.get("display")), |
| 109 | + created=str(token.get("created")), |
| 110 | + expires=str(token.get("expires")), |
| 111 | + write_enabled=bool(token.get("write_enabled")), |
| 112 | + description=str(token.get("description", "No description")), |
| 113 | + ), |
| 114 | + ) |
| 115 | + module.exit_json(**result) |
| 116 | + |
| 117 | + # No token found → create new if allowed |
| 118 | + if create_if_notfound: |
| 119 | + new_token, err = create_new_token( |
| 120 | + base_url, username, password, user_token, token_description |
| 121 | + ) |
| 122 | + if err: |
| 123 | + module.fail_json(msg=err) |
| 124 | + result.update( |
| 125 | + changed=True, |
| 126 | + message=f"No token found, created new token for {username}", |
| 127 | + ) |
| 128 | + module.exit_json(**result) |
| 129 | + |
| 130 | + # No token and not allowed to create → fail |
| 131 | + module.fail_json(msg=f"No token found for {username} and creation disabled") |
| 132 | + |
| 133 | + |
| 134 | +def main(): |
| 135 | + run_module() |
| 136 | + |
| 137 | + |
| 138 | +if __name__ == "__main__": |
| 139 | + main() |
0 commit comments