diff --git a/example/auditor.py b/example/auditor.py
new file mode 100644
index 00000000..72a2d137
--- /dev/null
+++ b/example/auditor.py
@@ -0,0 +1,109 @@
+"""
+In this example we set up a simple auditing mechanism that reports information
+about the last change to an audited PV (which PV, when, and who by) to an
+auditor channel. The record of changes is written to the `audit.log` file.
+
+NOTE: This code is an example only. This design is not suitable for non-demo
+      use.
+
+The two Handlers below demonstrate many of the methods available to a handler.
+The Audited handler records only information about (external) `put()`
+operations, not (internal) `post()` operations. The Auditor handler uses
+`open()` to record the start of logging, `post()` to record value changes,
+and `close()` to record when auditing finished and to write its own value at
+that time.
+
+Run this script and then make a change to one of its PVs, e.g.
+`python -m p4p.client.cli put demo:pv:audited_d=8.8`. Then check the results of
+the change, e.g. `python -m p4p.client.cli get demo:pv:auditor`. You should see
+the name of the PV that was changed, the identity of the user that made the
+change, and the time the change was made.
+
+After making changes you can inspect the log of changes in `audit.log`.
+Use of open() and close() means that it's also possible to check when auditing
+started and stopped.
+"""
+
+import time
+
+from p4p import Value
+from p4p.nt.scalar import NTScalar
+from p4p.server import Server
+from p4p.server.raw import Handler
+from p4p.server.thread import SharedPV
+
+
+class Auditor(Handler):
+    """Persist information to file so we can audit even after the program has closed."""
+
+    def open(self, value: Value):
+        """Record the time the auditing PV was opened."""
+        with open("audit.log", mode="a+", encoding="utf8") as f:
+            f.write(f"Auditing opened at {time.ctime()}\n")
+
+    def post(self, pv, value: Value):
+        """Record the time a change was made to the auditing PV, and the change made."""
+        with open("audit.log", mode="a+", encoding="utf8") as f:
+            f.write(f"Auditing updated at {time.ctime()}; {value['value']}\n")
+
+    def close(self, pv: SharedPV):
+        """Record the time the auditing PV was closed."""
+        with open("audit.log", mode="a+", encoding="utf8") as f:
+            value = pv.current().raw["value"]
+            if value:
+                f.write(f"Auditing closed at {time.ctime()}; {value}\n")
+            else:
+                f.write(f"Auditing closed at {time.ctime()}; no changes made\n")
+
+
+class Audited(Handler):
+    """Forward information about Put operations to the auditing PV."""
+
+    def __init__(self, pv: SharedPV):
+        self._audit_pv = pv
+
+    def put(self, pv, op):
+        """Each time a Put operation is made, forward some information to the auditing PV."""
+        pv.post(op.value())
+        self._audit_pv.post(
+            f"Channel {op.name()} last updated by {op.account()} at {time.ctime()}"
+        )
+        op.done()
+
+
+def main():
+    """
+    Create a set of PVs. The audited channels forward information about the user that
+    made a change, and their new values, to an auditor channel.
+    """
+
+    # Set up some PVs that will be audited and one (`demo:pv:unaudited_i`) that won't be.
+    # Note that the Audited handler implements put(), so the audited PVs can also be
+    # changed externally.
+    auditor_pv = SharedPV(nt=NTScalar("s"), handler=Auditor(), initial="")
+
+    pvs = {
+        "demo:pv:auditor": auditor_pv,
+        "demo:pv:audited_d": SharedPV(
+            nt=NTScalar("d"), handler=Audited(auditor_pv), initial=9.99
+        ),
+        "demo:pv:audited_i": SharedPV(
+            nt=NTScalar("i"), handler=Audited(auditor_pv), initial=4
+        ),
+        "demo:pv:audited_s": SharedPV(
+            nt=NTScalar("s"), handler=Audited(auditor_pv), initial="Testing"
+        ),
+        "demo:pv:unaudited_i": SharedPV(nt=NTScalar("i"), initial=-1),
+    }
+
+    print(pvs.keys())
+    try:
+        Server.forever(providers=[pvs])
+    except KeyboardInterrupt:
+        pass
+    finally:
+        # We need to close the auditor PV manually; the server's stop() won't do it for us.
+        auditor_pv.close()
+
+
+if __name__ == "__main__":
+    main()
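For reference, the walk-through described in the module docstring can also be scripted with the p4p client API rather than the CLI. This is a minimal sketch, assuming the example server above is running and reachable over PVA:

from p4p.client.thread import Context

ctxt = Context("pva")
try:
    # Triggers Audited.put(), which forwards the change details to the auditor PV
    ctxt.put("demo:pv:audited_d", 8.8)
    # Reports which PV changed, the account that changed it, and when
    print(ctxt.get("demo:pv:auditor"))
finally:
    ctxt.close()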
+ """ + + # Setup some PVs that will be audited and one (`demo:pv:unaudted_i`) that won't be + # Note that the audited handler does have a put so it can also be changed externally + auditor_pv = SharedPV(nt=NTScalar("s"), handler=Auditor(), initial="") + + pvs = { + "demo:pv:auditor": auditor_pv, + "demo:pv:audited_d": SharedPV( + nt=NTScalar("d"), handler=Audited(auditor_pv), initial=9.99 + ), + "demo:pv:audited_i": SharedPV( + nt=NTScalar("i"), handler=Audited(auditor_pv), initial=4 + ), + "demo:pv:audited_s": SharedPV( + nt=NTScalar("s"), handler=Audited(auditor_pv), initial="Testing" + ), + "demo:pv:unaudted_i": SharedPV(nt=NTScalar("i"), initial=-1), + } + + print(pvs.keys()) + try: + Server.forever(providers=[pvs]) + except KeyboardInterrupt: + pass + finally: + # We need to close the auditor PV manually, the server stop() won't do it for us + auditor_pv.close() + + +if __name__ == "__main__": + main() diff --git a/example/persist.py b/example/persist.py new file mode 100644 index 00000000..a00ba977 --- /dev/null +++ b/example/persist.py @@ -0,0 +1,223 @@ +""" +Use a handler to automatically persist values to an SQLite3 file database. +Any values persisted this way will be automatically restored when the +program is rerun. The details of users (account name and IP address) are +recorded for puts. + +NOTE: This is intended as an example only. It has not been tested for + performance, reliability, or robustness. + +Try monitoring the PV `demo:pv:optime` then quit, wait, and restart the +program while continuing to monitor the PV. Compare with the value of +`demo:pv:uptime` which resets on each program start. Try setting the value of +demo:pv:optime while continuing to monitor it. It is recommended to +inspect the persisted file, e.g. `sqlite3 persist_pvs.db "select * from pvs"`. + +There is an important caveat for this simple demo: +The `PersistHandler` will not work as expected if anything other than the +value of a field is changed, e.g. if a Control field was added to an NTScalar +if would not be persisted correctly. This could be resolved by correctly +merging the pv.current().raw and value.raw appropriately in the post(). +""" + +import json +import sqlite3 +import time + +from p4p import Value +from p4p.nt.scalar import NTScalar +from p4p.server import Server, ServerOperation +from p4p.server.raw import Handler +from p4p.server.thread import SharedPV + + +class PersistHandler(Handler): + """ + A handler that will allow simple persistence of values and timestamps using + SQLite3 across retarts. + + It requires the open handler in order to restore values from a previous run. + It requires a post handler in order to persist values set within the program. + """ + + def __init__(self, pv_name: str, conn: sqlite3.Connection, open_restore=True): + self._conn = conn + self._pv_name = pv_name + self._open_restore = open_restore + + def open(self, value: Value): + """ + Restore values from the previous run, if possible. + If there isn't a previous value then we persist this initial value. + """ + + # If there is a value already in the database we always use that + # instead of the supplied initial value, unless the + # _open_restore flag indicates otherwise. + if not self._open_restore: + return + + # We could, in theory, re-apply authentication here if we queried for + # that information and then did something with it! 
+        res = self._conn.execute("SELECT * FROM pvs WHERE id=?", [self._pv_name])
+        query_val = res.fetchone()
+
+        if query_val is not None:
+            json_val = json.loads(query_val[1])
+
+            # Report on the restored value and its setter (reports None if not known)
+            print(
+                f"Will restore to {self._pv_name} value: {json_val['value']}, set by {query_val[2]}"
+            )
+
+            # Override initial value
+            value["value"] = json_val["value"]
+
+            # Override current timestamp (if any) with the time the new value was set
+            value["timeStamp.secondsPastEpoch"] = json_val["timeStamp"][
+                "secondsPastEpoch"
+            ]
+            value["timeStamp.nanoseconds"] = json_val["timeStamp"]["nanoseconds"]
+        else:
+            # We are using an initial value so persist it
+            self._upsert(value)
+
+    def post(self, pv: SharedPV, value: Value):
+        """Update the timestamp of the PV and store its new value."""
+        self._update_timestamp(value)
+
+        self._upsert(value)
+
+    def put(self, pv: SharedPV, op: ServerOperation):
+        """
+        Update the timestamp of the PV and store its new value. Unlike post(),
+        we include the account and peer in the data stored.
+        """
+        value = op.value().raw
+
+        self._update_timestamp(value)
+        self._upsert(value, op.account(), op.peer())
+
+        op.done()
+
+    def _update_timestamp(self, value: Value) -> None:
+        """Update the timestamp of the PV to the current time."""
+
+        if not value.changed("timeStamp") or (
+            value["timeStamp.secondsPastEpoch"] == value["timeStamp.nanoseconds"] == 0
+        ):
+            now = time.time()
+            value["timeStamp.secondsPastEpoch"] = int(now)
+            value["timeStamp.nanoseconds"] = int((now % 1) * 1e9)
+
+    def _upsert(
+        self, value: Value, account: str | None = None, peer: str | None = None
+    ) -> None:
+        # Persist the data; turn it into JSON and write it to the DB
+        val_json = json.dumps(value.todict())
+
+        # Use UPSERT: https://sqlite.org/lang_upsert.html
+        self._conn.execute(
+            """
+            INSERT INTO pvs (id, data, account, peer)
+              VALUES (:name, :json_data, :account, :peer)
+              ON CONFLICT(id)
+              DO UPDATE SET data = :json_data, account = :account, peer = :peer;
+            """,
+            {
+                "name": self._pv_name,
+                "json_data": val_json,
+                "account": account,
+                "peer": peer,
+            },
+        )
+        self._conn.commit()
+
+
+def main() -> None:
+    """
+    Create an SQLite3 database and a set of PVs to demonstrate persistence
+    using a p4p Handler.
+    """
+
+    # Create an SQLite database to function as our persistence store
+    conn = sqlite3.connect("persist_pvs.db", check_same_thread=False)
+    # conn.execute("DROP TABLE IF EXISTS pvs")
+    conn.execute(
+        """
+        CREATE TABLE IF NOT EXISTS pvs
+            (id VARCHAR(255),
+             data JSON,
+             account VARCHAR(30),
+             peer VARCHAR(55),
+             PRIMARY KEY (id)
+            )
+        ;
+        """
+    )  # IPv6 addresses can be long and will contain a port number as well!
+
+    # Create the example PVs.
+    # Note that we switch off restores for `demo:pv:uptime`. If we simply
+    # didn't use the PersistHandler, the timestamp of the PV would not be
+    # updated.
+    duplicate_pv = SharedPV(
+        nt=NTScalar("i"), handler=PersistHandler("demo:pv:int", conn), initial=12
+    )
+    pvs = {
+        "demo:pv:optime": SharedPV(
+            nt=NTScalar("i"),
+            handler=PersistHandler("demo:pv:optime", conn),
+            initial=0,
+        ),  # Operational time; total time running
+        "demo:pv:uptime": SharedPV(
+            nt=NTScalar("i"),
+            handler=PersistHandler("demo:pv:uptime", conn, open_restore=False),
+            timestamp=time.time(),
+            initial=0,
+        ),  # Uptime since the most recent (re)start
+        "demo:pv:int": duplicate_pv,
+        "demo:pv:float": SharedPV(
+            nt=NTScalar("d"),
+            handler=PersistHandler("demo:pv:float", conn),
+            initial=9.99,
+        ),
+        "demo:pv:string": SharedPV(
+            nt=NTScalar("s"),
+            handler=PersistHandler("demo:pv:string", conn),
+            initial="Hello!",
+        ),
+        "demo:pv:alias_int": duplicate_pv,  # Works, except for reporting its restore
+    }
+
+    # Make the uptime PV read-only; maybe we want to be able to update optime
+    # after major system upgrades?
+    uptime_pv = pvs["demo:pv:uptime"]
+
+    @uptime_pv.put
+    def read_only(_pv: SharedPV, op: ServerOperation):
+        """Don't allow put operations on this PV."""
+
+        op.done(error="Read-only")
+
+    print(f"Starting server with the following PVs: {pvs}")
+
+    server = None
+    try:
+        server = Server(providers=[pvs])
+        while True:
+            # Every second increment the values of uptime and optime
+            time.sleep(1)
+            increment_value = pvs["demo:pv:uptime"].current().raw["value"] + 1
+            pvs["demo:pv:uptime"].post(increment_value)
+            increment_value = pvs["demo:pv:optime"].current().raw["value"] + 1
+            pvs["demo:pv:optime"].post(increment_value)
+    except KeyboardInterrupt:
+        pass
+    finally:
+        if server:
+            server.stop()
+        conn.close()
+
+
+if __name__ == "__main__":
+    main()
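The persist.py docstring suggests inspecting the store with the sqlite3 CLI; the same check can be done from Python. This is a minimal sketch, assuming `persist_pvs.db` was created by a previous run of the example:

import json
import sqlite3

conn = sqlite3.connect("persist_pvs.db")
try:
    # Each row holds the PV name, the JSON-encoded Value, and the last put's account/peer
    for pv_name, data, account, peer in conn.execute(
        "SELECT id, data, account, peer FROM pvs"
    ):
        value = json.loads(data)["value"]
        print(f"{pv_name}: value={value!r}, last put by account={account} peer={peer}")
finally:
    conn.close()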
diff --git a/src/p4p/server/raw.py b/src/p4p/server/raw.py
index 2629f780..034ed0bf 100644
--- a/src/p4p/server/raw.py
+++ b/src/p4p/server/raw.py
@@ -39,6 +39,13 @@ class Handler(object):
 
     Use of this as a base class is optional.
     """
+    def open(self, value):
+        """
+        Called each time an Open operation is performed on this Channel, i.e. each time SharedPV.open() is called.
+
+        :param value: A Value, or appropriate object (see nt= and wrap= of the constructor).
+        """
+        pass
 
     def put(self, pv, op):
         """
@@ -50,6 +57,17 @@ def put(self, pv, op):
         """
         op.done(error='Not supported')
 
+    def post(self, pv, value):
+        """
+        Called each time a post operation is issued on this Channel,
+        i.e. each time SharedPV.post() is called by the server-side
+        program (post is not a client operation).
+
+        :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with.
+        :param value: A Value, or appropriate object (see nt= and wrap= of the constructor).
+        """
+        pass
+
     def rpc(self, pv, op):
         """
         Called each time a client issues a Remote Procedure Call
@@ -77,6 +95,14 @@ def onLastDisconnect(self, pv):
 
         pass
 
+    def close(self, pv):
+        """
+        Called when the Channel is closed.
+
+        :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with.
+        """
+        pass
+
 
 class SharedPV(_SharedPV):
     """Shared state Process Variable.  Callback based implementation.
@@ -158,6 +184,16 @@ def open(self, value, nt=None, wrap=None, unwrap=None, **kws):
             V = self._wrap(value, **kws)
         except:  # py3 will chain automatically, py2 won't
             raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws))
+
+        # The guard is needed because handlers are not required to inherit
+        # from the Handler base class
+        try:
+            open_fn = self._handler.open
+        except AttributeError:
+            pass
+        else:
+            open_fn(V)
+
         _SharedPV.open(self, V)
 
     def post(self, value, **kws):
@@ -174,8 +210,37 @@ def post(self, value, **kws):
             V = self._wrap(value, **kws)
         except:  # py3 will chain automatically, py2 won't
             raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws))
+
+        # The guard is needed because handlers are not required to inherit
+        # from the Handler base class
+        try:
+            post_fn = self._handler.post
+        except AttributeError:
+            pass
+        else:
+            post_fn(self, V)
+
         _SharedPV.post(self, V)
 
+    def close(self, destroy=False):
+        """Close PV, disconnecting any clients.
+
+        :param bool destroy: Indicate "permanent" closure.  Current clients will not see subsequent open().
+
+        close() with destroy=True or sync=True will not prevent clients from re-connecting.
+        New clients may prevent sync=True from succeeding.
+        Prevent reconnection by __first__ stopping the Server, removing with :py:meth:`StaticProvider.remove()`,
+        or preventing a :py:class:`DynamicProvider` from making new channels to this SharedPV.
+        """
+        try:
+            close_fn = self._handler.close
+        except AttributeError:
+            pass
+        else:
+            close_fn(self)
+
+        _SharedPV.close(self, destroy)
+
     def current(self):
         V = _SharedPV.current(self)
         try:
@@ -208,6 +273,13 @@ def __init__(self, pv, real):
         self._pv = pv # this creates a reference cycle, which should be collectable since SharedPV supports GC
         self._real = real
 
+    def open(self, value):
+        _log.debug('OPEN %s %s', self._pv, value)
+        try:
+            self._pv._exec(None, self._real.open, value)
+        except AttributeError:
+            pass
+
     def onFirstConnect(self):
         self._pv._exec(None, self._pv._onFirstConnect, None)
         try: # user handler may omit onFirstConnect()
@@ -239,6 +311,20 @@ def rpc(self, op):
         except AttributeError:
             op.done(error="RPC not supported")
 
+    def post(self, value):
+        _log.debug('POST %s %s', self._pv, value)
+        try:
+            self._pv._exec(None, self._real.post, self._pv, value)
+        except AttributeError:
+            pass
+
+    def close(self):
+        _log.debug('CLOSE %s', self._pv)
+        try:
+            self._pv._exec(None, self._real.close, self._pv)
+        except AttributeError:
+            pass
+
     @property
     def onFirstConnect(self):
         def decorate(fn):
@@ -253,6 +339,20 @@ def decorate(fn):
             return fn
         return decorate
 
+    @property
+    def on_open(self):
+        def decorate(fn):
+            self._handler.open = fn
+            return fn
+        return decorate
+
+    @property
+    def on_post(self):
+        def decorate(fn):
+            self._handler.post = fn
+            return fn
+        return decorate
+
     @property
     def put(self):
         def decorate(fn):
@@ -267,6 +367,13 @@ def decorate(fn):
             return fn
         return decorate
 
+    @property
+    def on_close(self):
+        def decorate(fn):
+            self._handler.close = fn
+            return fn
+        return decorate
+
     def __repr__(self):
         if self.isOpen():
             return '%s(value=%s)' % (self.__class__.__name__, repr(self.current()))
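The new on_open/on_post/on_close properties mirror the existing put/rpc/onFirstConnect decorators, for handlers built without subclassing Handler. A minimal sketch of how they could be used (the callback names here are illustrative, not part of the patch):

from p4p.nt.scalar import NTScalar
from p4p.server.thread import SharedPV

pv = SharedPV(nt=NTScalar("i"))

@pv.on_open
def opened(value):
    print("opened with", value["value"])

@pv.on_post
def posted(pv, value):
    print("posted", value["value"])

@pv.on_close
def closed(pv):
    print("closed")

pv.open(0)    # -> opened()
pv.post(1)    # -> posted()
pv.close()    # -> closed()

Note that the callbacks must be registered before open() is called (or before an initial= value is passed to the constructor) for the open hook to fire, since SharedPV.open() looks up the handler's open attribute at call time.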
diff --git a/src/p4p/test/test_sharedpv.py b/src/p4p/test/test_sharedpv.py
index a810ba32..dbb05258 100644
--- a/src/p4p/test/test_sharedpv.py
+++ b/src/p4p/test/test_sharedpv.py
@@ -17,7 +17,7 @@
 from ..wrapper import Value, Type
 from ..client.thread import Context, Disconnected, TimeoutError, RemoteError
 from ..server import Server, StaticProvider
-from ..server.thread import SharedPV, _defaultWorkQueue
+from ..server.thread import Handler, SharedPV, _defaultWorkQueue
 from ..util import WorkQueue
 from ..nt import NTScalar, NTURI
 from .utils import RefTestCase
@@ -420,3 +420,55 @@ def testPVClose(self):
         _log.debug('CLOSE')
 
         self.assertFalse(self.H.conn)
+
+class TestHandlerOpenPostClose(RefTestCase):
+    """
+    Test that the Handler open(), post(), and close() functions are called correctly by SharedPV. Note that:
+    - TestRPC, TestFirstLast already test onFirstConnect() and onLastDisconnect().
+    - TestGPM, TestPVRequestMask already test put().
+    - TestRPC, TestRPC2 already test rpc().
+    """
+
+    class TestHandler(Handler):
+        def __init__(self):
+            self.last_op = "init"
+
+        def open(self, value):
+            self.last_op = "open"
+            value["value"] = 17
+
+        def post(self, pv, value):
+            self.last_op = "post"
+            value["value"] = value["value"] * 2
+
+        def close(self, pv):
+            self.last_op = "close"
+
+    def setUp(self):
+        super(TestHandlerOpenPostClose, self).setUp()
+        self.handler = self.TestHandler()
+        self.pv = SharedPV(handler=self.handler, nt=NTScalar('d'))
+
+    def test_open(self):
+        # open() is called with an initial value of 5, but the Handler's open() overrides it
+        self.pv.open(5)
+        self.assertEqual(self.handler.last_op, "open")
+        self.assertEqual(self.pv.current(), 17.0)
+
+    def test_post(self):
+        self.pv.open(5)
+        self.pv.post(13.0)
+        self.assertEqual(self.handler.last_op, "post")
+        self.assertEqual(self.pv.current(), 26.0)
+
+    def test_close(self):
+        self.pv.open(5)
+        self.pv.close(sync=True)
+        self.assertEqual(self.handler.last_op, "close")
+
+    def tearDown(self):
+        self.pv.close(sync=True)
+        self.traceme(self.pv)
+        del self.pv
+
+        super(TestHandlerOpenPostClose, self).tearDown()
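Taken together, the handler-subclass flow exercised by TestHandlerOpenPostClose looks roughly like this in user code (a condensed sketch; the Doubler name is illustrative only):

from p4p.nt.scalar import NTScalar
from p4p.server.raw import Handler
from p4p.server.thread import SharedPV

class Doubler(Handler):
    def open(self, value):
        value["value"] = 17        # replace whatever initial value was supplied

    def post(self, pv, value):
        value["value"] *= 2        # rewrite posted values before they are applied

    def close(self, pv):
        print("closed")

pv = SharedPV(handler=Doubler(), nt=NTScalar("d"))
pv.open(5)            # Doubler.open() runs; current value becomes 17.0
pv.post(13.0)         # Doubler.post() runs; current value becomes 26.0
print(pv.current())   # 26.0
pv.close(sync=True)   # Doubler.close() runs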