I've come up with two methods of submitting tasks and gathering their results via polling. Would you recommend either of them, or do you have another suggestion?
The first uses Dramatiq. Run `python3.12 test.py` and, in another terminal, run `dramatiq --processes 2 --threads 2 test`. This one works, although I'm not sure if the authors would recommend it. I'm currently asking them about this.

    import asyncio
    import time

    import dramatiq
    from dramatiq.brokers.redis import RedisBroker
    from dramatiq.middleware import AsyncIO
    from dramatiq.results import Results
    from dramatiq.results.backend import DEFAULT_TIMEOUT
    from dramatiq.results.backends.redis import RedisBackend
    from dramatiq.results.errors import ResultMissing, ResultTimeout
    from quart import Quart, jsonify


    class RedisBackendExt(RedisBackend):
        def get_result_from_message_key(self, message_key, *, block=False, timeout=None):
            if timeout is None:
                timeout = DEFAULT_TIMEOUT

            if block:
                timeout = int(timeout / 1000)
                if timeout == 0:
                    data = self.client.rpoplpush(message_key, message_key)
                else:
                    data = self.client.brpoplpush(message_key, message_key, timeout)

                if data is None:
                    raise ResultTimeout(message_key)
            else:
                data = self.client.lindex(message_key, 0)
                if data is None:
                    raise ResultMissing(message_key)

            return self.unwrap_result(self.encoder.decode(data))


    backend = RedisBackendExt()
    broker = RedisBroker(host='localhost', port=6379)
    broker.add_middleware(Results(backend=backend))
    broker.add_middleware(AsyncIO())
    dramatiq.set_broker(broker)

    app = Quart(__name__)  # totally decoupled from dramatiq ! I was unable to achieve this with Flask + Celery so far.


    @dramatiq.actor(store_results=True)
    async def cut_granite_with_toothpick(enqueue_time: float):
        print(f'{enqueue_time=}')
        await asyncio.sleep(4)
        return 'finally, the granite is cut...'


    @app.get('/message/<string:message_key>')
    async def get_result(message_key):
        result = backend.get_result_from_message_key(message_key)
        return jsonify({'result': result}), 200


    @app.get('/')
    async def index():
        enqueue_time = time.time()
        message = cut_granite_with_toothpick.send(enqueue_time)
        message_key = backend.build_message_key(message)
        print(f'http://127.0.0.1:8000/message/{message_key}')
        return jsonify({'message_key': message_key}), 202


    if __name__ == '__main__':
        app.run(host='127.0.0.1', port=8000, debug=True)

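
For the polling side, here is a minimal client-side sketch of what "poll until the result is there" could look like. It is not part of the script above: `poll_until_ready` and its `fetch` argument are hypothetical names, and `fetch` is assumed to be any callable that returns `None` while the result is not ready (for example, a wrapper around a GET to the `/message/<message_key>` route that treats a "missing result" response as `None`).

```python
import time
from typing import Callable, Optional


def poll_until_ready(
    fetch: Callable[[], Optional[str]],
    *,
    interval: float = 0.5,
    timeout: float = 10.0,
) -> str:
    """Call fetch() repeatedly until it returns a non-None result.

    `fetch` is a hypothetical callable supplied by the caller; it should
    return None while the task is still running. Raises TimeoutError if no
    result shows up within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = fetch()
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError('result not ready in time')
        # Wait before asking again so the server is not hammered.
        time.sleep(interval)
```

Note that with the non-blocking call used in `get_result`, the backend raises `ResultMissing` while the task is still running, so a real `fetch` would have to map that case to `None` one way or another.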
The second idea is to modify `Quart.add_background_task` so that it returns an auto-generated `task_id`, and to poll using that. Each background task function would need to store its `task_id` -> result pair in Redis manually, and the polling endpoints would similarly query Redis for results.
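
To make the second idea concrete, here is a rough sketch of its shape using plain asyncio, so it runs without Quart or a Redis server. Everything in it is an assumption for illustration: `submit_background_task` and `poll` are hypothetical helpers (not Quart APIs), and the `RESULTS` dict stands in for Redis. It also differs slightly from the description above in that a wrapper stores the result, rather than each task function doing it manually.

```python
import asyncio
import uuid

# Stand-in for Redis (assumption): maps task_id -> result. In the real idea
# these would be Redis writes and reads instead of dict operations.
RESULTS: dict[str, object] = {}

# Keep references to running tasks so they are not garbage collected.
_TASKS: set[asyncio.Task] = set()


def submit_background_task(func, *args) -> str:
    """Hypothetical helper: schedule func(*args) and return a task_id."""
    task_id = uuid.uuid4().hex

    async def runner():
        # Store the result under the task_id once the coroutine finishes.
        RESULTS[task_id] = await func(*args)

    task = asyncio.get_running_loop().create_task(runner())
    _TASKS.add(task)
    task.add_done_callback(_TASKS.discard)
    return task_id


def poll(task_id: str):
    """Hypothetical polling lookup: returns a (status, result) pair."""
    if task_id in RESULTS:
        return 'done', RESULTS[task_id]
    return 'pending', None


async def cut_granite_with_toothpick(delay: float) -> str:
    await asyncio.sleep(delay)
    return 'finally, the granite is cut...'


async def demo():
    task_id = submit_background_task(cut_granite_with_toothpick, 0.05)
    first = poll(task_id)       # polled before the task has finished
    await asyncio.sleep(0.2)
    second = poll(task_id)      # polled after the task has finished
    return first, second
```

Running `asyncio.run(demo())` polls once before and once after the task completes. The sketch does not handle a task that raises (its `task_id` would stay `'pending'` forever), and unlike Redis the dict is not shared across processes.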