| 
 | 1 | +#!/usr/bin/env python  | 
 | 2 | +"""Demo server for demonstrating async handlers.  | 
 | 3 | +
  | 
 | 4 | +   $ python example/asyncio_server.py foo  | 
 | 5 | +
  | 
 | 6 | +"""  | 
 | 7 | + | 
 | 8 | +from __future__ import print_function  | 
 | 9 | + | 
 | 10 | +import time  | 
 | 11 | +import logging  | 
 | 12 | +import argparse  | 
 | 13 | +import asyncio  | 
 | 14 | +from p4p.server import StaticProvider, Server  | 
 | 15 | +from p4p.server.asyncio import SharedPV  | 
 | 16 | +from p4p.nt import NTScalar  | 
 | 17 | + | 
 | 18 | + | 
 | 19 | +DEFAULT_TIMEOUT = 1  | 
 | 20 | + | 
 | 21 | +class SomeClassWithACoroutine:  | 
 | 22 | +    def __init__(self):  | 
 | 23 | +        self.data = None  | 
 | 24 | + | 
 | 25 | +    async def coroutine(self, value: str):  | 
 | 26 | +        logging.info(f"Updating {self} from value {self.data} to {value}.")  | 
 | 27 | + | 
 | 28 | + | 
 | 29 | +class AttrWHandler:  | 
 | 30 | +    def __init__(self, some_object_with_coro: SomeClassWithACoroutine):  | 
 | 31 | +        self.some_object_with_coro = some_object_with_coro  | 
 | 32 | + | 
 | 33 | +    async def put(self, pv, op):  | 
 | 34 | +        raw_value = op.value()  | 
 | 35 | +        logging.info(f"Received put on {raw_value} to {pv.name()}.")  | 
 | 36 | + | 
 | 37 | +        await self.some_object_with_coro.coroutine(raw_value)  | 
 | 38 | + | 
 | 39 | +        pv.post(raw_value, timestamp=time.time())  | 
 | 40 | +        op.done()  | 
 | 41 | + | 
 | 42 | + | 
 | 43 | +class AsyncProviderWrapper:  | 
 | 44 | +    def __init__(self, name: str, loop: asyncio.AbstractEventLoop):  | 
 | 45 | +        self.name = name  | 
 | 46 | +        self._loop = loop  | 
 | 47 | +        self._provider = StaticProvider(name)  | 
 | 48 | +        self._pvs = []  | 
 | 49 | + | 
 | 50 | +        self.setUp()  | 
 | 51 | + | 
 | 52 | +    async def asyncSetUp(self):  | 
 | 53 | +        await self.add_pvs()  | 
 | 54 | + | 
 | 55 | +    async def asyncTearDown(self): ...  | 
 | 56 | + | 
 | 57 | +    async def add_pvs(self):  | 
 | 58 | + | 
 | 59 | +        write_pv = SharedPV(  | 
 | 60 | +            handler=AttrWHandler(SomeClassWithACoroutine()),  | 
 | 61 | +            nt=NTScalar("s"),  | 
 | 62 | +            initial="initial_value_1",  | 
 | 63 | +        )  | 
 | 64 | +        self._pvs.append(write_pv)  | 
 | 65 | +        logging.info(f"Added {self.name}:WRITE_PV to provider.")  | 
 | 66 | +        self._provider.add(f"{self.name}:WRITE_PV", write_pv)  | 
 | 67 | + | 
 | 68 | +        read_pv = SharedPV(  | 
 | 69 | +            nt=NTScalar("s"),  | 
 | 70 | +            initial="initial_value_2",  | 
 | 71 | +        )  | 
 | 72 | +        self._pvs.append(read_pv)  | 
 | 73 | +        logging.info(f"Added {self.name}:READ_PV to provider.")  | 
 | 74 | +        self._provider.add(f"{self.name}:READ_PV", read_pv)  | 
 | 75 | + | 
 | 76 | +    def setUp(self):  | 
 | 77 | +        self._loop.set_debug(True)  | 
 | 78 | +        self._loop.run_until_complete(asyncio.wait_for(self.asyncSetUp(), DEFAULT_TIMEOUT))  | 
 | 79 | + | 
 | 80 | +    def tearDown(self):  | 
 | 81 | +        self._loop.run_until_complete(asyncio.wait_for(self.asyncTearDown(), DEFAULT_TIMEOUT))  | 
 | 82 | + | 
 | 83 | + | 
 | 84 | +class AsyncServerWrapper:  | 
 | 85 | +    def __init__(  | 
 | 86 | +        self,  | 
 | 87 | +        pv_prefix: str,  | 
 | 88 | +    ):  | 
 | 89 | +        self._pv_prefix = pv_prefix  | 
 | 90 | +        self._pvs = []  | 
 | 91 | + | 
 | 92 | +    def run(self):  | 
 | 93 | +        loop = asyncio.new_event_loop()  | 
 | 94 | +        self.provider = AsyncProviderWrapper(self._pv_prefix, loop)  | 
 | 95 | +        try:  | 
 | 96 | +            loop.run_until_complete(self._run())  | 
 | 97 | +        finally:  | 
 | 98 | +            loop.close()  | 
 | 99 | + | 
 | 100 | +    async def _run(self) -> None:  | 
 | 101 | +        logging.info("Running server.")  | 
 | 102 | +        try:  | 
 | 103 | +            Server.forever(providers=[self.provider._provider])  | 
 | 104 | +        finally:  | 
 | 105 | +            print("Server stopped.")  | 
 | 106 | + | 
 | 107 | + | 
 | 108 | +def main(args: argparse.Namespace):  | 
 | 109 | +    AsyncServerWrapper(args.name).run()  | 
 | 110 | + | 
 | 111 | +def getargs() -> argparse.Namespace:  | 
 | 112 | +    P = argparse.ArgumentParser()  | 
 | 113 | +    P.add_argument('prefix', type=str)  | 
 | 114 | +    P.add_argument('-v','--verbose', action='store_const', default=logging.INFO, const=logging.DEBUG)  | 
 | 115 | +    return P.parse_args()  | 
 | 116 | + | 
 | 117 | +if __name__=='__main__':  | 
 | 118 | +    args = getargs()  | 
 | 119 | +    logging.basicConfig(level=args.verbose)  | 
 | 120 | +    main(args)  | 
0 commit comments