|  | 
|  | 1 | +from ansible.module_utils.basic import AnsibleModule | 
|  | 2 | +import requests | 
|  | 3 | + | 
|  | 4 | + | 
|  | 5 | +def check_existing_token(base_url, username, password, user_token): | 
|  | 6 | +    """Check if a specific token exists for the user.""" | 
|  | 7 | +    headers = {"Accept": "application/json"} | 
|  | 8 | +    tokens_url = f"{base_url}/api/users/tokens/" | 
|  | 9 | + | 
|  | 10 | +    try: | 
|  | 11 | +        response = requests.get(tokens_url, headers=headers, auth=(username, password)) | 
|  | 12 | +        response.raise_for_status() | 
|  | 13 | +    except requests.exceptions.RequestException as e: | 
|  | 14 | +        return None, f"Failed to fetch tokens: {e}" | 
|  | 15 | + | 
|  | 16 | +    data = response.json() | 
|  | 17 | +    tokens = data.get("results", []) | 
|  | 18 | + | 
|  | 19 | +    if not tokens: | 
|  | 20 | +        return None, "No tokens found" | 
|  | 21 | + | 
|  | 22 | +    # Find the token matching user_token | 
|  | 23 | +    token = next((t for t in tokens if t.get("key") == user_token), None) | 
|  | 24 | +    if not token: | 
|  | 25 | +        return None, "Specified token not found for user" | 
|  | 26 | + | 
|  | 27 | +    return token, None | 
|  | 28 | + | 
|  | 29 | + | 
|  | 30 | +def create_new_token( | 
|  | 31 | +    base_url, username, password, user_token, description="ansible-created-token" | 
|  | 32 | +): | 
|  | 33 | +    """Create a new Nautobot token using Basic Auth.""" | 
|  | 34 | +    tokens_url = f"{base_url}/api/users/tokens/" | 
|  | 35 | +    headers = {"Content-Type": "application/json", "Accept": "application/json"} | 
|  | 36 | +    payload = {"key": user_token, "description": description, "write_enabled": True} | 
|  | 37 | + | 
|  | 38 | +    try: | 
|  | 39 | +        response = requests.post( | 
|  | 40 | +            tokens_url, headers=headers, json=payload, auth=(username, password) | 
|  | 41 | +        ) | 
|  | 42 | +        response.raise_for_status() | 
|  | 43 | +    except requests.exceptions.RequestException as e: | 
|  | 44 | +        return None, f"Failed to create new token: {e}" | 
|  | 45 | + | 
|  | 46 | +    return response.json(), None | 
|  | 47 | + | 
|  | 48 | + | 
|  | 49 | +def run_module(): | 
|  | 50 | +    module_args = dict( | 
|  | 51 | +        base_url=dict(type="str", required=True), | 
|  | 52 | +        username=dict(type="str", required=True), | 
|  | 53 | +        password=dict(type="str", required=True, no_log=True), | 
|  | 54 | +        user_token=dict(type="str", required=True, no_log=True), | 
|  | 55 | +        create_if_notfound=dict(type="bool", default=True), | 
|  | 56 | +        token_description=dict(type="str", default="ansible-created-token"), | 
|  | 57 | +    ) | 
|  | 58 | + | 
|  | 59 | +    module = AnsibleModule(argument_spec=module_args, supports_check_mode=True) | 
|  | 60 | +    result = dict(changed=False, token=None, message="") | 
|  | 61 | + | 
|  | 62 | +    base_url = module.params["base_url"].rstrip("/") | 
|  | 63 | +    username = module.params["username"] | 
|  | 64 | +    password = module.params["password"] | 
|  | 65 | +    user_token = module.params["user_token"] | 
|  | 66 | +    create_if_notfound = module.params["create_if_notfound"] | 
|  | 67 | +    token_description = module.params["token_description"] | 
|  | 68 | + | 
|  | 69 | +    if module.check_mode: | 
|  | 70 | +        module.exit_json(**result) | 
|  | 71 | + | 
|  | 72 | +    # Check existing token | 
|  | 73 | +    token, error = check_existing_token(base_url, username, password, user_token) | 
|  | 74 | + | 
|  | 75 | +    if token: | 
|  | 76 | +        result.update( | 
|  | 77 | +            changed=False, | 
|  | 78 | +            message=f"Found existing token for {username}", | 
|  | 79 | +            token=dict( | 
|  | 80 | +                id=str(token.get("id")), | 
|  | 81 | +                display=str(token.get("display")), | 
|  | 82 | +                created=str(token.get("created")), | 
|  | 83 | +                expires=str(token.get("expires")), | 
|  | 84 | +                write_enabled=bool(token.get("write_enabled")), | 
|  | 85 | +                description=str(token.get("description", "No description")), | 
|  | 86 | +            ), | 
|  | 87 | +        ) | 
|  | 88 | +        module.exit_json(**result) | 
|  | 89 | + | 
|  | 90 | +    # No token found → create new if allowed | 
|  | 91 | +    if create_if_notfound: | 
|  | 92 | +        new_token, err = create_new_token( | 
|  | 93 | +            base_url, username, password, user_token, token_description | 
|  | 94 | +        ) | 
|  | 95 | +        if err: | 
|  | 96 | +            module.fail_json(msg=err) | 
|  | 97 | +        result.update( | 
|  | 98 | +            changed=True, | 
|  | 99 | +            message=f"No token found, created new token for {username}", | 
|  | 100 | +            token=new_token, | 
|  | 101 | +        ) | 
|  | 102 | +        module.exit_json(**result) | 
|  | 103 | + | 
|  | 104 | +    # No token and not allowed to create → fail | 
|  | 105 | +    module.fail_json(msg=f"No token found for {username} and creation disabled") | 
|  | 106 | + | 
|  | 107 | + | 
|  | 108 | +def main(): | 
|  | 109 | +    run_module() | 
|  | 110 | + | 
|  | 111 | + | 
|  | 112 | +if __name__ == "__main__": | 
|  | 113 | +    main() | 
0 commit comments