-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update credential cache implementation
- Loading branch information
1 parent
c83a51b
commit 77068fb
Showing
2 changed files
with
25 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,79 +1,44 @@ | ||
from botocore.utils import JSONFileCache | ||
import boto3 | ||
|
||
import datetime | ||
import os | ||
import json | ||
|
||
|
||
def json_encoder(obj): | ||
"""JSON encoder that formats datetime as ISO8601 format.""" | ||
if isinstance(obj, datetime.datetime): | ||
return obj.isoformat() | ||
else: | ||
return obj | ||
|
||
|
||
class JSONFileCache(object): | ||
"""JSON file cache. | ||
This provides a dict like interface that stores JSON serializable | ||
objects. | ||
The objects are serialized to JSON and stored in a file. These | ||
values can be retrieved at a later time. | ||
""" | ||
# TODO: Need to add windows support | ||
CACHE_DIR = os.path.expanduser(os.path.join('~', '.aws', 'cli', 'cache')) | ||
|
||
def __init__(self, working_dir=CACHE_DIR): | ||
self._working_dir = working_dir | ||
|
||
def __contains__(self, cache_key): | ||
actual_key = self._convert_cache_key(cache_key) | ||
return os.path.isfile(actual_key) | ||
|
||
def __getitem__(self, cache_key): | ||
"""Retrieve value from a cache key.""" | ||
actual_key = self._convert_cache_key(cache_key) | ||
try: | ||
with open(actual_key) as f: | ||
return json.load(f) | ||
except (OSError, ValueError, IOError): | ||
raise KeyError(cache_key) | ||
|
||
def __setitem__(self, cache_key, value): | ||
full_key = self._convert_cache_key(cache_key) | ||
try: | ||
file_content = json.dumps(value, default=json_encoder) | ||
except (TypeError, ValueError): | ||
raise ValueError( | ||
"Value cannot be cached, must be JSON serializable: %s" % value) | ||
if not os.path.isdir(self._working_dir): | ||
os.makedirs(self._working_dir) | ||
with os.fdopen(os.open(full_key, | ||
os.O_WRONLY | os.O_CREAT, 0o600), 'w') as f: | ||
f.truncate() | ||
f.write(file_content) | ||
|
||
def _convert_cache_key(self, cache_key): | ||
full_path = os.path.join(self._working_dir, cache_key + '.json') | ||
return full_path | ||
|
||
|
||
class TokenGenerationException(Exception): | ||
"Exception for credential not having token" | ||
pass | ||
|
||
|
||
def aws_credentials(argument): | ||
session = boto3.Session(profile_name=argument.profile, region_name=argument.region) | ||
session = boto3.Session( | ||
profile_name=argument.profile, | ||
region_name=argument.region | ||
) | ||
|
||
# Setting up a custom cache implementation like aws cli | ||
cred_chain = session._session.get_component('credential_provider') | ||
# TODO: Introduce cache for sso & any other supported provider | ||
provider = cred_chain.get_provider('assume-role') | ||
provider.cache = JSONFileCache() | ||
__update_credential_provider_cache(session) | ||
|
||
creds = session.get_credentials() | ||
if creds.token is None: | ||
raise TokenGenerationException() | ||
|
||
return creds, session.region_name | ||
|
||
|
||
def __update_credential_provider_cache(session): | ||
"Setting up a custom cache implementation like aws cli" | ||
|
||
cred_chain = session._session.get_component('credential_provider') | ||
jsonFileCache = JSONFileCache() | ||
|
||
def _update(provider_name): | ||
cred_chain.get_provider(provider_name).cache = jsonFileCache | ||
|
||
provider_for_cache = [ | ||
'assume-role', | ||
'assume-role-with-web-identity', | ||
'sso' | ||
] | ||
|
||
[_update(each_provider) for each_provider in provider_for_cache] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters