Skip to content

Commit

Permalink
refacto: a lot simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
manatlan committed Oct 8, 2023
1 parent 0f1e94a commit 627ca20
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 183 deletions.
24 changes: 4 additions & 20 deletions htagweb/appserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from . import crypto
import redys.v2

from htagweb.server import Hid, hrserver_orchestrator, kill_hrserver, wait_hrserver
from htagweb.server import Hid, startServer, stopServer
from htagweb.server.client import HrClient

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -148,7 +148,7 @@ async def loop_tag_update(self, event, websocket):
await asyncio.sleep(0.1)
except:
print("**loop_tag_update, broken bus, will stop the loop_tag_update !**")

async def on_connect(self, websocket):
#====================================================== get the event
fqn=websocket.path_params.get("fqn","")
Expand Down Expand Up @@ -187,27 +187,11 @@ async def on_disconnect(self, websocket, close_code):
await bus.unsubscribe(event)


def processHrServer():
asyncio.run( redys.v2.loop(hrserver_orchestrator()) )


async def lifespan(app):
# start a process loop (with redys + hrserver)
process_hrserver=multiprocessing.Process(target=processHrServer)
process_hrserver.start()

# wait hrserver ready
await wait_hrserver()

s=await startServer()
yield

# stop hrserver
loop = asyncio.get_event_loop()
await kill_hrserver()

# wait process to finnish gracefully
process_hrserver.join()

await stopServer(s)

