Skip to content

Commit

Permalink
feat: ServerClient
Browse files Browse the repository at this point in the history
  • Loading branch information
manatlan committed Oct 7, 2023
1 parent 258efa8 commit 6f59b93
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 76 deletions.
6 changes: 3 additions & 3 deletions htagweb/appserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from . import crypto
import redys.v2

from htagweb.server import hrserver, hrserver_orchestrator, kill_hrserver, wait_hrserver
from htagweb.server import Hid, hrserver_orchestrator, kill_hrserver, wait_hrserver
from htagweb.server.client import HrClient

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -150,7 +150,7 @@ async def on_connect(self, websocket):
#====================================================== get the event
fqn=websocket.path_params.get("fqn","")
uid=websocket.scope["uid"]
event=HrClient(uid,fqn).event_response+"_update"
event=Hid.create(uid,fqn).event_response_update
#======================================================
self.is_parano="parano" in websocket.query_params.keys()

Expand All @@ -177,7 +177,7 @@ async def on_disconnect(self, websocket, close_code):
#====================================================== get the event
fqn=websocket.path_params.get("fqn","")
uid=websocket.scope["uid"]
event=HrClient(uid,fqn).event_response+"_update"
event=Hid.create(uid,fqn).event_response_update
#======================================================

with redys.v2.AClient() as bus:
Expand Down
141 changes: 109 additions & 32 deletions htagweb/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
CMD_EXIT="EXIT"
CMD_RENDER="RENDER"

KEYAPPS="htagweb.apps"

def importClassFromFqn(fqn_norm:str) -> type:
assert ":" in fqn_norm
#--------------------------- fqn -> module, name
Expand All @@ -50,17 +52,38 @@ def importClassFromFqn(fqn_norm:str) -> type:
klass.imports=[]
return klass

def importFactorySession( sesprovidername=None ):
import htagweb.sessions
return getattr(htagweb.sessions,sesprovidername or "MemDict")

class Hid:
def __init__(self,hid:str):
uid,fqn=hid.split("_",1)
self.uid=uid
self.fqn=fqn
self.hid=hid

self.event_interact = "interact_"+hid
self.event_interact_response = self.event_interact+"_response"

self.event_response = "response_"+hid
self.event_response_update = self.event_response+"_update"

self.key_sesprovider = "sesprovider_"+hid

@staticmethod
def create(uid:str,fqn:str):
return Hid(uid+"_"+fqn)

def __str__(self):
return self.hid
def __repr__(self):
return self.hid

##################################################################################
def process(uid,hid,event_response,event_interact,fqn,js,init,sesprovidername,force):
def process(hid:Hid,js,init,sesprovidername,force):
##################################################################################
#''''''''''''''''''''''''''''''''''''''''''''''''''''
if sesprovidername is None:
sesprovidername="MemDict"
import htagweb.sessions
FactorySession=getattr(htagweb.sessions,sesprovidername)
#''''''''''''''''''''''''''''''''''''''''''''''''''''
FactorySession=importFactorySession(sesprovidername)

pid = os.getpid()

Expand All @@ -79,14 +102,21 @@ def suicide():
bus = redys.v2.AClient()
try:
if os.getcwd() not in sys.path: sys.path.insert(0,os.getcwd())
klass=importClassFromFqn(fqn)
klass=importClassFromFqn(hid.fqn)
except Exception as e:
log("importClassFromFqn ERROR",traceback.format_exc())
#TODO: do better here
assert await bus.publish(event_response,str(e))
assert await bus.publish(hid.event_response,str(e))
return

session = FactorySession(uid)
# register hid in redys "apps"
await bus.sadd(KEYAPPS,str(hid))

# save sesprovider for this hid
await bus.set(hid.key_sesprovider, FactorySession.__name__)


session = FactorySession(hid.uid)

styles=Tag.style("body.htagoff * {cursor:not-allowed !important;}")

Expand All @@ -96,16 +126,16 @@ def suicide():


# subscribe for interaction
await bus.subscribe( event_interact )
await bus.subscribe( hid.event_interact )

# publish the 1st rendering
assert await bus.publish(event_response,str(hr))
assert await bus.publish(hid.event_response,str(hr))

