From 46152f44ec598e31b77f0a50f0a009121c42bb44 Mon Sep 17 00:00:00 2001 From: Eric Du Date: Tue, 28 Jun 2011 19:50:21 +0800 Subject: [PATCH 1/2] updated .gitignore for PyCharm --- .gitignore | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ee342e9..bfac18f 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,7 @@ build/ .coverage *.egg-info logs/ -dist/ \ No newline at end of file +dist/ + +# PyCharm +/.idea From 36aba5dbdb5b18c413b8570a998f0b9308d4b373 Mon Sep 17 00:00:00 2001 From: Eric Du Date: Wed, 29 Jun 2011 01:16:58 +0800 Subject: [PATCH 2/2] move pyres datetimes from naive local to naive UTC: stick with timestamp for scheduling; work with naive UTC datetimes internally; convert datetimes to naive UTC right after received from external(ResQ.enqueue_at). --- pyres/__init__.py | 26 +++++++----- pyres/extensions.py | 6 +-- pyres/failure/redis.py | 4 +- pyres/horde.py | 8 ++-- pyres/job.py | 2 +- pyres/worker.py | 25 +++++------- resweb/views.py | 12 +++--- tests/__init__.py | 3 +- tests/test_horde.py | 2 +- tests/test_jobs.py | 2 +- tests/test_json.py | 4 +- tests/test_schedule.py | 92 +++++++++++++++++++++++++++++------------- tests/test_worker.py | 29 +++++++------ 13 files changed, 128 insertions(+), 87 deletions(-) diff --git a/pyres/__init__.py b/pyres/__init__.py index 6ad64ee..6b9d76d 100644 --- a/pyres/__init__.py +++ b/pyres/__init__.py @@ -3,7 +3,8 @@ from redis import Redis import pyres.json_parser as json -import time, datetime +from datetime import datetime +import calendar import sys import logging @@ -238,9 +239,12 @@ def close(self): self.redis.connection.disconnect() def enqueue_at(self, datetime, klass, *args, **kwargs): + """ + datetime: if it is a naive datetime, it's regarded in UTC; otherwise, it will be converted to a naive datetime in UTC + """ + datetime = self._to_naive_utc_datetime(datetime) class_name = '%s.%s' % (klass.__module__, klass.__name__) - logging.info("enqueued '%s' job for execution at %s" % (class_name, - datetime)) + logging.info("enqueued '%s' job for execution at %s" % (class_name, datetime)) if args: logging.debug("job arguments are: %s" % str(args)) payload = {'class':class_name, 'queue': klass.queue, 'args':args} @@ -249,7 +253,7 @@ def enqueue_at(self, datetime, klass, *args, **kwargs): self.delayed_push(datetime, payload) def delayed_push(self, datetime, item): - key = int(time.mktime(datetime.timetuple())) + key = int(calendar.timegm(datetime.utctimetuple())) self.redis.rpush('resque:delayed:%s' % key, ResQ.encode(item)) self.redis.zadd('resque:delayed_queue_schedule', key, key) @@ -268,11 +272,10 @@ def delayed_queue_schedule_size(self): return size def delayed_timestamp_size(self, timestamp): - #key = int(time.mktime(timestamp.timetuple())) return self.redis.llen("resque:delayed:%s" % timestamp) def next_delayed_timestamp(self): - key = int(time.mktime(ResQ._current_time().timetuple())) + key = int(calendar.timegm(ResQ._utcnow().utctimetuple())) array = self.redis.zrangebyscore('resque:delayed_queue_schedule', '-inf', key) timestamp = None @@ -281,7 +284,6 @@ def next_delayed_timestamp(self): return timestamp def next_item_for_timestamp(self, timestamp): - #key = int(time.mktime(timestamp.timetuple())) key = "resque:delayed:%s" % timestamp ret = self.redis.lpop(key) item = None @@ -312,8 +314,14 @@ def _enqueue(cls, klass, *args): _self.push(queue, {'class':class_name,'args':args}) @staticmethod - def _current_time(): - return datetime.datetime.now() + def _utcnow(): + return datetime.utcnow() + + @staticmethod + def _to_naive_utc_datetime(datetime): + # naive datetime is regarded as UTC datetime + timestamp = calendar.timegm(datetime.utctimetuple()) + return datetime.utcfromtimestamp(timestamp) class Stat(object): diff --git a/pyres/extensions.py b/pyres/extensions.py index 6f16008..b950b3f 100644 --- a/pyres/extensions.py +++ b/pyres/extensions.py @@ -1,5 +1,5 @@ import os -import datetime +from datetime import datetime import time import signal @@ -28,11 +28,11 @@ def work(self, interval=5): if self.child: print 'Forked %s at %s' % (self.child, - datetime.datetime.now()) + datetime.utcnow()) os.waitpid(self.child, 0) else: print 'Processing %s since %s' % (job._queue, - datetime.datetime.now()) + datetime.utcnow()) self.process(job) os._exit(0) self.child = None diff --git a/pyres/failure/redis.py b/pyres/failure/redis.py index 715f9de..2d73312 100644 --- a/pyres/failure/redis.py +++ b/pyres/failure/redis.py @@ -1,4 +1,4 @@ -import datetime, time +from datetime import datetime from base64 import b64encode from base import BaseBackend @@ -12,7 +12,7 @@ def save(self, resq=None): if not resq: resq = ResQ() data = { - 'failed_at' : datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'), + 'failed_at' : datetime.utcnow().strftime('%Y/%m/%d %H:%M:%S'), 'payload' : self._payload, 'exception' : self._exception.__class__.__name__, 'error' : self._parse_message(self._exception), diff --git a/pyres/horde.py b/pyres/horde.py index bd105e6..b447d1b 100644 --- a/pyres/horde.py +++ b/pyres/horde.py @@ -5,7 +5,7 @@ sys.exit("multiprocessing was not available") import time, os, signal -import datetime +from datetime import datetime import logging import logging.handlers from pyres import ResQ, Stat @@ -67,7 +67,7 @@ def register_signal_handlers(self): def register_minion(self): self.resq.redis.sadd('resque:minions',str(self)) - self.started = datetime.datetime.now() + self.started = datetime.utcnow() def startup(self): self.register_signal_handlers() @@ -107,7 +107,7 @@ def working_on(self, job): self.logger.debug('marking as working on') data = { 'queue': job._queue, - 'run_at': int(time.mktime(datetime.datetime.now().timetuple())), + 'run_at': int(time.time()), 'payload': job._payload } data = json.dumps(data) @@ -238,7 +238,7 @@ def register_khan(self): if not hasattr(self, 'resq'): self.setup_resq() self.resq.redis.sadd('resque:khans',str(self)) - self.started = datetime.datetime.now() + self.started = datetime.utcnow() def _check_commands(self): if not self._shutdown: diff --git a/pyres/job.py b/pyres/job.py index 7bab328..8b724eb 100644 --- a/pyres/job.py +++ b/pyres/job.py @@ -72,7 +72,7 @@ def retry(self, payload_class, args): retry_timeout = getattr(payload_class, 'retry_timeout', 0) if retry_every: - now = ResQ._current_time() + now = ResQ._utcnow() first_attempt = self._payload.get("first_attempt", now) retry_until = first_attempt + timedelta(seconds=retry_timeout) retry_at = now + timedelta(seconds=retry_every) diff --git a/pyres/worker.py b/pyres/worker.py index 62ab454..94f7822 100644 --- a/pyres/worker.py +++ b/pyres/worker.py @@ -1,6 +1,8 @@ import logging import signal -import datetime, time +from datetime import datetime +import calendar +import time import os, sys import json_parser as json import commands @@ -46,22 +48,17 @@ def validate_queues(self): def register_worker(self): self.resq.redis.sadd('resque:workers', str(self)) - #self.resq._redis.add("worker:#{self}:started", Time.now.to_s) - self.started = datetime.datetime.now() + self.started = datetime.utcnow() def _set_started(self, dt): if dt: - key = int(time.mktime(dt.timetuple())) + key = int(calendar.timegm(dt.utctimetuple())) self.resq.redis.set("resque:worker:%s:started" % self, key) else: self.resq.redis.delete("resque:worker:%s:started" % self) def _get_started(self): - datestring = self.resq.redis.get("resque:worker:%s:started" % self) - #ds = None - #if datestring: - # ds = datetime.datetime.strptime(datestring, '%Y-%m-%d %H:%M:%S') - return datestring + return self.resq.redis.get("resque:worker:%s:started" % self) started = property(_get_started, _set_started) @@ -143,9 +140,9 @@ def work(self, interval=5): setproctitle("pyres_worker%s: Forked %s at %s" % (__version__, self.child, - datetime.datetime.now())) + datetime.utcnow())) logger.info('Forked %s at %s' % (self.child, - datetime.datetime.now())) + datetime.utcnow())) try: os.waitpid(self.child, 0) @@ -159,9 +156,9 @@ def work(self, interval=5): else: setproctitle("pyres_worker-%s: Processing %s since %s" % (__version__, job._queue, - datetime.datetime.now())) + datetime.utcnow())) logger.info('Processing %s since %s' % - (job._queue, datetime.datetime.now())) + (job._queue, datetime.utcnow())) self.after_fork(job) self.process(job) os._exit(0) @@ -222,7 +219,7 @@ def working_on(self, job): logger.debug('marking as working on') data = { 'queue': job._queue, - 'run_at': str(int(time.mktime(datetime.datetime.now().timetuple()))), + 'run_at': str(int(time.time())), 'payload': job._payload } data = json.dumps(data) diff --git a/resweb/views.py b/resweb/views.py index 3dd8a63..29adbd5 100644 --- a/resweb/views.py +++ b/resweb/views.py @@ -4,7 +4,7 @@ from pyres.worker import Worker as Wrkr from pyres import failure import os -import datetime +from datetime import datetime TEMPLATE_PATH = os.path.join(os.path.dirname(__file__), 'templates') class ResWeb(pystache.View): @@ -105,7 +105,7 @@ def workers(self): if 'queue' in data: item['data'] = True item['code'] = data['payload']['class'] - item['runat'] = str(datetime.datetime.fromtimestamp(float(data['run_at']))) + item['runat'] = str(datetime.utcfromtimestamp(float(data['run_at']))) else: item['data'] = False item['nodata'] = not item['data'] @@ -155,7 +155,7 @@ def workers(self): if 'queue' in data: item['data'] = True item['code'] = data['payload']['class'] - item['runat'] = str(datetime.datetime.fromtimestamp(float(data['run_at']))) + item['runat'] = str(datetime.utcfromtimestamp(float(data['run_at']))) else: item['data'] = False item['nodata'] = not item['data'] @@ -401,7 +401,7 @@ def code(self): def runat(self): data = self._worker.processing() if self.data(): - return str(datetime.datetime.fromtimestamp(float(data['run_at']))) + return str(datetime.utcfromtimestamp(float(data['run_at']))) return '' """ @@ -445,7 +445,7 @@ def size(self): def jobs(self): jobs = [] for timestamp in self.resq.delayed_queue_peek(self.start(), self.end()): - t = datetime.datetime.fromtimestamp(float(timestamp)) + t = datetime.utcfromtimestamp(float(timestamp)) item = dict(timestamp=str(timestamp)) item['size'] = str(self.resq.delayed_timestamp_size(timestamp)) @@ -467,7 +467,7 @@ def __init__(self, host, timestamp, start=0): super(DelayedTimestamp, self).__init__(host) def formated_timestamp(self): - return str(datetime.datetime.fromtimestamp(float(self._timestamp))) + return str(datetime.utcfromtimestamp(float(self._timestamp))) def start(self): return str(self._start) diff --git a/tests/__init__.py b/tests/__init__.py index f8b403a..d097632 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,5 +1,4 @@ import unittest -import os from pyres import ResQ, str_to_class class Basic(object): @@ -32,7 +31,7 @@ class RetryOnExceptionJob(object): @staticmethod def perform(fail_until): - if ResQ._current_time() < fail_until: + if ResQ._utcnow() < fail_until: raise Exception("Don't blame me! I'm supposed to fail!") else: return True diff --git a/tests/test_horde.py b/tests/test_horde.py index f6179c8..eb2e93b 100644 --- a/tests/test_horde.py +++ b/tests/test_horde.py @@ -1,4 +1,4 @@ -from tests import PyResTests, Basic, TestProcess +from tests import PyResTests from pyres import horde import os diff --git a/tests/test_jobs.py b/tests/test_jobs.py index a936c1a..2982fac 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -25,7 +25,7 @@ def test_fail(self): assert self.redis.llen('resque:failed') == 1 def test_date_arg_type(self): - dt = datetime.now().replace(microsecond=0) + dt = datetime.utcnow().replace(microsecond=0) self.resq.enqueue(ReturnAllArgsJob, dt) job = Job.reserve('basic',self.resq) result = job.perform() diff --git a/tests/test_json.py b/tests/test_json.py index 1a619e6..c73cf7e 100644 --- a/tests/test_json.py +++ b/tests/test_json.py @@ -10,13 +10,13 @@ def test_encode_decode_date(self): assert decoded['dt'] == dt def test_dates_in_lists(self): - dates = [datetime.now() for i in range(50)] + dates = [datetime.utcnow() for i in range(50)] decoded = json.loads(json.dumps(dates)) for value in dates: assert isinstance(value, datetime) def test_dates_in_dict(self): - dates = dict((i, datetime.now()) for i in range(50)) + dates = dict((i, datetime.utcnow()) for i in range(50)) decoded = json.loads(json.dumps(dates)) for i, value in dates.items(): assert isinstance(i, int) diff --git a/tests/test_schedule.py b/tests/test_schedule.py index 27b21fb..5b0fe2e 100644 --- a/tests/test_schedule.py +++ b/tests/test_schedule.py @@ -1,16 +1,31 @@ -from tests import PyResTests, Basic, TestProcess, ErrorObject -from pyres import ResQ -from pyres.job import Job +import calendar +from tests import PyResTests, Basic from pyres.scheduler import Scheduler -import os -import datetime -import time +from datetime import datetime, timedelta, tzinfo + +class GMT1(tzinfo): + def __init__(self): # DST starts last Sunday in March + utc_now = datetime.utcnow() + d = datetime(utc_now.year, 4, 1) # ends last Sunday in October + self.dston = d - timedelta(days=d.weekday() + 1) + d = datetime(utc_now.year, 11, 1) + self.dstoff = d - timedelta(days=d.weekday() + 1) + def utcoffset(self, dt): + return timedelta(hours=1) + self.dst(dt) + def dst(self, dt): + if self.dston <= dt.replace(tzinfo=None) < self.dstoff: + return timedelta(hours=1) + else: + return timedelta(0) + def tzname(self,dt): + return "GMT +1" + class ScheduleTests(PyResTests): def test_enqueue_at(self): - d = datetime.datetime.now() + datetime.timedelta(days=1) - d2 = d + datetime.timedelta(days=1) - key = int(time.mktime(d.timetuple())) - key2 = int(time.mktime(d2.timetuple())) + d = datetime.utcnow() + timedelta(days=1) + d2 = d + timedelta(days=1) + key = int(calendar.timegm(d.utctimetuple())) + key2 = int(calendar.timegm(d2.utctimetuple())) self.resq.enqueue_at(d, Basic,"test1") self.resq.enqueue_at(d, Basic,"test2") assert self.redis.llen("resque:delayed:%s" % key) == 2 @@ -18,43 +33,66 @@ def test_enqueue_at(self): self.resq.enqueue_at(d2, Basic,"test1") assert self.redis.llen("resque:delayed:%s" % key2) == 1 assert len(self.redis.zrange('resque:delayed_queue_schedule',0,20)) == 2 - + + def test_enqueue_at_with_GMT1_aware_datetime_not_in_DST(self): + utc_now = datetime.utcnow() + gmt1_dt_not_in_dst = utc_now.replace(month=2, tzinfo=GMT1()) + d = gmt1_dt_not_in_dst + timedelta(days=1) + d2 = d + timedelta(days=1) + key = int(calendar.timegm(d.utctimetuple())) + key2 = int(calendar.timegm(d2.utctimetuple())) + self.resq.enqueue_at(d, Basic,"test1") + self.resq.enqueue_at(d, Basic,"test2") + assert self.redis.llen("resque:delayed:%s" % key) == 2 + assert len(self.redis.zrange('resque:delayed_queue_schedule',0,20)) == 1 + self.resq.enqueue_at(d2, Basic,"test1") + assert self.redis.llen("resque:delayed:%s" % key2) == 1 + assert len(self.redis.zrange('resque:delayed_queue_schedule',0,20)) == 2 + + def test_enqueue_at_with_GMT1_aware_datetime_in_DST(self): + utc_now = datetime.utcnow() + gmt1_dt_in_dst = utc_now.replace(month=5, tzinfo=GMT1()) + d = gmt1_dt_in_dst + timedelta(days=1) + d2 = d + timedelta(days=1) + key = int(calendar.timegm(d.utctimetuple())) + key2 = int(calendar.timegm(d2.utctimetuple())) + self.resq.enqueue_at(d, Basic,"test1") + self.resq.enqueue_at(d, Basic,"test2") + assert self.redis.llen("resque:delayed:%s" % key) == 2 + assert len(self.redis.zrange('resque:delayed_queue_schedule',0,20)) == 1 + self.resq.enqueue_at(d2, Basic,"test1") + assert self.redis.llen("resque:delayed:%s" % key2) == 1 + assert len(self.redis.zrange('resque:delayed_queue_schedule',0,20)) == 2 + def test_delayed_queue_schedule_size(self): - d = datetime.datetime.now() + datetime.timedelta(days=1) - d2 = d + datetime.timedelta(days=1) + d = datetime.utcnow() + timedelta(days=1) + d2 = d + timedelta(days=1) d3 = d - key = int(time.mktime(d.timetuple())) - key2 = int(time.mktime(d2.timetuple())) self.resq.enqueue_at(d, Basic,"test1") self.resq.enqueue_at(d2, Basic,"test1") self.resq.enqueue_at(d3, Basic,"test1") assert self.resq.delayed_queue_schedule_size() == 3 def test_delayed_timestamp_size(self): - d = datetime.datetime.now() + datetime.timedelta(days=1) - d2 = d + datetime.timedelta(days=1) - key = int(time.mktime(d.timetuple())) - key2 = int(time.mktime(d2.timetuple())) + d = datetime.utcnow() + timedelta(days=1) + key = int(calendar.timegm(d.utctimetuple())) self.resq.enqueue_at(d, Basic,"test1") assert self.resq.delayed_timestamp_size(key) == 1 self.resq.enqueue_at(d, Basic,"test1") assert self.resq.delayed_timestamp_size(key) == 2 def test_next_delayed_timestamp(self): - d = datetime.datetime.now() + datetime.timedelta(days=-1) - d2 = d + datetime.timedelta(days=-2) - key = int(time.mktime(d.timetuple())) - key2 = int(time.mktime(d2.timetuple())) + d = datetime.utcnow() + timedelta(days=-1) + d2 = d + timedelta(days=-2) + key2 = int(calendar.timegm(d2.utctimetuple())) self.resq.enqueue_at(d, Basic,"test1") self.resq.enqueue_at(d2, Basic,"test1") item = self.resq.next_delayed_timestamp() assert int(item) == key2 def test_next_item_for_timestamp(self): - d = datetime.datetime.now() + datetime.timedelta(days=-1) - d2 = d + datetime.timedelta(days=-2) - #key = int(time.mktime(d.timetuple())) - #key2 = int(time.mktime(d2.timetuple())) + d = datetime.utcnow() + timedelta(days=-1) + d2 = d + timedelta(days=-2) self.resq.enqueue_at(d, Basic,"test1") self.resq.enqueue_at(d2, Basic,"test1") timestamp = self.resq.next_delayed_timestamp() diff --git a/tests/test_worker.py b/tests/test_worker.py index d1903a3..a574c21 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,8 +4,8 @@ from pyres.scheduler import Scheduler from pyres.worker import Worker import os -import time import datetime +import calendar class WorkerTests(PyResTests): @@ -140,13 +140,12 @@ def test_working(self): assert worker != workers[0] def test_started(self): - import datetime worker = Worker(['basic']) - dt = datetime.datetime.now() + dt = datetime.datetime.utcnow() worker.started = dt name = "%s:%s:%s" % (os.uname()[1],os.getpid(),'basic') - assert self.redis.get('resque:worker:%s:started' % name) == str(int(time.mktime(dt.timetuple()))) - assert worker.started == str(int(time.mktime(dt.timetuple()))) + assert self.redis.get('resque:worker:%s:started' % name) == str(int(calendar.timegm(dt.utctimetuple()))) + assert worker.started == str(int(calendar.timegm(dt.utctimetuple()))) worker.started = None assert not self.redis.exists('resque:worker:%s:started' % name) @@ -179,8 +178,8 @@ def test_prune_dead_workers(self): assert self.redis.scard('resque:workers') == 3 def test_retry_on_exception(self): - now = datetime.datetime.now() - self.set_current_time(now) + now = datetime.datetime.utcnow() + self.set_utcnow(now) worker = Worker(['basic']) scheduler = Scheduler() @@ -191,20 +190,20 @@ def test_retry_on_exception(self): assert worker.get_failed() == 0 # check it retries the first time - self.set_current_time(now + datetime.timedelta(seconds=5)) + self.set_utcnow(now + datetime.timedelta(seconds=5)) scheduler.handle_delayed_items() assert None == worker.process() assert worker.get_failed() == 0 # check it runs fine when it's stopped crashing - self.set_current_time(now + datetime.timedelta(seconds=60)) + self.set_utcnow(now + datetime.timedelta(seconds=60)) scheduler.handle_delayed_items() assert True == worker.process() assert worker.get_failed() == 0 def test_retries_give_up_eventually(self): - now = datetime.datetime.now() - self.set_current_time(now) + now = datetime.datetime.utcnow() + self.set_utcnow(now) worker = Worker(['basic']) scheduler = Scheduler() @@ -215,16 +214,16 @@ def test_retries_give_up_eventually(self): assert worker.get_failed() == 0 # check it retries the first time - self.set_current_time(now + datetime.timedelta(seconds=5)) + self.set_utcnow(now + datetime.timedelta(seconds=5)) scheduler.handle_delayed_items() assert None == worker.process() assert worker.get_failed() == 0 # check it fails when we've been trying too long - self.set_current_time(now + datetime.timedelta(seconds=20)) + self.set_utcnow(now + datetime.timedelta(seconds=20)) scheduler.handle_delayed_items() assert None == worker.process() assert worker.get_failed() == 1 - def set_current_time(self, time): - ResQ._current_time = staticmethod(lambda: time) + def set_utcnow(self, now): + ResQ._utcnow = staticmethod(lambda: now)