diff --git a/.gitignore b/.gitignore index 7c413dc..04eb895 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,9 @@ build tmp *.dep .settings +*.egg +lib/concurrence/http/concurrence.http._http.c +lib/concurrence/concurrence._event14.c +concurrence.egg-info +debian/concurrence +*.swp diff --git a/Makefile b/Makefile index fc766c7..507a99c 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,14 @@ +CWD=$(shell pwd) +ifndef PYTHON + export PYTHON=python2.6 +endif +ifndef DESTDIR + export DESTDIR=$(CWD)/debian/tmp +endif +export PYTHONPATH=/usr/share/pyshared:$(DESTDIR)/usr/lib/$(PYTHON) + +.PHONY: test install doc + build: $(PYTHON) setup.py build @@ -7,19 +18,18 @@ ext: egg: $(PYTHON) setup.py bdist_egg -install: - $(PYTHON) setup.py install +install: build + mkdir -p $(DESTDIR)/usr/lib/$(PYTHON) + $(PYTHON) setup.py install --prefix=$(DESTDIR)/usr --install-purelib=$(DESTDIR)/usr/lib/$(PYTHON) --install-platlib=$(DESTDIR)/usr/lib/$(PYTHON) --install-layout=deb --root=/ sdist: $(PYTHON) setup.py sdist -_doc: +doc: rm -rf doc/html cd doc/src; make html cp -a doc/src/_build/html doc/html -doc: _doc - clean: -find . -name *.pyc -exec rm -rf {} \; -find . -name *.so -exec rm -rf {} \; @@ -38,10 +48,17 @@ clean: dist_clean: clean find . 
-name .svn -exec rm -rf {} \; -_test: +test: install cd test; make test -test: _test +test-test: install + $(PYTHON) test/testtest.py + +test-core: install + $(PYTHON) test/testcore.py + +test-memcache: install + $(PYTHON) test/testmemcache.py coverage: cd test; coverage erase diff --git a/debian/.gitignore b/debian/.gitignore new file mode 100644 index 0000000..9871a13 --- /dev/null +++ b/debian/.gitignore @@ -0,0 +1,3 @@ +*.log +*.substvars +/files diff --git a/debian/changelog b/debian/changelog new file mode 100644 index 0000000..a6544f5 --- /dev/null +++ b/debian/changelog @@ -0,0 +1,187 @@ +concurrence (0.3.2-26) unstable; urgency=low + + * Children of destroyed parents are adopted by grandparents + + -- Alexander Lourier Sat, 03 Aug 2013 23:11:44 +0400 + +concurrence (0.3.2-25) unstable; urgency=low + + * Daemons are not excluded from tasklets hierarchy + + -- Alexander Lourier Sat, 03 Aug 2013 21:52:44 +0400 + +concurrence (0.3.2-24) unstable; urgency=low + + * MySQL: less verbose logging + * MySQL: keep connection after ClientCommandError + + -- Alexander Lourier Tue, 30 Jul 2013 17:22:26 +0400 + +concurrence (0.3.2-23) unstable; urgency=low + + * Allow to close MySQL connection in other states + + -- Alexander Lourier Wed, 26 Jun 2013 13:52:15 +0400 + +concurrence (0.3.2-22) unstable; urgency=low + + * MySQL logging fixed + + -- Alexander Lourier Wed, 26 Jun 2013 12:39:49 +0400 + +concurrence (0.3.2-21) unstable; urgency=low + + * Correctly handle requests interrupted by client when sending request body + + -- Alexander Lourier Sun, 28 Oct 2012 01:42:04 +0400 + +concurrence (0.3.2-20) unstable; urgency=low + + * Memcached: survive after connection loss + + -- Alexander Lourier Mon, 01 Oct 2012 01:17:03 +0400 + +concurrence (0.3.2-19) unstable; urgency=low + + * Memcached: handle exceptions during flush + * HEAD support for HTTP Server + * CompatibileFile fixes + * BufferedReader: corner case bugfixes + * Test environment enhancements + * Major race 
condition in concurrence core fixed + * BaseExceptions are not handled anymore + + -- Alexander Lourier Sun, 30 Sep 2012 19:34:19 +0400 + +concurrence (0.3.2-18) unstable; urgency=low + + * MySQL RLE encoding support for 253 code + + -- Alexander Lourier Thu, 22 Mar 2012 12:03:09 +0400 + +concurrence (0.3.2-17) unstable; urgency=low + + * Error handling in Memcached module + + -- Alexander Lourier Wed, 14 Dec 2011 10:08:29 +0400 + +concurrence (0.3.2-16) unstable; urgency=low + + * HEAD method support + * 502 Not Implemented response to any not known method + + -- Alexander Lourier Mon, 05 Sep 2011 14:33:20 +0400 + +concurrence (0.3.2-15) unstable; urgency=low + + * WSGIInputStream.readline() now accepts parameter maxlen=-1 (does nothing although) + + -- Alexander Lourier Mon, 15 Aug 2011 13:59:17 +0400 + +concurrence (0.3.2-14) unstable; urgency=low + + * readline() now accepts parameter maxlen=-1 (does nothing although) + + -- Alexander Lourier Mon, 15 Aug 2011 13:55:53 +0400 + +concurrence (0.3.2-13) unstable; urgency=low + + * Moving to squeeze + + -- Alexander Lourier Sun, 13 Feb 2011 22:19:34 +0300 + +concurrence (0.3.2-12) unstable; urgency=low + + * Bugfix with ADNS + + -- Alexander Lourier Sun, 07 Nov 2010 00:32:55 +0300 + +concurrence (0.3.2-11) unstable; urgency=low + + * ADNS support + + -- Alexander Lourier Sat, 06 Nov 2010 20:23:53 +0300 + +concurrence (0.3.2-10) unstable; urgency=low + + * Thrift and SMTP implementation + + -- Alexander Lourier Sat, 30 Oct 2010 10:38:57 +0400 + +concurrence (0.3.2-9) unstable; urgency=low + + * Deferred evaluation of chunks + + -- Alexander Lourier Sun, 24 Oct 2010 01:56:14 +0400 + +concurrence (0.3.2-8) unstable; urgency=low + + * Minor changes + + -- Alexander Lourier Sat, 23 Oct 2010 01:34:17 +0400 + +concurrence (0.3.2-6) unstable; urgency=low + + * Size and time limits on HTTP client downloads + + -- Alexander Lourier Thu, 09 Sep 2010 16:14:35 +0400 + +concurrence (0.3.2-5) unstable; urgency=low + + * Fixed bug 
with readline buffer size + + -- Alexander Lourier Thu, 09 Sep 2010 14:51:32 +0400 + +concurrence (0.3.2-4) unstable; urgency=low + + * libevent dependency + + -- Alexander Lourier Fri, 06 Aug 2010 13:54:01 +0400 + +concurrence (0.3.2-2) unstable; urgency=low + + * WSGIInputStream.readline implemented + + -- Alexander Lourier Wed, 04 Aug 2010 17:24:40 +0400 + +concurrence (0.3.2-1) unstable; urgency=low + + * New upstream release + + -- Alexander Lourier Fri, 04 Jun 2010 19:46:52 +0400 + +concurrence (0.3.1-6) unstable; urgency=low + + * python-webob added to dependencies + + -- Alexander Lourier Tue, 01 Jun 2010 18:08:50 +0400 + +concurrence (0.3.1-5) unstable; urgency=low + + * python-routes added to dependencies + + -- Alexander Lourier Tue, 01 Jun 2010 17:57:25 +0400 + +concurrence (0.3.1-4) unstable; urgency=low + + * Library paths fixed + + -- Alexander Lourier Tue, 01 Jun 2010 16:01:15 +0400 + +concurrence (0.3.1-3) unstable; urgency=low + + * Disabled pysupport + + -- Alexander Lourier Tue, 01 Jun 2010 13:10:17 +0400 + +concurrence (0.3.1-2) unstable; urgency=low + + * fixed install dir + + -- Alexander Lourier Sat, 27 Mar 2010 21:15:46 +0300 + +concurrence (0.3.1-1) unstable; urgency=low + + * Initial release + + -- Alexander Lourier Sat, 27 Mar 2010 19:24:28 +0300 diff --git a/debian/compat b/debian/compat new file mode 100644 index 0000000..7f8f011 --- /dev/null +++ b/debian/compat @@ -0,0 +1 @@ +7 diff --git a/debian/control b/debian/control new file mode 100644 index 0000000..53254dc --- /dev/null +++ b/debian/control @@ -0,0 +1,18 @@ +Source: concurrence +Section: python +Priority: optional +Maintainer: Alexander Lourier +Build-Depends: debhelper (>= 7), python2.6-minimal (>= 2.6.stackless-1), libevent-dev (>= 1.4), python-pyrex, python2.6-dev (>= 2.6.stackless-1) +Standards-Version: 3.8.3 +Homepage: http://opensource.hyves.org/concurrence/ + +Package: concurrence +Architecture: any +Depends: ${shlibs:Depends}, ${misc:Depends}, python2.6-minimal (>= 
2.6.stackless-1), libevent-1.4-2, ${python:Depends} +Recommends: python-routes, python-webob +Description: Framework for creating massively concurrent network applications in Python. + It takes a Lightweight-tasks-with-message-passing approach to concurrency. + The goal of Concurrence is to provide an easier programming model for writing + high performance network applications than existing solutions (Multi-threading, + Twisted, asyncore etc). Concurrence uses Lightweight tasks in combination with + libevent to expose a high-level synchronous API to low-level asynchronous IO. diff --git a/debian/copyright b/debian/copyright new file mode 100644 index 0000000..d9f53f5 --- /dev/null +++ b/debian/copyright @@ -0,0 +1,27 @@ +Copyright (C) 2009, Hyves (Startphone Ltd.) +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name of Hyves (Startphone Ltd.) nor the names of its + contributors may be used to endorse or promote products derived from this + software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/debian/docs b/debian/docs new file mode 100644 index 0000000..a6da7f1 --- /dev/null +++ b/debian/docs @@ -0,0 +1,3 @@ +README +TEST.TXT +doc/ diff --git a/debian/rules b/debian/rules new file mode 100755 index 0000000..c2afa8c --- /dev/null +++ b/debian/rules @@ -0,0 +1,17 @@ +#!/usr/bin/make -f +# -*- makefile -*- +# Sample debian/rules that uses debhelper. +# This file was originally written by Joey Hess and Craig Small. +# As a special exception, when this file is copied by dh-make into a +# dh-make output file, you may use that output file without restriction. +# This special exception was added by Craig Small in version 0.37 of dh-make. + +# Uncomment this to turn on verbose mode. 
+#export DH_VERBOSE=1 + +export PYTHON=python2.6 +export DEB_BUILD_OPTIONS="nocheck" + +%: + #dh --without=python-support $@ + dh $@ diff --git a/lib/concurrence/__init__.py b/lib/concurrence/__init__.py index 23e7612..4627720 100644 --- a/lib/concurrence/__init__.py +++ b/lib/concurrence/__init__.py @@ -2,7 +2,7 @@ # # This module is part of the Concurrence Framework and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php -__version__ = '0.3.1' #remember to update setup.py +__version__ = '0.3.2' #remember to update setup.py __version_info__ = tuple([ int(num) for num in __version__.split('.')]) from concurrence.core import dispatch, quit, disable_threading, get_version_info, TIMEOUT_NEVER, TIMEOUT_CURRENT @@ -16,10 +16,10 @@ try: import json -except: +except Exception: try: import simplejson as json - except: + except Exception: import logging logging.exception("could not import json library!', pls install simplejson or use python 2.6+") diff --git a/lib/concurrence/_unittest.py b/lib/concurrence/_unittest.py index 4c31813..a5840f4 100644 --- a/lib/concurrence/_unittest.py +++ b/lib/concurrence/_unittest.py @@ -56,14 +56,14 @@ def interceptor(addr): class TestCase(unittest.TestCase): - def setUp(self): - logging.debug("setUp %s, %s", self, '; '.join(['%s: %s' % (k, v) for k, v in get_version_info().items()])) + #def setUp(self): + # logging.debug("setUp %s, %s", self, '; '.join(['%s: %s' % (k, v) for k, v in get_version_info().items()])) def tearDown(self): try: Tasklet.yield_() #this make sure that everything gets a change to exit before we start the next test - logging.debug("tearDown %s, tasklet count #%d", self, Tasklet.count()) - except: + #logging.debug("tearDown %s, tasklet count #%d", self, Tasklet.count()) + except Exception: pass class _Timer: @@ -83,13 +83,15 @@ def sec(self, n): def timer(): return _Timer() -def main(timeout = None): +def main(timeout = None, ontimeout = None): logging.basicConfig() 
logging.root.setLevel(logging.DEBUG) if timeout is not None: def quit_test(): + if ontimeout: + ontimeout() logging.error("quiting unittest on timeout") quit(EXIT_CODE_TIMEOUT) logging.debug("test will timeout in %s seconds", timeout) diff --git a/lib/concurrence/application.py b/lib/concurrence/application.py index 44b5f7b..d395b56 100644 --- a/lib/concurrence/application.py +++ b/lib/concurrence/application.py @@ -66,7 +66,7 @@ def start(self): for _, resource, fa, member in self._find_members(r'__start(\d\d)__'): try: level = int(fa[0]) - except: + except Exception: level = -1 if level != -1: diff --git a/lib/concurrence/concurrence._event.pyx b/lib/concurrence/concurrence._event.pyx index 988ac4f..01cd290 100644 --- a/lib/concurrence/concurrence._event.pyx +++ b/lib/concurrence/concurrence._event.pyx @@ -137,6 +137,26 @@ cdef class __event: def delete(self): if event_del(&self.ev) == -1: raise EventError("could not delete event") + # Remove the trigger from list of already triggered (but not processed) events + global head + global tail + cdef __list* cur + cdef __list* prev + cur = head + prev = NULL + while cur != NULL: + if cur == &self.trig: + if cur == tail: + # Delete last node + tail = prev + if cur == head: + # Delete first node + head = head.next + else: + # Delete not first node + prev.next = cur.next + prev = cur + cur = cur.next def __dealloc__(self): self.delete() diff --git a/lib/concurrence/core.py b/lib/concurrence/core.py index 88e8b2c..0f6da93 100644 --- a/lib/concurrence/core.py +++ b/lib/concurrence/core.py @@ -288,6 +288,14 @@ def __exec__(self, f, *args, **kwargs): parent = self.parent() if parent: parent._remove_child(self) + children = self.children() + if children: + for child in children: + if parent: + child._set_parent(parent) + parent._add_child(child) + else: + child._set_parent(None) self._parent = None self._children = None self._join_channel = None @@ -316,7 +324,10 @@ def children(self): #TODO parent() and children() should be 
properties def _set_parent(self, parent): - self._parent = weakref.ref(parent) + if parent: + self._parent = weakref.ref(parent) + else: + self._parent = None def parent(self): """Gets the parent of this Tasklet. This method may return None if the parent is no longer there.""" @@ -413,9 +424,7 @@ def join_all(cls, tasks, timeout = TIMEOUT_CURRENT): results[t] = cls.join(t, deadline - time.time()) except JoinError, je: results[je.tasklet] = je - except TaskletExit: - raise - except: + except Exception: assert False, "expecting only join errors here" return [results[t] for t in tasks] @@ -436,9 +445,7 @@ def _loop(*args, **kwargs): while True: try: f(*args, **kwargs) - except TaskletExit: - break - except: + except Exception: logging.exception("unhandled exception in Tasklet.loop") cls.sleep(1.0) #prevent hogging the cpu if exception repeats @@ -458,9 +465,7 @@ def _interval(*args, **kwargs): cls.sleep(timeout) try: f(*args, **kwargs) - except TaskletExit: - break - except: + except Exception: logging.exception("unhandled exception in Tasklet.interval") return cls.new(_interval, **kwargs) @@ -492,9 +497,7 @@ def _interval(*args, **kwargs): if timeout < min_timeout: timeout = min_timeout try: f(current_time, *args, **kwargs) - except TaskletExit: - break - except: + except Exception: logging.exception("unhandled exception in Tasklet.rate") return cls.new(_interval, **kwargs) @@ -507,9 +510,7 @@ def _receiver(): for msg, args, kwargs in cls.receive(): try: f(msg, *args, **kwargs) - except TaskletExit: - break - except: + except Exception: logging.exception("unhandled exception in Tasklet.receiver") return cls.new(_receiver, **kwargs) @@ -553,13 +554,13 @@ def w(*args, **kwargs): t.__exec__(f, *args, **kwargs) t.bind(w) - if not daemon: - parent = cls.current() - #stackless main task is not an instance of our Tasklet class (but of stackless.tasklet) - #so we can only keep parent/child relation for Tasklet instances - if isinstance(parent, cls): - t._set_parent(parent) - 
parent._add_child(t) + + parent = cls.current() + #stackless main task is not an instance of our Tasklet class (but of stackless.tasklet) + #so we can only keep parent/child relation for Tasklet instances + if isinstance(parent, cls): + t._set_parent(parent) + parent._add_child(t) return t @@ -806,9 +807,7 @@ def interrupt(): try: while stackless.getruncount() > 1: stackless.schedule() - except TaskletExit: - pass - except: + except Exception: logging.exception("unhandled exception in dispatch schedule") #now block on IO till any IO is ready. @@ -818,9 +817,7 @@ def interrupt(): #so we call 'loop' which blocks until something available. try: event.loop() - except TaskletExit: - raise - except: + except Exception: logging.exception("unhandled exception in event loop") #we iterate over the available triggered events and @@ -831,9 +828,7 @@ def interrupt(): try: e, event_type, fd = event.next() e.data(event_type) - except TaskletExit: - raise - except: + except Exception: logging.exception("unhandled exception in event callback") finally: diff --git a/lib/concurrence/database/mysql/client.py b/lib/concurrence/database/mysql/client.py index 61ee512..6fcc18c 100644 --- a/lib/concurrence/database/mysql/client.py +++ b/lib/concurrence/database/mysql/client.py @@ -203,8 +203,10 @@ def _close(self): if self.current_resultset: self.current_resultset.close(True) self.socket.close() + self.socket = None self.state = self.STATE_CLOSED - except: + except Exception: + self.socket = None self.state = self.STATE_ERROR raise @@ -251,13 +253,13 @@ def connect(self, host = "localhost", port = 3306, user = "", passwd = "", db = except ClientLoginError: self.state = self.STATE_INIT raise - except: + except Exception: self.state = self.STATE_ERROR raise def close(self): """close this connection""" - assert self.is_connected(), "make sure connection is connected before closing" + assert self.socket, "make sure socket is opened when calling close" if self._incommand != False: assert False, 
"cannot close while still in a command" self._close() @@ -291,12 +293,20 @@ def command(self, cmd, cmd_text): else: #result set self.current_resultset = ResultSet(self, result) return self.current_resultset + except ClientCommandError: + raise + except Exception: + self.state = self.STATE_ERROR + raise finally: self._incommand = False def is_connected(self): return self.state == self.STATE_CONNECTED + def is_connecting(self): + return self.state == self.STATE_CONNECTING + def query(self, cmd_text): """Sends a COM_QUERY command with the given text and return a resultset (select)""" return self.command(COMMAND.QUERY, cmd_text) diff --git a/lib/concurrence/database/mysql/concurrence.database.mysql._mysql.pyx b/lib/concurrence/database/mysql/concurrence.database.mysql._mysql.pyx index 1cbf780..cb1c232 100644 --- a/lib/concurrence/database/mysql/concurrence.database.mysql._mysql.pyx +++ b/lib/concurrence/database/mysql/concurrence.database.mysql._mysql.pyx @@ -223,14 +223,14 @@ cdef class PacketReader: #64 bit word if packet._position + 9 > packet._limit: raise BufferUnderflowError() vw = 0 - vw |= (packet._buff[packet._position + 1]) << 0 - vw |= (packet._buff[packet._position + 2]) << 8 - vw |= (packet._buff[packet._position + 3]) << 16 - vw |= (packet._buff[packet._position + 4]) << 24 - vw |= (packet._buff[packet._position + 5]) << 32 - vw |= (packet._buff[packet._position + 6]) << 40 - vw |= (packet._buff[packet._position + 7]) << 48 - vw |= (packet._buff[packet._position + 8]) << 56 + vw = vw | (packet._buff[packet._position + 1]) << 0 + vw = vw | (packet._buff[packet._position + 2]) << 8 + vw = vw | (packet._buff[packet._position + 3]) << 16 + vw = vw | (packet._buff[packet._position + 4]) << 24 + vw = vw | (packet._buff[packet._position + 5]) << 32 + vw = vw | (packet._buff[packet._position + 6]) << 40 + vw = vw | (packet._buff[packet._position + 7]) << 48 + vw = vw | (packet._buff[packet._position + 8]) << 56 packet._position = packet._position + 9 return vw @@ 
-253,6 +253,10 @@ cdef class PacketReader: if packet._position + 2 > packet._limit: raise BufferUnderflowError() n = packet._buff[packet._position + 1] | ((packet._buff[packet._position + 2]) << 8) w = 3 + elif n == 253: + if packet._position + 4 > packet._limit: raise BufferUnderflowError() + n = packet._buff[packet._position + 1] | ((packet._buff[packet._position + 2]) << 8) | ((packet._buff[packet._position + 3]) << 16) + w = 4 else: assert False, 'not implemented yet, n: %02x' % n diff --git a/lib/concurrence/database/mysql/dbapi.py b/lib/concurrence/database/mysql/dbapi.py index 6c86847..48913f9 100644 --- a/lib/concurrence/database/mysql/dbapi.py +++ b/lib/concurrence/database/mysql/dbapi.py @@ -89,7 +89,6 @@ def _escape_string(self, s): return ''.join(escaped) def _wrap_exception(self, e, msg): - self.log.exception(msg) if isinstance(e, ConcurrenceTimeoutError): return TimeoutError(msg + ': ' + str(e)) else: @@ -174,6 +173,7 @@ def close(self): raise self._wrap_exception(e, "an error occurred while closing cursor") class Connection(object): + log = logging.getLogger('Connection') def __init__(self, *args, **kwargs): @@ -215,7 +215,6 @@ def close(self): raise except Exception, e: msg = "an error occurred while closing connection: " - self.log.exception(msg) raise Error(msg + str(e)) def cursor(self): diff --git a/lib/concurrence/database/pool.py b/lib/concurrence/database/pool.py index 7752d18..1937585 100644 --- a/lib/concurrence/database/pool.py +++ b/lib/concurrence/database/pool.py @@ -160,11 +160,9 @@ def connect(self): #none available, but still allowed to create new connection try: return (True, self._new()) - except TaskletExit: - raise #server exiting except TimeoutError: raise - except: + except Exception: self.log.exception("%s: could not create new connection for pool", self) #we will continue from here waiting for idle connection diff --git a/lib/concurrence/dns.py b/lib/concurrence/dns.py new file mode 100644 index 0000000..e16e0d1 --- /dev/null 
+++ b/lib/concurrence/dns.py @@ -0,0 +1,49 @@ +from stackless import * +import adns +import sys + +class QueryEngine(object): + """ + Usage: + create 1: engine = QueryEngine() + create 2: engine = QueryEngine(configtext="nameserver 8.8.8.8") + sync query: result = engine.synchronous("www.google.com", adns.rr.ADDR) + run async engine: engine.run() + async query: result = engine.asynchronous("www.google.com", adns.rr.ADDR) + """ + def __init__(self, s=None, configtext=None): + if s is None: + flags = adns.iflags.noautosys | adns.iflags.noserverwarn | adns.iflags.noerrprint + if configtext is None: + s = adns.init(flags) + else: + s = adns.init(flags, sys.stderr, configtext) + self._s = s + self._queries = {} + self._running = False + + def synchronous(self, qname, rr, flags=0): + return self._s.synchronous(qname, rr, flags) + + def asynchronous(self, qname, rr, flags=0): + response_channel = stackless.channel() + q = self._s.submit(qname, rr, flags) + self._queries[q] = response_channel + if not self._running: + self._running = True + stackless.tasklet(self._run)() + return response_channel.receive() + + def _run(self): + while True: + # stop if no pending queries + if not len(self._queries): + break + # check completeness + for q in self._s.completed(0): + answer = q.check() + response_channel = self._queries[q] + del self._queries[q] + response_channel.send(answer) + stackless.schedule() + self._running = False diff --git a/lib/concurrence/extra.py b/lib/concurrence/extra.py index 60ea28e..c655ab0 100644 --- a/lib/concurrence/extra.py +++ b/lib/concurrence/extra.py @@ -77,9 +77,7 @@ def run(self): try: f, args, kwargs = self._queue.popleft(True, TIMEOUT_NEVER) f(*args, **kwargs) - except TaskletExit: - raise - except: + except Exception: self.log.exception("in taskpool worker") Tasklet.sleep(1.0) @@ -102,9 +100,7 @@ def _pump(self): try: f, args, kwargs = self._queue.popleft() f(*args, **kwargs) - except TaskletExit: - raise - except: + except Exception: 
self.log.exception("in deferred queue") finally: self._working = False diff --git a/lib/concurrence/http/client.py b/lib/concurrence/http/client.py index 7f52bef..b7cede4 100644 --- a/lib/concurrence/http/client.py +++ b/lib/concurrence/http/client.py @@ -10,7 +10,7 @@ import time import logging -from concurrence import Tasklet, Channel, Message, __version__ +from concurrence import Tasklet, Channel, Message, __version__, TimeoutError from concurrence.timer import Timeout from concurrence.io import Connector, BufferedStream from concurrence.http import HTTPError, HTTPRequest, HTTPResponse @@ -41,16 +41,22 @@ class HTTPConnection(object): log = logging.getLogger('HTTPConnection') + def __init__(self): + self.limit = None + def connect(self, endpoint): """Connect to the webserver at *endpoint*. *endpoint* is a tuple (, ).""" self._host = None if type(endpoint) == type(()): try: self._host = endpoint[0] - except: + except Exception: pass self._stream = BufferedStream(Connector.connect(endpoint), read_buffer_size = 1024 * 8, write_buffer_size = 1024 * 4) + def set_limit(self, limit): + self.limit = limit + def receive(self): """Receive the next :class:`HTTPResponse` from the connection.""" try: @@ -59,6 +65,10 @@ def receive(self): raise except EOFError: raise HTTPError("EOF while reading response") + except HTTPError: + raise + except TimeoutError: + raise except Exception: self.log.exception('') raise HTTPError("Exception while reading response") @@ -80,36 +90,49 @@ def _receive(self): key, value = line.split(': ') response.add_header(key, value) - #read data - transfer_encoding = response.get_header('Transfer-Encoding', None) - - try: - content_length = int(response.get_header('Content-Length')) - except: - content_length = None - - #TODO better support large data, e.g. 
iterator instead of append all data to chunks chunks = [] - if transfer_encoding == 'chunked': - while True: - chunk_line = reader.read_line() - chunk_size = int(chunk_line.split(';')[0], 16) - if chunk_size > 0: - data = reader.read_bytes(chunk_size) - reader.read_line() #chunk is always followed by a single empty line + if response.status_code != 204: + #read data + transfer_encoding = response.get_header('Transfer-Encoding', None) + + try: + content_length = int(response.get_header('Content-Length')) + if self.limit is not None and content_length > self.limit: + raise HTTPError("Response is too long") + except Exception: + content_length = None + + #TODO better support large data, e.g. iterator instead of append all data to chunks + + if transfer_encoding == 'chunked': + while True: + chunk_line = reader.read_line() + chunk_size = int(chunk_line.split(';')[0], 16) + if chunk_size > 0: + data = reader.read_bytes(chunk_size) + reader.read_line() #chunk is always followed by a single empty line + chunks.append(data) + else: + reader.read_line() #chunk is always followed by a single empty line + break + elif content_length is not None: + while content_length > 0: + n = min(CHUNK_SIZE, content_length) + data = reader.read_bytes(n) + chunks.append(data) + content_length -= len(data) + else: + content_length = 0 + while True: + try: + data = reader.read_bytes_available() + except EOFError: + break chunks.append(data) - else: - reader.read_line() #chunk is always followed by a single empty line - break - elif content_length is not None: - while content_length > 0: - n = min(CHUNK_SIZE, content_length) - data = reader.read_bytes(n) - chunks.append(data) - content_length -= len(data) - else: - assert False, 'TODO' + content_length += len(data) + if self.limit is not None and content_length > self.limit: + raise HTTPError("Response is too long") response.iter = chunks diff --git a/lib/concurrence/http/concurrence.http._http.pyx 
b/lib/concurrence/http/concurrence.http._http.pyx index 35d3f7d..6f1c82e 100644 --- a/lib/concurrence/http/concurrence.http._http.pyx +++ b/lib/concurrence/http/concurrence.http._http.pyx @@ -106,7 +106,7 @@ cdef class HTTPParser: cdef _cb_field(self, name, value): key = 'HTTP_' + name if key in self.environ: - self.environ[key] += ',' + value # comma-separate multiple headers + self.environ[key] = self.environ[key] + ',' + value # comma-separate multiple headers else: self.environ[key] = value @@ -123,7 +123,7 @@ cdef class HTTPParser: if http_parser_has_error(self._parser): raise HTTPParserError("parse error") else: - self._buffer._position += self._buffer._position + nread + self._buffer._position = self._buffer._position + nread if http_parser_is_finished(self._parser): return True else: diff --git a/lib/concurrence/http/server.py b/lib/concurrence/http/server.py index d69eff3..40b67f4 100644 --- a/lib/concurrence/http/server.py +++ b/lib/concurrence/http/server.py @@ -12,6 +12,7 @@ import httplib import traceback import rfc822 +import re from concurrence import Tasklet, Message, Channel, TimeoutError, __version__ from concurrence.io import Server, BufferedStream @@ -25,7 +26,6 @@ HTTP_READ_TIMEOUT = 300 #default read timeout, if no request was read within this time, the connection is closed by server - class WSGIInputStream(object): def __init__(self, request, reader): transfer_encoding = request.get_request_header('Transfer-Encoding') @@ -41,6 +41,7 @@ def __init__(self, request, reader): self._n = int(content_length) self._file = reader.file() self._channel = Channel() + self.readline_buffer = None def _read_request_data(self): if self._n is not None: @@ -59,8 +60,27 @@ def read(self, n): else: return '' #EOF - def readline(self): - assert False, 'TODO' + def readline(self, maxlen=-1): + if self.readline_buffer is None: + self.readline_buffer = '' + self.re_line = re.compile(r'^(.*?(?:\r\n|\n))(.*)', re.DOTALL) + while True: + m = 
self.re_line.match(self.readline_buffer) + if m: + line, self.readline_buffer = m.group(1, 2) + return line + elif self._n <= 0: + if len(self.readline_buffer): + line = self.readline_buffer + self.readline_buffer = '' + return line + return None + data = self._file.read(min(self._n, 16384)) + if len(data) == 0: + self._n = 0 + else: + self._n -= len(data) + self.readline_buffer = self.readline_buffer + data def readlines(self): assert False, 'TODO' @@ -135,36 +155,40 @@ def write_response(self, response, writer): writer.write_bytes("Date: %s\r\n" % rfc822.formatdate()) writer.write_bytes("Server: %s\r\n" % SERVER_ID) - if chunked: + if chunked and self.method != 'HEAD': writer.write_bytes("Transfer-Encoding: chunked\r\n") else: - response = ''.join(response) - writer.write_bytes("Content-length: %d\r\n" % len(response)) + l = 0 + for chunk in response: + l += len(chunk) + writer.write_bytes("Content-length: %d\r\n" % l) writer.write_bytes("\r\n") - self.state = self.STATE_WRITING_DATA + if self.method != 'HEAD': + self.state = self.STATE_WRITING_DATA - if chunked: - for chunk in response: - writer.write_bytes("%x;\r\n" % len(chunk)) - writer.write_bytes(chunk) - writer.write_bytes("\r\n") + if chunked: + for chunk in response: + writer.write_bytes("%x;\r\n" % len(chunk)) + writer.write_bytes(chunk) + writer.write_bytes("\r\n") - writer.write_bytes("0\r\n\r\n") - else: - writer.write_bytes(response) + writer.write_bytes("0\r\n\r\n") + else: + for chunk in response: + writer.write_bytes(chunk if type(chunk) == str else str(chunk)) writer.flush() #TODO use special header to indicate no flush needed self.state = self.STATE_FINISHED def handle_request(self, application): + if self.method not in ['GET', 'POST', 'HEAD']: + return self._server.not_implemented(self.environ, self.start_response) try: return application(self.environ, self.start_response) - except TaskletExit: - raise - except: + except Exception: self.log.exception("unhandled exception while handling 
request") return self._server.internal_server_error(self.environ, self.start_response) @@ -190,9 +214,6 @@ def _read_request(self, reader): u = urlparse.urlparse(line[1]) self.method = line[0] - if self.method not in ['GET', 'POST']: - raise HTTPError('Unsupported method: %s' % self.method) - #TODO validate version self.version = line[2] self.uri = line[1] @@ -330,6 +351,7 @@ def handle(self, socket, application): Tasklet.new(self.handle_request, name = 'request_handler')(control, request, application) elif msg.match(self.MSG_REQUEST_HANDLED): + #request.environ["wsgi.input"].read(request.environ["wsgi.input"]._n) #we use reque to retire (send out) the responses in the correct order for request, response in self._reque.finish(request, response): self.MSG_WRITE_RESPONSE.send(response_writer)(request, response) @@ -386,6 +408,10 @@ def internal_server_error(self, environ, start_response): start_response('500 Internal Server Error', [('Content-type', 'text/plain')]) return [traceback.format_exc(20)] + def not_implemented(self, environ, start_response): + start_response('501 Not Implemented', [('Content-type', 'text/html')]) + return ["501 Not Implemented"] + def handle_request(self, request, application): """All HTTP requests pass trough this method. This method provides a hook for logging, statistics and or further processing w.r.t. 
the *request*.""" diff --git a/lib/concurrence/http/server2.py b/lib/concurrence/http/server2.py index eefe22d..f4daadb 100644 --- a/lib/concurrence/http/server2.py +++ b/lib/concurrence/http/server2.py @@ -200,9 +200,7 @@ def handle_request(self, request): try: response = self._application(request.environ, request.start_response) self.log.log(self._request_log_level, "%s %s", request.response_status, request.uri) - except TaskletExit: - raise - except: + except Exception: self.log.exception("unhandled exception while handling request") response = self.internal_server_error(request.environ, request.start_response) return response diff --git a/lib/concurrence/io/buffered.py b/lib/concurrence/io/buffered.py index 2f30688..ef7912b 100644 --- a/lib/concurrence/io/buffered.py +++ b/lib/concurrence/io/buffered.py @@ -76,7 +76,8 @@ def read_bytes(self, n): while n > 0: r = buffer.remaining if r > 0: - s.append(buffer.read_bytes(min(n, r))) + r = min(n, r) + s.append(buffer.read_bytes(r)) n -= r else: self.fill() @@ -272,7 +273,7 @@ def readlines(self): buffer.flip() yield buffer.read_bytes(-1) - def readline(self): + def readline(self, n = -1): return self.readlines().next() def read(self, n = -1): @@ -291,7 +292,8 @@ def read(self, n = -1): while n > 0: #read uptill n avaiable bytes or EOF r = buffer.remaining if r > 0: - s.append(buffer.read_bytes(min(n, r))) + r = min(n, r) + s.append(buffer.read_bytes(r)) n -= r else: try: diff --git a/lib/concurrence/io/concurrence.io._io.pyx b/lib/concurrence/io/concurrence.io._io.pyx index f7f5d39..e051aaf 100644 --- a/lib/concurrence/io/concurrence.io._io.pyx +++ b/lib/concurrence/io/concurrence.io._io.pyx @@ -421,7 +421,7 @@ cdef class Buffer: s2.append(c) else: s2.append('.') - x += 1 + x = x + 1 if x % 16 == 0: out.write('%04x' % (x - 16) + ' ' + ' '.join(s1[:8]) + ' ' + ' '.join(s1[8:]) + ' ' + ''.join(s2[:8]) + ' ' + (''.join(s2[8:]) + '\n')) s1 = [] diff --git a/lib/concurrence/io/socket.py b/lib/concurrence/io/socket.py 
index d03b7dc..6a73361 100644 --- a/lib/concurrence/io/socket.py +++ b/lib/concurrence/io/socket.py @@ -45,7 +45,7 @@ def __init__(self, socket, state = STATE_INIT): #sending try: self.socket.setsockopt(_socket.IPPROTO_TCP, _socket.TCP_NODELAY, 1) - except: + except Exception: self.log.warn("could not set TCP_NODELAY") #concurrence sockets are always non-blocking, this is the whole idea :-) : @@ -168,7 +168,7 @@ def _connect(self, addr, timeout = TIMEOUT_CURRENT): try: err = self.socket.connect_ex(addr) serr = self.socket.getsockopt(_socket.SOL_SOCKET, _socket.SO_ERROR) - except: + except Exception: self.log.exception("unexpected exception thrown by connect_ex") raise if err == 0 and serr == 0: @@ -180,7 +180,7 @@ def _connect(self, addr, timeout = TIMEOUT_CURRENT): try: self.writable.wait(timeout = timeout) self.state = self.STATE_CONNECTED - except: + except Exception: self.state = self.STATE_INIT raise else: @@ -299,17 +299,13 @@ def _handle_accept(self, accepted_socket): result = None try: result = self._handler(accepted_socket) - except TaskletExit: - raise - except: + except Exception: self.log.exception("unhandled exception in socket handler") finally: if result is None and not accepted_socket.is_closed(): try: accepted_socket.close() - except TaskletExit: - raise - except: + except Exception: self.log.exception("unhandled exception while forcefully closing client") def _create_socket(self): diff --git a/lib/concurrence/memcache/client.py b/lib/concurrence/memcache/client.py index 2c33456..db2ada5 100644 --- a/lib/concurrence/memcache/client.py +++ b/lib/concurrence/memcache/client.py @@ -137,41 +137,47 @@ def is_connected(self): return self._stream is not None def _defer_commands(self, cmds, result_channel): + def _exception(msg): + self.log.error(msg) + for _, _, error_value in cmds: + result_channel.send((MemcacheResult.ERROR, error_value)) + self.close() def _read_results(): protocol = self._protocol + if not self.is_connected(): + return 
_exception("lost connection in defer_commands") with self._stream.get_reader() as reader: + failed = False for cmd, args, error_value in cmds: try: result = protocol.read(cmd, reader) result_channel.send(result) - except TaskletExit: - raise - except: - self.log.exception("read error in defer_commands") + except Exception: + self.log.error("read error in defer_commands") result_channel.send((MemcacheResult.ERROR, error_value)) + failed = True + if failed: + self.close() + return #end _read_commands def _write_commands(): protocol = self._protocol try: if not self.is_connected(): self.connect() - except TaskletExit: - raise - except: - self.log.exception("connect error in defer_commands") - for _, _, error_value in cmds: - result_channel.send((MemcacheResult.ERROR, error_value)) - return + except Exception: + return _exception("connect error in defer_commands") with self._stream.get_writer() as writer: + failed = False for cmd, args, error_value in cmds: try: protocol.write(cmd, writer, args) - except TaskletExit: - raise - except: - self.log.exception("write error in defer_commands") - result_channel.send((MemcacheResult.ERROR, error_value)) - writer.flush() + except Exception: + return _exception("write error in defer_commands") + try: + writer.flush() + except Exception: + return _exception("write flush error in defer_commands") self._read_queue.defer(_read_results) #end _write_commands self._write_queue.defer(_write_commands) diff --git a/lib/concurrence/smtp.py b/lib/concurrence/smtp.py new file mode 100644 index 0000000..295a7ba --- /dev/null +++ b/lib/concurrence/smtp.py @@ -0,0 +1,35 @@ +import smtplib +from smtplib import * +from concurrence.io import Socket +from concurrence.io.buffered import Buffer, BufferedReader, BufferedWriter + +# Use class concurrence.smtp.SMTP exactly like smtplib.SMTP + +class SMTP(smtplib.SMTP): + """ This is a subclass derived from SMTP that connects over a Concurrence socket """ + def _get_socket(self, host, port, timeout): + 
new_socket = Socket.connect((host, port)) + self._reader = BufferedReader(new_socket, Buffer(1024)) + self._writer = BufferedWriter(new_socket, Buffer(1024)) + self.file = self._reader.file() + return new_socket + + def send(self, str): + if self.debuglevel > 0: print>>sys.stderr, 'send:', repr(str) + if hasattr(self, 'sock') and self.sock: + try: + self._writer.write_bytes(str) + self._writer.flush() + except IOError: + self.close() + raise SMTPServerDisconnected('Server not connected') + else: + raise SMTPServerDisconnected('please run connect() first') + + def close(self): + if self.sock: + self.sock.close() + self.sock = None + self._reader = None + self._writer = None + self.file = None diff --git a/lib/concurrence/thr.py b/lib/concurrence/thr.py new file mode 100644 index 0000000..0b83b3b --- /dev/null +++ b/lib/concurrence/thr.py @@ -0,0 +1,63 @@ +import concurrence.io +from thrift.transport.TTransport import TTransportBase, TTransportException +import socket + +# Example: +# from concurrence.thr import Socket +# sock = Socket((("localhost", 9160),)) +# trans = TTransport.TFramedTransport(sock) +# proto = TBinaryProtocol.TBinaryProtocolAccelerated(trans) +# client = Client(proto) +# +# Note: you must use Framed Transport to use thrift with nonblocking sockets + +class Socket(TTransportBase): + "Thrift Socket implementation on top of Concurrence" + + def __init__(self, hosts): + self.hosts = hosts + self.handle = None + self.timeout = -1 + + def setTimeout(self, ms): + if ms is None: + self.timeout = -1 + else: + self.timeout = ms / 1000.0 + + def open(self): + try: + for host in self.hosts: + try: + self.handle = concurrence.io.Socket.connect(host, self.timeout) + self.stream = concurrence.io.BufferedStream(self.handle) + except socket.error as e: + if host is not self.hosts[-1]: + continue + else: + raise e + break + except socket.error as e: + message = "Could not connect to thrift socket" + raise TTransportException(TTransportException.NOT_OPEN, 
message) + + def close(self): + if self.handle: + self.handle.close() + self.handle = None + self.stream = None + + def isOpen(self): + return self.handle is not None + + def write(self, buff): + self.stream.writer.write_bytes(buff) + + def flush(self): + self.stream.writer.flush() + + def read(self, sz): + buff = self.stream.reader.read_bytes(sz) + if len(buff) != sz: + raise TTransportException("Thrift socket read %d bytes instead of %d" % (len(buff), sz)) + return buff diff --git a/sandbox/test_mysql.py b/sandbox/test_mysql.py new file mode 100644 index 0000000..bcda8dc --- /dev/null +++ b/sandbox/test_mysql.py @@ -0,0 +1,37 @@ +from concurrence import dispatch, Tasklet +from concurrence.database.mysql import client, dbapi, PacketReadError +import logging +import traceback + +modlogger = logging.getLogger("") +modlogger.setLevel(logging.DEBUG) +stderr_channel = logging.StreamHandler() +stderr_channel.setLevel(logging.DEBUG) +modlogger.addHandler(stderr_channel) + +DB_HOST = 'localhost' +DB_USER = 'concurrence_test' +DB_PASSWD = 'concurrence_test' +DB_DB = 'concurrence_test' + +def main(): + i = 0 + cnn = None + while True: + try: + print "iteration" + if not cnn or not cnn.is_connected(): + cnn = client.connect(host = DB_HOST, user = DB_USER, passwd = DB_PASSWD, db = DB_DB) + res = cnn.query("select %d" % i) + print list(res)[0][0] + res.close() + res = cnn.query("select sleep(1)") + list(res)[0][0] + res.close() + except Exception as e: + traceback.print_exc() + else: + i += 1 + +if __name__ == '__main__': + dispatch(main) diff --git a/setup.py b/setup.py index 1fa23aa..0aac3ab 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ libevent_include_dirs = ['%s/include' % prefix] libevent_library_dirs = ['%s/lib' % prefix] -VERSION = '0.3.1' #must be same as concurrence.__init__.py.__version__ +VERSION = '0.3.2' #must be same as concurrence.__init__.py.__version__ setup( name = "concurrence", diff --git a/test/concurrence_test.sql b/test/concurrence_test.sql 
index cab9b43..e087ce5 100644 --- a/test/concurrence_test.sql +++ b/test/concurrence_test.sql @@ -1,21 +1,25 @@ use concurrence_test; +DROP TABLE IF EXISTS `tbltest`; +DROP TABLE IF EXISTS `tblautoincint`; +DROP TABLE IF EXISTS `tblautoincbigint`; + CREATE TABLE `tbltest` ( `test_id` int(11) NOT NULL, `test_string` varchar(1024) NOT NULL, `test_blob` longblob NOT NULL -) ENGINE=MyISAM DEFAULT CHARSET=latin1; +) ENGINE=innodb DEFAULT CHARSET=latin1; CREATE TABLE `tblautoincint` ( `test_id` INT UNSIGNED NOT NULL AUTO_INCREMENT, `test_string` varchar(1024) NOT NULL, PRIMARY KEY(test_id) -) ENGINE=MyISAM DEFAULT CHARSET=latin1; +) ENGINE=innodb DEFAULT CHARSET=latin1; CREATE TABLE `tblautoincbigint` ( `test_id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, `test_string` varchar(1024) NOT NULL, PRIMARY KEY(test_id) -) ENGINE=MyISAM DEFAULT CHARSET=latin1; +) ENGINE=innodb DEFAULT CHARSET=latin1; GRANT ALL on concurrence_test.* to 'concurrence_test'@'localhost' identified by 'concurrence_test'; diff --git a/test/testcore.py b/test/testcore.py index 6fcfdb6..07c35d3 100644 --- a/test/testcore.py +++ b/test/testcore.py @@ -172,7 +172,7 @@ def ival(): except TimeoutError: #expect 2 counts, because interval started after 1 second self.assertEquals(2, len(count)) - except: + except Exception: self.fail('expected timeout, got %s' % sys.exc_type) finally: ival_task.kill() @@ -186,7 +186,7 @@ def ival(): except TimeoutError: #expect 3 counts, because interval started immediately self.assertEquals(3, len(count)) - except: + except Exception: self.fail('expected timeout') finally: ival_task.kill() diff --git a/test/testmemcache.py b/test/testmemcache.py index 03e69a9..7c4a761 100644 --- a/test/testmemcache.py +++ b/test/testmemcache.py @@ -27,24 +27,39 @@ class TestMemcache(unittest.TestCase): log = logging.getLogger("TestMemcache") - def setUp(self): + def memcachedRun(self): self.log.debug("using memcached daemon: %s", MEMCACHED_BIN) for i in range(4): - cmd = '%s -m 10 -p %d -u 
nobody -l 127.0.0.1&' % (MEMCACHED_BIN, 11211 + i) + cmd = '%s -m 10 -p %d -u nobody -l 127.0.0.1&' % (MEMCACHED_BIN, 11212 + i) self.log.debug(cmd) os.system(cmd) Tasklet.sleep(1.0) #should be enough for memcached to get up and running - def tearDown(self): - MemcacheConnectionManager.create("default").close_all() + def memcachedKill(self): + cmd = 'killall -KILL %s' % MEMCACHED_BIN + self.log.debug(cmd) + os.system(cmd) + Tasklet.sleep(1.0) #should be enough for memcached to go down - cmd = 'killall %s' % MEMCACHED_BIN + def memcachedPause(self): + cmd = 'killall -STOP %s' % MEMCACHED_BIN self.log.debug(cmd) os.system(cmd) + Tasklet.sleep(1.0) #should be enough for memcached to pause - Tasklet.sleep(1.0) #should be enough for memcached to go down + def memcachedResume(self): + cmd = 'killall -CONT %s' % MEMCACHED_BIN + self.log.debug(cmd) + os.system(cmd) + + def setUp(self): + self.memcachedRun() + + def tearDown(self): + MemcacheConnectionManager.create("default").close_all() + self.memcachedKill() def testResultCode(self): @@ -221,11 +236,11 @@ def sharedTestBasic(self, mc): def testBasicSingle(self): - mc = MemcacheConnection((MEMCACHE_IP, 11211)) + mc = MemcacheConnection((MEMCACHE_IP, 11212)) self.sharedTestBasic(mc) def testSingleBatch(self): - mc = MemcacheConnection((MEMCACHE_IP, 11211)) + mc = MemcacheConnection((MEMCACHE_IP, 11212)) #batch = mc.batch() #batch.set('bset_1', 10) #batch.set('bset_2', 20) @@ -251,7 +266,7 @@ def testSingleBatch(self): def testExtraSingle(self): """test stuff that only makes sense on a single server connection""" - mc = MemcacheConnection((MEMCACHE_IP, 11211)) + mc = MemcacheConnection((MEMCACHE_IP, 11212)) res1, v1 = mc.version() res2, v2 = mc.version() self.assertEquals(MemcacheResult.OK, res1) @@ -264,14 +279,14 @@ def testExtraSingle(self): def testBasic(self): mc = Memcache() - mc.set_servers([((MEMCACHE_IP, 11211), 100)]) + mc.set_servers([((MEMCACHE_IP, 11212), 100)]) self.sharedTestBasic(mc) def testMemcache(self): 
mc = Memcache() - mc.set_servers([((MEMCACHE_IP, 11211), 100)]) + mc.set_servers([((MEMCACHE_IP, 11212), 100)]) N = 10000 @@ -283,30 +298,58 @@ def testMemcache(self): def testTimeout(self): mc = Memcache() - mc.set_servers([((MEMCACHE_IP, 11211), 100)]) + mc.set_servers([((MEMCACHE_IP, 11212), 100)]) - def callback(socket, count, event, args, kwargs): - print count, event, Tasklet.current() - if (count, event) == (1, "write"): - pass - elif (count, event) == (2, "read"): - Tasklet.sleep(1.0) - return "OK\r\n" + self.memcachedPause() - unittest.TestSocket.install((MEMCACHE_IP, 11211), callback) + def clientTimeout(): + with Timeout.push(0.5): + self.assertEquals(MemcacheResult.TIMEOUT, mc.set('blaat', 'aap')) + def clientError(): + with Timeout.push(0.5): + self.assertEquals(MemcacheResult.ERROR, mc.set('blaat', 'aap')) + + # Send some requests + for i in xrange(0, 1000): + Tasklet.new(clientTimeout)() + Tasklet.join_children() with Timeout.push(0.5): - self.assertEquals(MemcacheResult.TIMEOUT, mc.set('blaat', 'aap')) - print 'done (timeout)' + self.assertEquals(MemcacheResult.TIMEOUT, mc.set('foo', 'bar')) + print 'done (timeout)' + + self.memcachedResume() + + self.assertEquals(mc.get('blaat'), 'aap') + self.assertEquals(mc.get('foo'), 'bar') + + self.memcachedPause() + + # Send some requests + for i in xrange(0, 1000): + Tasklet.new(clientTimeout)() + Tasklet.join_children() + + self.memcachedKill() + + # Send some requests expected + for i in xrange(0, 1000): + Tasklet.new(clientError)() + Tasklet.join_children() + + self.memcachedRun() - Tasklet.sleep(4.0) + self.assertEquals(MemcacheResult.STORED, mc.set("bar", "baz")) + self.assertEquals(None, mc.get("blaat")) + self.assertEquals(None, mc.get("foo")) + self.assertEquals("baz", mc.get("bar")) def testMemcacheMultiServer(self): mc = Memcache() - mc.set_servers([((MEMCACHE_IP, 11211), 100), - ((MEMCACHE_IP, 11212), 100), + mc.set_servers([((MEMCACHE_IP, 11212), 100), ((MEMCACHE_IP, 11213), 100), - 
((MEMCACHE_IP, 11214), 100)]) + ((MEMCACHE_IP, 11214), 100), + ((MEMCACHE_IP, 11215), 100)]) N = 10000 keys = ['test%d' % i for i in range(N)] @@ -336,10 +379,10 @@ def testMultiClientMultiServer(self): keys = ['test%d' % i for i in range(N)] mc = Memcache() - mc.set_servers([((MEMCACHE_IP, 11211), 100), - ((MEMCACHE_IP, 11212), 100), + mc.set_servers([((MEMCACHE_IP, 11212), 100), ((MEMCACHE_IP, 11213), 100), - ((MEMCACHE_IP, 11214), 100)]) + ((MEMCACHE_IP, 11214), 100), + ((MEMCACHE_IP, 11215), 100)]) with unittest.timer() as tmr: for i in range(N): @@ -364,7 +407,7 @@ def testTextProtocol(self): from concurrence.io import Socket, BufferedStream from concurrence.memcache.protocol import MemcacheProtocol - socket = Socket.connect((MEMCACHE_IP, 11211)) + socket = Socket.connect((MEMCACHE_IP, 11212)) stream = BufferedStream(socket) writer = stream.writer reader = stream.reader @@ -448,7 +491,7 @@ def testConnectionManager(self): connections = [] def connector(): - connections.append(cm.get_connection((MEMCACHE_IP, 11211), protocol)) + connections.append(cm.get_connection((MEMCACHE_IP, 11212), protocol)) Tasklet.new(connector)() Tasklet.new(connector)() diff --git a/test/testmysql.py b/test/testmysql.py index 5875a88..41e2049 100644 --- a/test/testmysql.py +++ b/test/testmysql.py @@ -456,6 +456,35 @@ def create_reader(bytes): self.assertEquals(9, p.packet.limit) self.assertEquals(9, p.packet.position) + def testDeadlocks(self): + def process(cnn, cur, val): + try: + cur.execute("begin") + cur.execute("insert into tbltest (test_id) values (1)") + cur.execute("select sleep(2)") + cur.execute("update tbltest set test_id=%d" % val) + cur.execute("select sleep(2)") + cur.execute("commit") + return False + except dbapi.Error as e: + return "deadlock" in str(e).lower() + cnn1 = dbapi.connect(host = DB_HOST, user = DB_USER, passwd = DB_PASSWD, db = DB_DB) + cur1 = cnn1.cursor() + cnn2 = dbapi.connect(host = DB_HOST, user = DB_USER, passwd = DB_PASSWD, db = DB_DB) + cur2 = 
cnn2.cursor() + t1 = Tasklet.new(process)(cnn1, cur1, 2) + t2 = Tasklet.new(process)(cnn2, cur2, 3) + res = Tasklet.join_all([t1, t2]) + self.assertTrue(res[0] or res[1], + 'At least one of the queries expected to fail due to deadlock (innodb must be used)') + # Both connections must survive after error + cur1.execute("select 1") + cur2.execute("select 2") + cur1.close() + cnn1.close() + cur2.close() + cnn2.close() + if __name__ == '__main__': unittest.main(timeout = 60) diff --git a/test/testtest.py b/test/testtest.py index 5f2260d..04dc7bc 100644 --- a/test/testtest.py +++ b/test/testtest.py @@ -1,4 +1,5 @@ from concurrence import unittest, Tasklet +import os class TestTest(unittest.TestCase): def testTimeout(self): @@ -7,6 +8,9 @@ def testTimeout(self): self.fail('expected timeout!') except TaskletExit: pass #caused by timeout - + +def ontimeout(): + os._exit(0) + if __name__ == '__main__': - unittest.main(timeout = 2) + unittest.main(timeout = 2, ontimeout = ontimeout)