From acc44f49625171f841e715ad1ec33ad6767fb7a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Gonz=C3=A1lez?= Date: Fri, 11 May 2012 16:18:04 -0400 Subject: [PATCH] Added connection queue support --- asyncmongo/cursor.py | 200 +++++++++++++++++++++++-------------------- asyncmongo/pool.py | 46 ++++++---- 2 files changed, 134 insertions(+), 112 deletions(-) diff --git a/asyncmongo/cursor.py b/asyncmongo/cursor.py index 83cbbb5..af3971f 100644 --- a/asyncmongo/cursor.py +++ b/asyncmongo/cursor.py @@ -1,5 +1,5 @@ #!/bin/env python -# +# # Copyright 2010 bit.ly # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -36,16 +36,16 @@ def __init__(self, dbname, collection, pool): assert isinstance(dbname, (str, unicode)) assert isinstance(collection, (str, unicode)) assert isinstance(pool, object) - + self.__dbname = dbname self.__collection = collection self.__pool = pool self.__slave_okay = False - + @property def full_collection_name(self): return u'%s.%s' % (self.__dbname, self.__collection) - + def drop(self, *args, **kwargs): raise NotImplemented("patches accepted") @@ -56,7 +56,7 @@ def save(self, doc, **kwargs): def insert(self, doc_or_docs, manipulate=True, safe=True, check_keys=True, callback=None, **kwargs): """Insert a document(s) into this collection. - + If `manipulate` is set, the document(s) are manipulated using any :class:`~pymongo.son_manipulator.SONManipulator` instances that have been added to this @@ -64,17 +64,17 @@ def insert(self, doc_or_docs, the inserted document or a list of ``"_id"`` values of the inserted documents. If the document(s) does not already contain an ``"_id"`` one will be added. - + If `safe` is ``True`` then the insert will be checked for errors, raising :class:`~pymongo.errors.OperationFailure` if one occurred. Safe inserts wait for a response from the database, while normal inserts do not. - + Any additional keyword arguments imply ``safe=True``, and will be used as options for the resultant `getLastError` command. For example, to wait for replication to 3 nodes, pass ``w=3``. - + :Parameters: - `doc_or_docs`: a document or list of documents to be inserted @@ -87,77 +87,82 @@ def insert(self, doc_or_docs, - `**kwargs` (optional): any additional arguments imply ``safe=True``, and will be used as options for the `getLastError` command - + .. mongodoc:: insert """ if not isinstance(safe, bool): raise TypeError("safe must be an instance of bool") - + docs = doc_or_docs # return_one = False if isinstance(docs, dict): # return_one = True docs = [docs] - + # if manipulate: # docs = [self.__database._fix_incoming(doc, self) for doc in docs] - + self.__limit = None if kwargs: safe = True - + if safe and not callable(callback): raise TypeError("callback must be callable") if not safe and callback is not None: raise TypeError("callback can not be used with safe=False") - + if callback: callback = functools.partial(self._handle_response, orig_callback=callback) - connection = self.__pool.connection() - try: - connection.send_message( - message.insert(self.full_collection_name, docs, - check_keys, safe, kwargs), callback=callback) - except: - connection.close() - raise - + #connection = self.__pool.connection() + def on_connect(connection): + try: + connection.send_message( + message.insert(self.full_collection_name, docs, + check_keys, safe, kwargs), callback=callback) + except: + connection.close() + raise + + self.__pool.add_to_queue(on_connect) + def remove(self, spec_or_id=None, safe=True, callback=None, **kwargs): if not isinstance(safe, bool): raise TypeError("safe must be an instance of bool") - + if spec_or_id is None: spec_or_id = {} if not isinstance(spec_or_id, dict): spec_or_id = {"_id": spec_or_id} - + self.__limit = None if kwargs: safe = True - + if safe and not callable(callback): raise TypeError("callback must be callable") if not safe and callback is not None: raise TypeError("callback can not be used with safe=False") - + if callback: callback = functools.partial(self._handle_response, orig_callback=callback) - connection = self.__pool.connection() - try: - connection.send_message( - message.delete(self.full_collection_name, spec_or_id, safe, kwargs), - callback=callback) - except: - connection.close() - raise + #connection = self.__pool.connection() + def on_connect(connection): + try: + connection.send_message( + message.delete(self.full_collection_name, spec_or_id, safe, kwargs), + callback=callback) + except: + connection.close() + raise + + self.__pool.add_to_queue(on_connect) - def update(self, spec, document, upsert=False, manipulate=False, safe=True, multi=False, callback=None, **kwargs): """Update a document(s) in this collection. - + Raises :class:`TypeError` if either `spec` or `document` is not an instance of ``dict`` or `upsert` is not an instance of ``bool``. If `safe` is ``True`` then the update will be @@ -166,14 +171,14 @@ def update(self, spec, document, upsert=False, manipulate=False, occurred. Safe updates require a response from the database, while normal updates do not - thus, setting `safe` to ``True`` will negatively impact performance. - + There are many useful `update modifiers`_ which can be used when performing updates. For example, here we use the ``"$set"`` modifier to modify some fields in a matching document: - + .. doctest:: - + >>> db.test.insert({"x": "y", "a": "b"}) ObjectId('...') >>> list(db.test.find()) @@ -181,15 +186,15 @@ def update(self, spec, document, upsert=False, manipulate=False, >>> db.test.update({"x": "y"}, {"$set": {"a": "c"}}) >>> list(db.test.find()) [{u'a': u'c', u'x': u'y', u'_id': ObjectId('...')}] - + If `safe` is ``True`` returns the response to the *lastError* command. Otherwise, returns ``None``. - + # Any additional keyword arguments imply ``safe=True``, and will # be used as options for the resultant `getLastError` # command. For example, to wait for replication to 3 nodes, pass # ``w=3``. - + :Parameters: - `spec`: a ``dict`` or :class:`~bson.son.SON` instance specifying elements which must be present for a document @@ -214,9 +219,9 @@ def update(self, spec, document, upsert=False, manipulate=False, - `**kwargs` (optional): any additional arguments imply ``safe=True``, and will be used as options for the `getLastError` command - + .. _update modifiers: http://www.mongodb.org/display/DOCS/Updating - + .. mongodoc:: update """ if not isinstance(spec, dict): @@ -230,32 +235,35 @@ def update(self, spec, document, upsert=False, manipulate=False, # TODO: apply SON manipulators # if upsert and manipulate: # document = self.__database._fix_incoming(document, self) - + if kwargs: safe = True - + if safe and not callable(callback): raise TypeError("callback must be callable") if not safe and callback is not None: raise TypeError("callback can not be used with safe=False") - + if callback: callback = functools.partial(self._handle_response, orig_callback=callback) self.__limit = None - connection = self.__pool.connection() - try: - connection.send_message( - message.update(self.full_collection_name, upsert, multi, - spec, document, safe, kwargs), callback=callback) - except: - connection.close() - raise - - + #connection = self.__pool.connection() + def on_connect(connection): + try: + connection.send_message( + message.update(self.full_collection_name, upsert, multi, + spec, document, safe, kwargs), callback=callback) + except: + connection.close() + raise + + self.__pool.add_to_queue(on_connect) + + def find_one(self, spec_or_id, **kwargs): """Get a single document from the database. - + All arguments to :meth:`find` are also valid arguments for :meth:`find_one`, although any `limit` argument will be ignored. Returns a single document, or ``None`` if no matching @@ -265,29 +273,29 @@ def find_one(self, spec_or_id, **kwargs): spec_or_id = {"_id": spec_or_id} kwargs['limit'] = -1 self.find(spec_or_id, **kwargs) - + def find(self, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, slave_okay=False, _must_use_master=False, _is_command=False, hint=None, debug=False, callback=None): """Query the database. - + The `spec` argument is a prototype document that all results must match. For example: - + >>> db.test.find({"hello": "world"}, callback=...) - + only matches documents that have a key "hello" with value "world". Matches can have other keys *in addition* to "hello". The `fields` argument is used to specify a subset of fields that should be included in the result documents. By limiting results to a certain subset of fields you can cut down on network traffic and decoding time. - + Raises :class:`TypeError` if any of the arguments are of improper type. - + :Parameters: - `spec` (optional): a SON object specifying elements which must be present for a document to be included in the @@ -325,13 +333,13 @@ def find(self, spec=None, fields=None, skip=0, limit=0, examined when performing the query - `slave_okay` (optional): is it okay to connect directly to and perform queries on a slave instance - + .. mongodoc:: find """ - + if spec is None: spec = {} - + if not isinstance(spec, dict): raise TypeError("spec must be an instance of dict") if not isinstance(skip, int): @@ -346,19 +354,19 @@ def find(self, spec=None, fields=None, skip=0, limit=0, raise TypeError("tailable must be an instance of bool") if not callable(callback): raise TypeError("callback must be callable") - + if fields is not None: if not fields: fields = {"_id": 1} if not isinstance(fields, dict): fields = helpers._fields_list_to_dict(fields) - + self.__spec = spec self.__fields = fields self.__skip = skip self.__limit = limit self.__batch_size = 0 - + self.__timeout = timeout self.__tailable = tailable self.__snapshot = snapshot @@ -372,25 +380,27 @@ def find(self, spec=None, fields=None, skip=0, limit=0, self.__tz_aware = False #collection.database.connection.tz_aware self.__must_use_master = _must_use_master self.__is_command = _is_command - - connection = self.__pool.connection() - try: - if self.__debug: - logging.debug('QUERY_SPEC: %r' % self.__query_spec()) - - connection.send_message( - message.query(self.__query_options(), - self.full_collection_name, - self.__skip, - self.__limit, - self.__query_spec(), - self.__fields), - callback=functools.partial(self._handle_response, orig_callback=callback)) - except Exception, e: - logging.error('Error sending query %s' % e) - connection.close() - raise - + + #connection = self.__pool.connection() + def on_connect(connection): + try: + if self.__debug: + logging.debug('QUERY_SPEC: %r' % self.__query_spec()) + + connection.send_message( + message.query(self.__query_options(), + self.full_collection_name, + self.__skip, + self.__limit, + self.__query_spec(), + self.__fields), + callback=functools.partial(self._handle_response, orig_callback=callback)) + except Exception, e: + logging.error('Error sending query %s' % e) + connection.close() + raise + self.__pool.add_to_queue(on_connect) + def _handle_response(self, result, error=None, orig_callback=None): if result and result.get('cursor_id'): connection = self.__pool.connection() @@ -402,7 +412,7 @@ def _handle_response(self, result, error=None, orig_callback=None): logging.error('Error killing cursor %s: %s' % (result['cursor_id'], e)) connection.close() raise - + if error: logging.error('%s %s' % (self.full_collection_name , error)) orig_callback(None, error=error) @@ -413,7 +423,7 @@ def _handle_response(self, result, error=None, orig_callback=None): else: orig_callback(result['data'], error=None) - + def __query_options(self): """Get the query options string to use for this query.""" options = 0 @@ -424,7 +434,7 @@ def __query_options(self): if not self.__timeout: options |= _QUERY_OPTIONS["no_timeout"] return options - + def __query_spec(self): """Get the spec to use for a query.""" spec = self.__spec @@ -441,5 +451,5 @@ def __query_spec(self): if self.__max_scan: spec["$maxScan"] = self.__max_scan return spec - - + + diff --git a/asyncmongo/pool.py b/asyncmongo/pool.py index 97d6411..0b58272 100644 --- a/asyncmongo/pool.py +++ b/asyncmongo/pool.py @@ -1,5 +1,5 @@ #!/bin/env python -# +# # Copyright 2010 bit.ly # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -15,6 +15,7 @@ # under the License. from threading import Condition +from collections import deque import logging from errors import TooManyConnections, ProgrammingError from connection import Connection @@ -35,7 +36,7 @@ def get_connection_pool(self, pool_id, *args, **kwargs): if pool_id not in self._pools: self._pools[pool_id] = ConnectionPool(*args, **kwargs) return self._pools[pool_id] - + @classmethod def close_idle_connections(self, pool_id=None): """close idle connections to mongo""" @@ -54,7 +55,7 @@ def close_idle_connections(self, pool_id=None): class ConnectionPool(object): """Connection Pool to a single mongo instance. - + :Parameters: - `mincached` (optional): minimum connections to open on instantiation. 0 to open connections on first use - `maxcached` (optional): maximum inactive cached connections for this pool. 0 for unlimited @@ -63,15 +64,15 @@ class ConnectionPool(object): - `dbname`: mongo database name - `slave_okay` (optional): is it okay to connect directly to and perform queries on a slave instance - `**kwargs`: passed to `connection.Connection` - + """ - def __init__(self, - mincached=0, - maxcached=0, - maxconnections=0, - maxusage=0, - dbname=None, - slave_okay=False, + def __init__(self, + mincached=0, + maxcached=0, + maxconnections=0, + maxusage=0, + dbname=None, + slave_okay=False, *args, **kwargs): assert isinstance(mincached, int) assert isinstance(maxcached, int) @@ -90,25 +91,32 @@ def __init__(self, self._maxcached = maxcached self._maxconnections = maxconnections self._idle_cache = [] # the actual connections that can be used + self._queue = deque() self._condition = Condition() self._dbname = dbname self._slave_okay = slave_okay self._connections = 0 - + # Establish an initial number of idle database connections: idle = [self.connection() for i in range(mincached)] while idle: self.cache(idle.pop()) - + def new_connection(self): kwargs = self._kwargs kwargs['pool'] = self return Connection(*self._args, **kwargs) - + + def add_to_queue(self, callback): + try: + callback(self.connection()) + except TooManyConnections: + self._queue.append(callback) + def connection(self): """ get a cached connection from the pool """ - + self._condition.acquire() try: if (self._maxconnections and self._connections >= self._maxconnections): @@ -135,6 +143,10 @@ def cache(self, con): # called via socket close on a connection in the idle cache self._condition.release() return + if self._queue: + waiting = self._queue.popleft() + waiting(con) + return try: if not self._maxcached or len(self._idle_cache) < self._maxcached: # the idle cache is not full, so put it there @@ -146,7 +158,7 @@ def cache(self, con): finally: self._connections -= 1 self._condition.release() - + def close(self): """Close all connections in the pool.""" self._condition.acquire() @@ -161,5 +173,5 @@ def close(self): self._condition.notifyAll() finally: self._condition.release() - +