class AppServer(Starlette):
def __init__(self,
Expand Down
150 changes: 43 additions & 107 deletions htagweb/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import logging
logger = logging.getLogger(__name__)

# input command in hrprocess
CMD_PS_EXIT="EXIT"
CMD_PS_REUSE="REUSE"

EVENT_SERVER="EVENT_SERVER"

CMD_EXIT="EXIT"
CMD_REUSE="RENDER"
# output command from hrprocess to hrclient
CMD_RESP_RENDER="RENDER"
CMD_RESP_RECREATE="RECREATE"

KEYAPPS="htagweb.apps"

Expand Down Expand Up @@ -81,7 +83,7 @@ def __repr__(self):
return self.hid

##################################################################################
def process(hid:Hid,js,init,sesprovidername):
def hrprocess(hid:Hid,js,init,sesprovidername):
##################################################################################
FactorySession=importFactorySession(sesprovidername)

Expand All @@ -93,11 +95,11 @@ def log(*a):
logger.info(txt)

async def loop():
RRR={"1":"1"} #TODO: find something better ;-)
running={"1":"1"} #TODO: find something better ;-)

def suicide():
log("suicide")
RRR.clear()
running.clear()

bus = redys.v2.AClient()
try:
Expand Down Expand Up @@ -132,7 +134,7 @@ def suicide():
await bus.subscribe( hid.event_interact )

# publish the 1st rendering
assert await bus.publish(hid.event_response,str(hr))
assert await bus.publish(hid.event_response,dict( cmd=CMD_RESP_RENDER,render=str(hr)))

# register tag.update feature
#======================================
Expand All @@ -148,18 +150,18 @@ async def update(actions):
#======================================
recreate={}

while RRR:
while running:
params = await bus.get_event( hid.event_interact )
if params is not None: # sometimes it's not a dict ?!? (bool ?!)
if params.get("cmd") == CMD_EXIT:
if params.get("cmd") == CMD_PS_EXIT:
log("Exit explicit")
recreate={}
break # <- destroy itself
elif params.get("cmd") == CMD_REUSE:
elif params.get("cmd") == CMD_PS_REUSE:
# event REUSE
params=params.get("params")
if str(params['init'])!=key_init or os.path.getmtime(inspect.getfile(klass))!=last_mod_time:
# ask server/orchestrator to recreate me
# ask hrclient to destroy/recreate me
log("RECREATE")
recreate=dict(
hid=hid, # reuse current
Expand All @@ -172,7 +174,7 @@ async def update(actions):
log("REUSE")
recreate={}
hr.session = FactorySession(hid.uid) # reload session
can = await bus.publish(hid.event_response,str(hr))
can = await bus.publish(hid.event_response,dict( cmd=CMD_RESP_RENDER, render=str(hr)))
if not can:
log("Can't answer the response for the REUSE !!!!")
else:
Expand Down Expand Up @@ -203,87 +205,24 @@ async def update(actions):
await bus.unsubscribe( hid.event_interact )

if recreate:
assert await bus.publish( EVENT_SERVER , recreate )
assert await bus.publish( hid.event_response , dict(cmd=CMD_RESP_RECREATE,params=recreate) )


asyncio.run( loop() )
log("end")



##################################################################################
async def hrserver_orchestrator():
##################################################################################
bus=redys.v2.AClient()

def log(*a):
txt=f"-ORCHESTRATOR- %s" % (" ".join([str(i) for i in a]))
print(txt,flush=True,file=sys.stdout)
logger.info(txt)


# prevent multiple orchestrators
if await bus.get("hrserver_orchestrator_running")==True:
log("already running")
return
else:
log("started")
await bus.set("hrserver_orchestrator_running",True)

# register its main event
await bus.subscribe( EVENT_SERVER )
async def killall():
bus=redys.v2.AClient()

async def killall():
# try to send a EXIT CMD to all running ps
# try to send a EXIT CMD to all running ps
running_hids = await bus.get(KEYAPPS) or []
while running_hids:
for hid in running_hids:
await bus.publish(Hid(hid).event_interact,dict(cmd=CMD_PS_EXIT))
running_hids = await bus.get(KEYAPPS) or []
while running_hids:
for hid in running_hids:
await bus.publish(Hid(hid).event_interact,dict(cmd=CMD_EXIT))
running_hids = await bus.get(KEYAPPS) or []
await asyncio.sleep(0.1)

while 1:
params = await bus.get_event( EVENT_SERVER )
if params is not None:
if params.get("cmd") == CMD_EXIT:
log(EVENT_SERVER, params.get("cmd") )
break
elif params.get("cmd") == "CLEAN":
log(EVENT_SERVER, params.get("cmd") )
await killall()
continue

hid:Hid=params["hid"]

running_hids:list=await bus.get(KEYAPPS) or []
if str(hid) in running_hids:
log("Try to reuse process",hid)
can = await bus.publish(hid.event_interact,dict(cmd=CMD_REUSE,params=params))
if not can:
log("Can't answer the interaction REUSE !!!!")
else:
p=multiprocessing.Process(target=process, args=[],kwargs=params)
p.start()
log("Start a new process",hid,"in",p.pid)

await asyncio.sleep(0.1)

assert await bus.unsubscribe( EVENT_SERVER )

await bus.set("hrserver_orchestrator_running",False)

await killall()

log("stopped")

async def wait_redys():
bus=redys.v2.AClient()
while 1:
try:
if await bus.ping()=="pong":
break
except:
pass
await asyncio.sleep(0.1)


Expand All @@ -301,11 +240,11 @@ async def list(self) -> list:

async def kill(self,hid:Hid):
""" kill a process (process event)"""
await self._bus.publish(hid.event_interact,dict(cmd=CMD_EXIT))
await self._bus.publish(hid.event_interact,dict(cmd=CMD_PS_EXIT))

async def killall(self):
""" killall process (server event)"""
await self._bus.publish(EVENT_SERVER,dict(cmd="CLEAN"))
""" killall processes"""
await killall()

async def session(self,hid:Hid) -> dict:
""" get session for hid"""
Expand All @@ -314,38 +253,35 @@ async def session(self,hid:Hid) -> dict:
return FactorySession(hid.uid)


async def wait_hrserver():
##################################################################################
async def startServer():
##################################################################################
# start a redys server (only one will win)
s=redys.v2.ServerProcess()

# wait redys up
bus=redys.v2.AClient()
while 1:
try:
if await bus.get("hrserver_orchestrator_running"):
if await bus.ping()=="pong":
break
except Exception as e:
print(e)
except:
pass
await asyncio.sleep(0.1)


async def kill_hrserver():
bus=redys.v2.AClient()
await bus.publish( EVENT_SERVER, dict(cmd=CMD_EXIT) ) # kill orchestrator loop

await asyncio.sleep(1)
return s


##################################################################################
async def hrserver():
async def stopServer(s):
##################################################################################
s=redys.v2.Server()
s.start()

await wait_redys()

await hrserver_orchestrator()
# clean all running process
await killall()

# before stopping
s.stop()




if __name__=="__main__":
asyncio.run( hrserver() )
# asyncio.run( hrserver() )
pass
Loading

0 comments on commit 627ca20

Please sign in to comment.