Skip to content

Commit

Permalink
Merge pull request #1 from Garans/DEV_ENVIRONMENT
Browse files Browse the repository at this point in the history
Added URL parameter for connections and fixed connect to localhost dy…
  • Loading branch information
Oleksiy committed Mar 24, 2016
2 parents eac363a + 008ccd8 commit f44d22d
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 39 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
build
*.pyc
.idea
4 changes: 2 additions & 2 deletions asyncdynamo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/env python
#
#
# Copyright 2010 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
Expand All @@ -25,7 +25,7 @@
raise ImportError("tornado library not installed. Install tornado. https://github.com/facebook/tornado")
try:
import boto
assert tuple(map(int,boto.Version.split('.'))) >= (2,3,0), "Boto >= 2.3.0 required."
assert tuple(map(int,boto.Version.split('.'))) >= (2,39,0), "Boto >= 2.39.0 required."
except ImportError:
raise ImportError("boto library not installed. Install boto. https://github.com/boto/boto")

Expand Down
95 changes: 58 additions & 37 deletions asyncdynamo/asyncdynamo.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/env python
#
#
# Copyright 2012 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
Expand Down Expand Up @@ -28,6 +28,7 @@
from collections import deque
import time
import logging
from urlparse import urlparse

from boto.connection import AWSAuthConnection
from boto.exception import DynamoDBResponseError
Expand All @@ -41,12 +42,12 @@
class AsyncDynamoDB(AWSAuthConnection):
"""
The main class for asynchronous connections to DynamoDB.
The user should maintain one instance of this class (though more than one is ok),
parametrized with the user's access key and secret key. Make calls with make_request
or the helper methods, and AsyncDynamoDB will maintain session tokens in the background.
As in Boto Layer1:
"This is the lowest-level interface to DynamoDB. Methods at this
layer map directly to API requests and parameters to the methods
Expand All @@ -55,52 +56,72 @@ class AsyncDynamoDB(AWSAuthConnection):
All responses are direct decoding of the JSON response bodies to
Python data structures via the json or simplejson modules."
"""

DefaultHost = 'dynamodb.us-east-1.amazonaws.com'
"""The default DynamoDB API endpoint to connect to."""

ServiceName = 'DynamoDB'
"""The name of the Service"""

Version = '20111205'
"""DynamoDB API version."""

ThruputError = "ProvisionedThroughputExceededException"
"""The error response returned when provisioned throughput is exceeded"""

ExpiredSessionError = 'com.amazon.coral.service#ExpiredTokenException'
"""The error response returned when session token has expired"""

UnrecognizedClientException = 'com.amazon.coral.service#UnrecognizedClientException'
'''Another error response that is possible with a bad session token'''

def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
is_secure=True, port=None, proxy=None, proxy_port=None,
host=None, debug=0, session_token=None,
host=None, debug=0, session_token=None, url=None,
authenticate_requests=True, validate_cert=True, max_sts_attempts=3, ioloop=None):
if not host:
host = self.DefaultHost
if url is not None:
self.url = url
parse_url = urlparse(self.url)
self.host = parse_url.hostname
self.port = parse_url.port
self.protocol = parse_url.scheme
else:
self.protocol = 'https' if is_secure else 'http'
self.host = host
self.port = port

url = '{0}://{1}'.format(self.protocol, self.host)

if self.port:
url += ':{}'.format(self.port)

self.url = url
self.validate_cert = validate_cert
self.authenticate_requests = authenticate_requests
AWSAuthConnection.__init__(self, host,
self.authenticate_requests = authenticate_requests
AWSAuthConnection.__init__(self, self.host,
aws_access_key_id,
aws_secret_access_key,
is_secure, port, proxy, proxy_port,
debug=debug, security_token=session_token)
is_secure, self.port, proxy, proxy_port,
debug=debug, security_token=session_token,
validate_certs=self.validate_cert)
self.ioloop = ioloop or IOLoop.instance()
self.http_client = AsyncHTTPClient(io_loop=self.ioloop)
self.pending_requests = deque()
self.sts = AsyncAwsSts(aws_access_key_id, aws_secret_access_key, ioloop=self.ioloop)
self.sts = AsyncAwsSts(aws_access_key_id,
aws_secret_access_key,
is_secure, self.port, proxy, proxy_port)
assert (isinstance(max_sts_attempts, int) and max_sts_attempts >= 0)
self.max_sts_attempts = max_sts_attempts

def _init_session_token_cb(self, error=None):
if error:
logging.warn("Unable to get session token: %s" % error)

def _required_auth_capability(self):
return ['hmac-v3-http']