# register tag.update feature
#======================================
async def update(actions):
try:
r=await bus.publish(event_response+"_update",actions)
r=await bus.publish(hid.event_response_update,actions)
except:
log("!!! concurrent write/read on redys !!!")
r=False
Expand All @@ -115,13 +145,15 @@ async def update(actions):
#======================================

while RRR:
params = await bus.get_event( event_interact )
params = await bus.get_event( hid.event_interact )
if params is not None: # sometimes it's not a dict ?!? (bool ?!)
if params.get("cmd") == CMD_RENDER:
if params.get("cmd") == CMD_EXIT:
break
elif params.get("cmd") == CMD_RENDER:
# just a false start, just need the current render
log("RERENDER")
hr.session = FactorySession(uid) # reload session
assert await bus.publish(event_response,str(hr))
hr.session = FactorySession(hid.uid) # reload session
assert await bus.publish(hid.event_response,str(hr))
else:
log("INTERACT")
#-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- UT
Expand All @@ -133,12 +165,19 @@ async def update(actions):
# always save session after interaction
hr.session._save()

assert await bus.publish(event_response+"_interact",actions)
assert await bus.publish(hid.event_interact_response,actions)

await asyncio.sleep(0.1)

# remove hid in redys "apps"
await bus.srem(KEYAPPS,str(hid))

# delete sesprovider for this hid
await bus.delete(hid.key_sesprovider)


#consume all pending events
assert await bus.unsubscribe( event_interact )
assert await bus.unsubscribe( hid.event_interact )

asyncio.run( loop() )
log("end")
Expand Down Expand Up @@ -169,10 +208,13 @@ def log(*a):

ps={}

def killall(ps:dict):
async def killall(ps:dict):
# try to send a EXIT CMD to all running ps
for hid,infos in ps.items():
ps[hid]["process"].kill()
# remove hid in redys "apps"
await bus.srem(KEYAPPS,hid)


while 1:
params = await bus.get_event( EVENT_SERVER )
Expand All @@ -182,33 +224,37 @@ def killall(ps:dict):
break
elif params.get("cmd") == "CLEAN":
log(EVENT_SERVER, params.get("cmd") )
killall(ps)
continue
elif params.get("cmd") == "PS":
log(EVENT_SERVER, params.get("cmd") )
log(ps)
await killall(ps)
continue

hid=params["hid"]
hid:Hid=params["hid"]
key_init=str(params["init"])

if hid in ps and ps[hid]["process"].is_alive():
#/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\
# TODO: this code will be changed soon ;-)
#/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\
shid=hid.hid
if shid in ps and ps[shid]["process"].is_alive():
# process is already running

if params["force"] or key_init != ps[hid]["key"]:
if params["force"] or key_init != ps[shid]["key"]:
# kill itself because it's not the same init params, or force recreate
if params["force"]:
log("Destroy/Recreate a new process (forced)",hid)
else:
log("Destroy/Recreate a new process (qp changed)",hid)
ps[hid]["process"].kill()
ps[shid]["process"].kill()

# remove hid in redys "apps"
await bus.srem(KEYAPPS,str(hid))

# and recreate another one later
else:
# it's the same initialization process

log("Reuse process",hid)
# so ask process to send back its render
assert await bus.publish(params["event_interact"],dict(cmd=CMD_RENDER))
assert await bus.publish(hid.event_interact,dict(cmd=CMD_RENDER))
continue
else:
log("Start a new process",hid)
Expand All @@ -218,13 +264,14 @@ def killall(ps:dict):
p.start()

# and save it in pool ps
ps[hid]=dict( process=p, key=key_init, event_interact=params["event_interact"])
ps[shid]=dict( process=p, key=key_init, event_interact=hid.event_interact)
#/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\

await asyncio.sleep(0.1)

assert await bus.unsubscribe( EVENT_SERVER )

killall(ps)
await killall(ps)

log("stopped")

Expand All @@ -238,6 +285,34 @@ async def wait_redys():
pass
await asyncio.sleep(0.1)


##################################################################################
class ServerClient:
##################################################################################
""" to expose server features """
def __init__(self):
self._bus=redys.v2.AClient()

async def list(self) -> list:
""" list all process uid&fqn """
ll = sorted(await self._bus.get(KEYAPPS))
return [Hid(hid) for hid in ll]

