Skip to content

Commit

Permalink
Merge pull request #18 from shenwpo/Bearer
Browse files Browse the repository at this point in the history
Implement bearer authorization
  • Loading branch information
leeqvip authored Mar 8, 2021
2 parents d8dc7d0 + 6f5d7c5 commit a6abbc9
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 8 deletions.
5 changes: 4 additions & 1 deletion flask_authz/casbin_enforcer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def wrapper(*args, **kwargs):
if header == "authorization":
# Get Auth Value then decode and parse for owner
try:
owner = authorization_decoder(request.headers.get(header))
owner = authorization_decoder(self.app.config, request.headers.get(header))
except UnSupportedAuthType:
# Continue if catch unsupported type in the event of
# Other headers needing to be checked
Expand All @@ -89,6 +89,9 @@ def wrapper(*args, **kwargs):
"decoding is unsupported by flask-casbin at this time"
)
continue
except Exception as e:
self.app.logger.info(e)
continue

if self.user_name_headers and header in map(str.lower, self.user_name_headers):
owner_audit = owner
Expand Down
10 changes: 8 additions & 2 deletions flask_authz/utils/auth_decoder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from base64 import b64decode

import jwt


class UnSupportedAuthType(Exception):
status_code = 501
Expand All @@ -20,11 +22,12 @@ def to_dict(self):
return rv


def authorization_decoder(auth_str: str):
def authorization_decoder(config, auth_str: str):
"""
Authorization token decoder based on type. This will decode the token and
only return the owner
Args:
config: app.config object
auth_str: Authorization string should be in "<type> <token>" format
Returns:
decoded owner from token
Expand All @@ -35,6 +38,9 @@ def authorization_decoder(auth_str: str):
"""Basic format <user>:<password> return only the user"""
return b64decode(token).decode().split(":")[0]
elif type == "Bearer":
raise UnSupportedAuthType("Bearer is not implemented yet")
"""return only the identity, depends on JWT 2.x"""
decoded_jwt = jwt.decode(token, config.get("JWT_SECRET_KEY"),
algorithms=config.get("JWT_HASH"))
return decoded_jwt.get("identity", "")
else:
raise UnSupportedAuthType("%s Authorization is not supported" % type)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ jinja2==2.11.2
markupsafe==1.1.1
simpleeval==0.9.10
werkzeug==1.0.1
PyJWT==2.0.1
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ def app_fixture():
)
# Set headers where owner for enforcement policy should be located
app.config["CASBIN_OWNER_HEADERS"] = {"Authorization", "X-User", "X-Idp-Groups"}
app.config["JWT_SECRET_KEY"] = "SECRET_KEY"
app.config["JWT_HASH"] = "HS256"

yield app
6 changes: 6 additions & 0 deletions tests/test_casbin_enforcer.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ def update_callback(self):
("X-Idp-Groups", "noexist testnoexist users", "GET", 200, None),
("X-Idp-Groups", "noexist, testnoexist, users", "GET", 200, None),
("Authorization", "Basic Ym9iOnBhc3N3b3Jk", "GET", 200, "Authorization"),
("Authorization", "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpZGVudGl0eSI6ImJvYiJ9."
"LM-CqxAM2MtT2uT3AO69rZ3WJ81nnyMQicizh4oqBwk", "GET", 200, None),
("Authorization", "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9."
"eyJleHAiOjE2MTUxMDg0OTIuNTY5MjksImlkZW50aXR5IjoiQm9iIn0."
"CAeMpG-gKbucHU7-KMiqM7H_gTkHSRvXSjNtlvh5DlE", "GET", 401, None),
("Authorization", "Unsupported Ym9iOnBhc3N3b3Jk", "GET", 401, None),
("Authorization", "Unsupported Ym9iOnBhc3N3b3Jk", "GET", 401, None),
],
)
Expand Down
29 changes: 24 additions & 5 deletions tests/test_utils_auth_decoder.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,34 @@
import jwt
import pytest
from flask_authz.utils import authorization_decoder, UnSupportedAuthType


@pytest.mark.parametrize("auth_str, result", [("Basic Ym9iOnBhc3N3b3Jk", "Bob")])
def test_auth_docode(auth_str, result):
assert authorization_decoder(auth_str) == "bob"
def test_auth_docode(app_fixture, auth_str, result):
assert authorization_decoder(app_fixture.config, auth_str) == "bob"


@pytest.mark.parametrize(
"auth_str", [("Bearer Ym9iOnBhc3N3b3Jk"), ("Unsupported Ym9iOnBhc3N3b3Jk")]
"auth_str, result",
[("Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpZGVudGl0eSI6IkJvYiJ9"
".YZqkPHdrxkkFNg7GNL8g-hRpiD9LPyospO47Mh3iEDk", "Bob")])
def test_auth_docode(app_fixture, auth_str, result):
assert authorization_decoder(app_fixture.config, auth_str) == "Bob"


@pytest.mark.parametrize(
"auth_str", [("Unsupported Ym9iOnBhc3N3b3Jk")]
)
def test_auth_docode_exceptions(auth_str):
def test_auth_docode_exceptions(app_fixture, auth_str):
with pytest.raises(UnSupportedAuthType):
authorization_decoder(auth_str)
authorization_decoder(app_fixture.config, auth_str)


@pytest.mark.parametrize(
"auth_str",
[("Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTUxMDg0OTIuNTY5MjksImlkZW50aXR5IjoiQm9iIn0."
"CAeMpG-gKbucHU7-KMiqM7H_gTkHSRvXSjNtlvh5DlE")]
)
def test_auth_docode_exceptions(app_fixture, auth_str):
with pytest.raises(jwt.ExpiredSignatureError):
authorization_decoder(app_fixture.config, auth_str)

0 comments on commit a6abbc9

Please sign in to comment.