From 627ca2008238db2e5407002878fe8c17a547fcc2 Mon Sep 17 00:00:00 2001 From: manatlan Date: Sun, 8 Oct 2023 11:14:10 +0200 Subject: [PATCH] refacto: a lot simpler --- htagweb/appserver.py | 24 +----- htagweb/server/__init__.py | 150 +++++++++++-------------------------- htagweb/server/client.py | 86 ++++++++++++--------- test_server.py | 35 ++++----- 4 files changed, 112 insertions(+), 183 deletions(-) diff --git a/htagweb/appserver.py b/htagweb/appserver.py index 34df175..8e98e6c 100644 --- a/htagweb/appserver.py +++ b/htagweb/appserver.py @@ -34,7 +34,7 @@ from . import crypto import redys.v2 -from htagweb.server import Hid, hrserver_orchestrator, kill_hrserver, wait_hrserver +from htagweb.server import Hid, startServer, stopServer from htagweb.server.client import HrClient logger = logging.getLogger(__name__) @@ -148,7 +148,7 @@ async def loop_tag_update(self, event, websocket): await asyncio.sleep(0.1) except: print("**loop_tag_update, broken bus, will stop the loop_tag_update !**") - + async def on_connect(self, websocket): #====================================================== get the event fqn=websocket.path_params.get("fqn","") @@ -187,27 +187,11 @@ async def on_disconnect(self, websocket, close_code): await bus.unsubscribe(event) -def processHrServer(): - asyncio.run( redys.v2.loop(hrserver_orchestrator()) ) - async def lifespan(app): - # start a process loop (with redys + hrserver) - process_hrserver=multiprocessing.Process(target=processHrServer) - process_hrserver.start() - - # wait hrserver ready - await wait_hrserver() - + s=await startServer() yield - - # stop hrserver - loop = asyncio.get_event_loop() - await kill_hrserver() - - # wait process to finnish gracefully - process_hrserver.join() - + await stopServer(s) class AppServer(Starlette): def __init__(self, diff --git a/htagweb/server/__init__.py b/htagweb/server/__init__.py index fcea99f..5093618 100644 --- a/htagweb/server/__init__.py +++ b/htagweb/server/__init__.py @@ -18,11 +18,13 @@ import logging logger = logging.getLogger(__name__) +# input command in hrprocess +CMD_PS_EXIT="EXIT" +CMD_PS_REUSE="REUSE" -EVENT_SERVER="EVENT_SERVER" - -CMD_EXIT="EXIT" -CMD_REUSE="RENDER" +# output command from hrprocess to hrclient +CMD_RESP_RENDER="RENDER" +CMD_RESP_RECREATE="RECREATE" KEYAPPS="htagweb.apps" @@ -81,7 +83,7 @@ def __repr__(self): return self.hid ################################################################################## -def process(hid:Hid,js,init,sesprovidername): +def hrprocess(hid:Hid,js,init,sesprovidername): ################################################################################## FactorySession=importFactorySession(sesprovidername) @@ -93,11 +95,11 @@ def log(*a): logger.info(txt) async def loop(): - RRR={"1":"1"} #TODO: find something better ;-) + running={"1":"1"} #TODO: find something better ;-) def suicide(): log("suicide") - RRR.clear() + running.clear() bus = redys.v2.AClient() try: @@ -132,7 +134,7 @@ def suicide(): await bus.subscribe( hid.event_interact ) # publish the 1st rendering - assert await bus.publish(hid.event_response,str(hr)) + assert await bus.publish(hid.event_response,dict( cmd=CMD_RESP_RENDER,render=str(hr))) # register tag.update feature #====================================== @@ -148,18 +150,18 @@ async def update(actions): #====================================== recreate={} - while RRR: + while running: params = await bus.get_event( hid.event_interact ) if params is not None: # sometimes it's not a dict ?!? (bool ?!) - if params.get("cmd") == CMD_EXIT: + if params.get("cmd") == CMD_PS_EXIT: log("Exit explicit") recreate={} break # <- destroy itself - elif params.get("cmd") == CMD_REUSE: + elif params.get("cmd") == CMD_PS_REUSE: # event REUSE params=params.get("params") if str(params['init'])!=key_init or os.path.getmtime(inspect.getfile(klass))!=last_mod_time: - # ask server/orchestrator to recreate me + # ask hrclient to destroy/recreate me log("RECREATE") recreate=dict( hid=hid, # reuse current @@ -172,7 +174,7 @@ async def update(actions): log("REUSE") recreate={} hr.session = FactorySession(hid.uid) # reload session - can = await bus.publish(hid.event_response,str(hr)) + can = await bus.publish(hid.event_response,dict( cmd=CMD_RESP_RENDER, render=str(hr))) if not can: log("Can't answer the response for the REUSE !!!!") else: @@ -203,7 +205,7 @@ async def update(actions): await bus.unsubscribe( hid.event_interact ) if recreate: - assert await bus.publish( EVENT_SERVER , recreate ) + assert await bus.publish( hid.event_response , dict(cmd=CMD_RESP_RECREATE,params=recreate) ) asyncio.run( loop() ) @@ -211,79 +213,16 @@ async def update(actions): -################################################################################## -async def hrserver_orchestrator(): -################################################################################## - bus=redys.v2.AClient() - - def log(*a): - txt=f"-ORCHESTRATOR- %s" % (" ".join([str(i) for i in a])) - print(txt,flush=True,file=sys.stdout) - logger.info(txt) - - - # prevent multiple orchestrators - if await bus.get("hrserver_orchestrator_running")==True: - log("already running") - return - else: - log("started") - await bus.set("hrserver_orchestrator_running",True) - # register its main event - await bus.subscribe( EVENT_SERVER ) +async def killall(): + bus=redys.v2.AClient() - async def killall(): - # try to send a EXIT CMD to all running ps + # try to send a EXIT CMD to all running ps + running_hids = await bus.get(KEYAPPS) or [] + while running_hids: + for hid in running_hids: + await bus.publish(Hid(hid).event_interact,dict(cmd=CMD_PS_EXIT)) running_hids = await bus.get(KEYAPPS) or [] - while running_hids: - for hid in running_hids: - await bus.publish(Hid(hid).event_interact,dict(cmd=CMD_EXIT)) - running_hids = await bus.get(KEYAPPS) or [] - await asyncio.sleep(0.1) - - while 1: - params = await bus.get_event( EVENT_SERVER ) - if params is not None: - if params.get("cmd") == CMD_EXIT: - log(EVENT_SERVER, params.get("cmd") ) - break - elif params.get("cmd") == "CLEAN": - log(EVENT_SERVER, params.get("cmd") ) - await killall() - continue - - hid:Hid=params["hid"] - - running_hids:list=await bus.get(KEYAPPS) or [] - if str(hid) in running_hids: - log("Try to reuse process",hid) - can = await bus.publish(hid.event_interact,dict(cmd=CMD_REUSE,params=params)) - if not can: - log("Can't answer the interaction REUSE !!!!") - else: - p=multiprocessing.Process(target=process, args=[],kwargs=params) - p.start() - log("Start a new process",hid,"in",p.pid) - - await asyncio.sleep(0.1) - - assert await bus.unsubscribe( EVENT_SERVER ) - - await bus.set("hrserver_orchestrator_running",False) - - await killall() - - log("stopped") - -async def wait_redys(): - bus=redys.v2.AClient() - while 1: - try: - if await bus.ping()=="pong": - break - except: - pass await asyncio.sleep(0.1) @@ -301,11 +240,11 @@ async def list(self) -> list: async def kill(self,hid:Hid): """ kill a process (process event)""" - await self._bus.publish(hid.event_interact,dict(cmd=CMD_EXIT)) + await self._bus.publish(hid.event_interact,dict(cmd=CMD_PS_EXIT)) async def killall(self): - """ killall process (server event)""" - await self._bus.publish(EVENT_SERVER,dict(cmd="CLEAN")) + """ killall processes""" + await killall() async def session(self,hid:Hid) -> dict: """ get session for hid""" @@ -314,38 +253,35 @@ async def session(self,hid:Hid) -> dict: return FactorySession(hid.uid) -async def wait_hrserver(): +################################################################################## +async def startServer(): +################################################################################## + # start a redys server (only one will win) + s=redys.v2.ServerProcess() + + # wait redys up bus=redys.v2.AClient() while 1: try: - if await bus.get("hrserver_orchestrator_running"): + if await bus.ping()=="pong": break - except Exception as e: - print(e) + except: + pass await asyncio.sleep(0.1) - -async def kill_hrserver(): - bus=redys.v2.AClient() - await bus.publish( EVENT_SERVER, dict(cmd=CMD_EXIT) ) # kill orchestrator loop - - await asyncio.sleep(1) + return s ################################################################################## -async def hrserver(): +async def stopServer(s): ################################################################################## - s=redys.v2.Server() - s.start() - - await wait_redys() - - await hrserver_orchestrator() + # clean all running process + await killall() + # before stopping s.stop() - - if __name__=="__main__": - asyncio.run( hrserver() ) + # asyncio.run( hrserver() ) + pass diff --git a/htagweb/server/client.py b/htagweb/server/client.py index 11114a2..02c4579 100644 --- a/htagweb/server/client.py +++ b/htagweb/server/client.py @@ -7,16 +7,23 @@ # https://github.com/manatlan/htagweb # ############################################################################# +import multiprocessing import uuid,asyncio,time,sys import redys import redys.v2 -from htagweb.server import EVENT_SERVER,Hid +from htagweb.server import CMD_RESP_RECREATE, CMD_RESP_RENDER, CMD_PS_REUSE, KEYAPPS,Hid, hrprocess import logging logger = logging.getLogger(__name__) TIMEOUT=2*60 # A interaction can take 2min max +def startProcess(params:dict): + p=multiprocessing.Process(target=hrprocess, args=[],kwargs=params) + p.start() + return p + + class HrClient: def __init__(self,uid:str,fqn:str,js:str=None,sesprovidername=None): """ !!!!!!!!!!!!!!!!!!!! if js|sesprovidername is None : can't do a start() !!!!!!!!!!!!!!!!!!!!!!""" @@ -31,17 +38,11 @@ def error(self, *a): print(txt,flush=True,file=sys.stderr) logger.error(txt) + def log(self, *a): + txt=f".HrClient {self.hid.uid} {self.hid.fqn}: %s" % (" ".join([str(i) for i in a])) + print(txt,flush=True,file=sys.stderr) + logger.info(txt) - async def _wait(self,event, s=TIMEOUT): - # wait for a response - t1=time.monotonic() - while time.monotonic() - t1 < s: - message = await self.bus.get_event( event ) - if message is not None: - return message - - self.error(f"Event TIMEOUT ({s}s) on {event} !!!") - return None async def start(self,*a,**k) -> str: """ Start the defined app with this params (a,k) @@ -52,16 +53,41 @@ async def start(self,*a,**k) -> str: # subscribe for response await self.bus.subscribe( self.hid.event_response ) - # start the process app - assert await self.bus.publish( EVENT_SERVER , dict( + #/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\ + params=dict( hid=self.hid, js=self.js, init= (a,k), sesprovidername=self.sesprovidername, - )) + ) + + running_hids:list=await self.bus.get(KEYAPPS) or [] + if str(self.hid) in running_hids: + self.log("Try to reuse process",self.hid) + can = await self.bus.publish(self.hid.event_interact,dict(cmd=CMD_PS_REUSE,params=params)) + if not can: + self.log("Can't answer the interaction REUSE !!!!") + else: + p=startProcess(params) + self.log("Start a new process",self.hid,"in",p.pid) + #/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\ + + # wait for a response + t1=time.monotonic() + while time.monotonic() - t1 < TIMEOUT: + message = await self.bus.get_event( self.hid.event_response ) + if message is not None: + if message.get("cmd")==CMD_RESP_RECREATE: + # the current process ask hrclient to recreate a new process + p=startProcess(params) + self.log("Start a new process",self.hid,"in",p.pid) + elif message.get("cmd")==CMD_RESP_RENDER: + # the process has giver a right answer ... return the rendering + return message.get("render") + + self.error(f"Event TIMEOUT ({TIMEOUT}s) on {self.hid.event_response} !!!") + return "?!" - # wait 1st rendering - return await self._wait(self.hid.event_response) or "?!" async def interact(self,**params) -> dict: @@ -82,23 +108,13 @@ async def interact(self,**params) -> dict: self.error("***HrClient.interact error***",e) return {} + async def _wait(self,event, s=TIMEOUT): + # wait for a response + t1=time.monotonic() + while time.monotonic() - t1 < s: + message = await self.bus.get_event( event ) + if message is not None: + return message -async def main(): - uid ="u1" - p=HrClient(uid,"obj:App","//") - #~ html=await p.start() - #~ print(html) - - #~ actions=await p.interact( oid="ut", method_name="doit", args=[], kargs={}, event={} ) - #~ print(actions) - - await p.kill() - await p.kill() - await p.kill() - #~ await p.kill() - #~ await p.kill() - #~ await HrPilot.list() - #~ await HrPilot.clean() - -if __name__=="__main__": - asyncio.run( main() ) + self.error(f"Event TIMEOUT ({s}s) on {event} !!!") + return None diff --git a/test_server.py b/test_server.py index 99b8335..ad58c56 100644 --- a/test_server.py +++ b/test_server.py @@ -3,8 +3,9 @@ import pytest,sys,io import multiprocessing,threading import time -from htagweb.appserver import processHrServer,lifespan -from htagweb.server import ServerClient, kill_hrserver, wait_hrserver + +import redys.v2 +from htagweb.server import ServerClient, killall, startServer, stopServer from htagweb.server.client import HrClient import threading @@ -12,18 +13,11 @@ @pytest.fixture() def server(): # nearly "same code" as lifespan # start a process loop (with redys + hrserver) - process_hrserver=multiprocessing.Process(target=processHrServer) - process_hrserver.start() - - # wait hrserver ready - asyncio.run( wait_hrserver() ) + s=asyncio.run( startServer()) yield "x" - # stop hrserver - asyncio.run( kill_hrserver() ) - # wait process to finnish gracefully - process_hrserver.join() + asyncio.run( stopServer(s)) @pytest.mark.asyncio @@ -48,16 +42,15 @@ async def test_base( server ): assert ll[0].uid == uid assert ll[0].fqn == "test_hr:App" - + await killall() if __name__=="__main__": - pass - # p=multiprocessing.Process(target=processHrServer) - # try: - # p.start() - # time.sleep(1) - - # asyncio.run( test_base(42) ) - # finally: - # asyncio.run( kill_hrserver() ) \ No newline at end of file + s=asyncio.run( startServer()) + + asyncio.run( test_base("x") ) + + asyncio.run( stopServer(s)) + + +