async def kill(self,hid:Hid):
""" kill a process (process event)"""
await self._bus.publish(hid.event_interact,dict(cmd=CMD_EXIT))

async def killall(self):
""" killall process (server event)"""
await self._bus.publish(EVENT_SERVER,dict(cmd="CLEAN"))

async def session(self,hid:Hid) -> dict:
""" get session for hid"""
sesprovidername=await self._bus.get(hid.key_sesprovider)
FactorySession=importFactorySession(sesprovidername)
return FactorySession(hid.uid)


async def wait_hrserver():
bus=redys.v2.AClient()
while 1:
Expand All @@ -254,7 +329,9 @@ async def kill_hrserver():
await bus.publish( EVENT_SERVER, dict(cmd=CMD_EXIT) ) # kill orchestrator loop


##################################################################################
async def hrserver():
##################################################################################
s=redys.v2.Server()
s.start()

Expand Down
49 changes: 9 additions & 40 deletions htagweb/server/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import uuid,asyncio,time,sys
import redys
import redys.v2
from htagweb.server import EVENT_SERVER
from htagweb.server import EVENT_SERVER,Hid
import logging
logger = logging.getLogger(__name__)

Expand All @@ -20,19 +20,15 @@
class HrClient:
def __init__(self,uid:str,fqn:str,js:str=None,sesprovidername=None,recreate=False):
""" !!!!!!!!!!!!!!!!!!!! if js|sesprovidername is None : can't do a start() !!!!!!!!!!!!!!!!!!!!!!"""
self.uid=uid
self.fqn=fqn
self.js=js
self.bus = redys.v2.AClient()
self.sesprovidername=sesprovidername
self.recreate=recreate

self.hid=f"{uid}_{fqn}"
self.event_response = f"response_{self.hid}"
self.event_interact = f"interact_{self.hid}"
self.hid=Hid.create(uid,fqn)

def error(self, *a):
txt=f".HrClient {self.uid} {self.fqn}: %s" % (" ".join([str(i) for i in a]))
txt=f".HrClient {self.hid.uid} {self.hid.fqn}: %s" % (" ".join([str(i) for i in a]))
print(txt,flush=True,file=sys.stderr)
logger.error(txt)

Expand All @@ -55,62 +51,35 @@ async def start(self,*a,**k) -> str:
assert self.js, "You should define the js in HrPilot() !!!!!!"

# subscribe for response
await self.bus.subscribe( self.event_response )
await self.bus.subscribe( self.hid.event_response )

# start the process app
assert await self.bus.publish( EVENT_SERVER , dict(
uid=self.uid,
hid=self.hid,
event_response=self.event_response,
event_interact=self.event_interact,
fqn=self.fqn,
js=self.js,
init= (a,k),
sesprovidername=self.sesprovidername,
force=self.recreate,
))

# wait 1st rendering
return await self._wait(self.event_response) or "?!"

# async def kill(self):
# """ Kill the process
# (dialog with process event)
# """
# assert await self.bus.publish( self.event_interact, dict(cmd="EXIT") )
return await self._wait(self.hid.event_response) or "?!"


async def interact(self,**params) -> dict:
""" return htag'actions or None (if process doesn't answer, after timeout)
(dialog with process event)
"""
# subscribe for response
await self.bus.subscribe( self.event_response+"_interact" )
await self.bus.subscribe( self.hid.event_interact_response )

# post the interaction
if await self.bus.publish( self.event_interact, params ):
if await self.bus.publish( self.hid.event_interact, params ):
# wait actions
return await self._wait(self.event_response+"_interact") or {}
return await self._wait(self.hid.event_interact_response) or {}
else:
self.error(f"Can't publish {self.event_interact} !!!")



@staticmethod
async def list():
""" SERVER COMMAND
(dialog with server event)
"""
with redys.v2.AClient() as bus:
assert await bus.publish( EVENT_SERVER, dict(cmd="PS") )
self.error(f"Can't publish {self.hid.event_interact} !!!")

@staticmethod
async def clean():
""" SERVER COMMAND
(dialog with server event)
"""
with redys.v2.AClient() as bus:
assert await bus.publish( EVENT_SERVER, dict(cmd="CLEAN") )


async def main():
Expand Down
Loading

0 comments on commit 6f59b93

Please sign in to comment.