def _update_session_token(self, callback, attempts=0, bypass_lock=False):
'''
Begins the logic to get a new session token. Performs checks to ensure
Expand All @@ -113,14 +134,14 @@ def _update_session_token(self, callback, attempts=0, bypass_lock=False):
self.provider.security_token = PENDING_SESSION_TOKEN_UPDATE # invalidate the current security token
return self.sts.get_session_token(
functools.partial(self._update_session_token_cb, callback=callback, attempts=attempts))

def _update_session_token_cb(self, creds, provider='aws', callback=None, error=None, attempts=0):
'''
Callback to use with `async_aws_sts`. The 'provider' arg is a bit misleading,
it is a relic from boto and should probably be left to its default. This will
take the new Credentials obj from `async_aws_sts.get_session_token()` and use
it to update self.provider, and then will clear the deque of pending requests.
A callback is optional. If provided, it must be callable without any arguments,
but also accept an optional error argument that will be an instance of BotoServerError.
'''
Expand Down Expand Up @@ -157,15 +178,15 @@ def raise_error():
request()
if callable(callback):
return callback()

def make_request(self, action, body='', callback=None, object_hook=None):
'''
Make an asynchronous HTTP request to DynamoDB. Callback should operate on
the decoded json response (with object hook applied, of course). It should also
accept an error argument, which will be a boto.exception.DynamoDBResponseError.
If there is not a valid session token, this method will ensure that a new one is fetched
and cache the request when it is retrieved.
and cache the request when it is retrieved.
'''
this_request = functools.partial(self.make_request, action=action,
body=body, callback=callback,object_hook=object_hook)
Expand All @@ -187,7 +208,7 @@ def cb_for_update(error=None):
self.Version, action),
'Content-Type' : 'application/x-amz-json-1.0',
'Content-Length' : str(len(body))}
request = HTTPRequest('https://%s' % self.host,
request = HTTPRequest(self.url,
method='POST',
headers=headers,
body=body,
Expand All @@ -198,7 +219,7 @@ def cb_for_update(error=None):
self._auth_handler.add_auth(request) # add signature to headers of the request
self.http_client.fetch(request, functools.partial(self._finish_make_request,
callback=callback, orig_request=this_request, token_used=self.provider.security_token, object_hook=object_hook)) # bam!

def _finish_make_request(self, response, callback, orig_request, token_used, object_hook=None):
'''
Check for errors and decode the json response (in the tornado response body), then pass on to orig callback.
Expand All @@ -219,7 +240,7 @@ def _finish_make_request(self, response, callback, orig_request, token_used, obj
return orig_request() # make_request will handle logic to get a new token if needed, and queue until it is fetched
else:
# because some errors are benign, include the response when an error is passed
return callback(json_response, error=DynamoDBResponseError(response.error.code,
return callback(json_response, error=DynamoDBResponseError(response.error.code,
response.error.message, json_response))

if json_response is None:
Expand All @@ -233,10 +254,10 @@ def get_item(self, table_name, key, callback, attributes_to_get=None,
'''
Return a set of attributes for an item that matches
the supplied key.
The callback should operate on a dict representing the decoded
response from DynamoDB (using the object_hook, if supplied)
:type table_name: str
:param table_name: The name of the table to delete.
Expand All @@ -261,12 +282,12 @@ def get_item(self, table_name, key, callback, attributes_to_get=None,
data['ConsistentRead'] = True
return self.make_request('GetItem', body=json.dumps(data),
callback=callback, object_hook=object_hook)

def batch_get_item(self, request_items, callback):
"""
Return a set of attributes for a multiple items in
multiple tables using their primary keys.
The callback should operate on a dict representing the decoded
response from DynamoDB (using the object_hook, if supplied)
Expand All @@ -277,7 +298,7 @@ def batch_get_item(self, request_items, callback):
data = {'RequestItems' : request_items}
json_input = json.dumps(data)
self.make_request('BatchGetItem', json_input, callback)

def put_item(self, table_name, item, callback, expected=None, return_values=None, object_hook=None):
'''
Create a new item or replace an old item with a new
Expand All @@ -286,7 +307,7 @@ def put_item(self, table_name, item, callback, expected=None, return_values=None
key, the new item will completely replace the old item.
You can perform a conditional put by specifying an
expected rule.
The callback should operate on a dict representing the decoded
response from DynamoDB (using the object_hook, if supplied)
Expand All @@ -306,7 +327,7 @@ def put_item(self, table_name, item, callback, expected=None, return_values=None
name-value pairs before then were changed. Possible
values are: None or 'ALL_OLD'. If 'ALL_OLD' is
specified and the item is overwritten, the content
of the old item is returned.
of the old item is returned.
'''
data = {'TableName' : table_name,
'Item' : item}
Expand All @@ -317,7 +338,7 @@ def put_item(self, table_name, item, callback, expected=None, return_values=None
json_input = json.dumps(data)
return self.make_request('PutItem', json_input, callback=callback,
object_hook=object_hook)

def query(self, table_name, hash_key_value, callback, range_key_conditions=None,
attributes_to_get=None, limit=None, consistent_read=False,
scan_index_forward=True, exclusive_start_key=None,
Expand All @@ -326,7 +347,7 @@ def query(self, table_name, hash_key_value, callback, range_key_conditions=None,
Perform a query of DynamoDB. This version is currently punting
and expecting you to provide a full and correct JSON body
which is passed as is to DynamoDB.
The callback should operate on a dict representing the decoded
response from DynamoDB (using the object_hook, if supplied)
Expand Down

0 comments on commit f44d22d

Please sign in to comment.