From 58e5c653ffe9eba3cc7b9caa69e6d952e44d3a17 Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Tue, 27 May 2025 22:14:14 +0100 Subject: [PATCH 1/9] New Handler and SharedPV methods with example --- example/persist.py | 187 ++++++++++++++++++++++++++++++++++++++++++ src/p4p/server/raw.py | 103 +++++++++++++++++++++++ 2 files changed, 290 insertions(+) create mode 100644 example/persist.py diff --git a/example/persist.py b/example/persist.py new file mode 100644 index 00000000..2607936f --- /dev/null +++ b/example/persist.py @@ -0,0 +1,187 @@ +""" +Use a handler to automatically persist values to an SQLite3 file database. +Any values persisted this way will be automatically restored when the +program is rerun. The details of users (account name and IP address) are +recorded for puts. + +Try monitoring the PV `demo:pv:optime` then quit, wait, and restart the +program while continuing to monitor the PV. Compare with the value of +`demo:pv:uptime` which resets on each program start. Try setting the value of +demo:pv:optime while continuing to monitor it. It is recommended to +inspect the persisted file, e.g. `sqlite3 persist_pvs.db "select * from pvs"`. + +There is an important caveat for this simple demo: +The `PersistHandler` will not work as expected if anything other than the +value of a field is changed, e.g. if a Control field was added to an NTScalar +if would not be persisted correctly. This could be resolved by correctly +merging the pv.current().raw and value.raw appropriately in the post(). +""" + +import json +import sqlite3 +import time + +from p4p import Value +from p4p.nt.scalar import NTScalar +from p4p.server import Server, ServerOperation +from p4p.server.raw import Handler +from p4p.server.thread import SharedPV + + +class PersistHandler(Handler): + """ + A handler that will allow simple persistence of values and timestamps + across retarts. It requires a post handler in order to persist values + set within the program. + """ + + def __init__(self, pv_name: str, conn: sqlite3.Connection, open_restore=True): + self._conn = conn + self._pv_name = pv_name + self._open_restore = open_restore + + def open(self, value, **kws): + # If there is a value already in the database we always use that + # instead of the supplied initial value, unless the + # handler_open_restore flag indicates otherwise. + if not self._open_restore: + return + + # We could, in theory, re-apply authentication here if we queried for + # that information and then did something with it! + res = self._conn.execute("SELECT data FROM pvs WHERE id=?", [self._pv_name]) + query_val = res.fetchone() + + if query_val is not None: + json_val = json.loads(query_val[0]) + print(f"Will restore to {self._pv_name} value: {json_val['value']}") + + # Override initial value + value["value"] = json_val["value"] + + value["timeStamp.secondsPastEpoch"] = json_val["timeStamp"][ + "secondsPastEpoch" + ] + value["timeStamp.nanoseconds"] = json_val["timeStamp"]["nanoseconds"] + else: + # We are using an initial value so persist it + self._upsert(value) + + def post( + self, + pv: SharedPV, + value: Value, + ): + self._update_timestamp(value) + + self._upsert( + value, + ) + + def put(self, pv: SharedPV, op: ServerOperation): + # The post does all the real work, we just add info only available + # from the ServerOperation + self._update_timestamp(op.value()) + + self._upsert( + op.value(), op.account(), op.peer() + ) + + op.done() + + def _update_timestamp(self, value) -> None: + if not value.changed("timeStamp") or ( + value["timeStamp.nanoseconds"] == value["timeStamp.nanoseconds"] == 0 + ): + now = time.time() + value["timeStamp.secondsPastEpoch"] = now // 1 + value["timeStamp.nanoseconds"] = int((now % 1) * 1e9) + + def _upsert(self, value, account=None, peer=None) -> None: + # Persist the data; turn into JSON and write it to the DB + val_json = json.dumps(value.todict()) + + # Use UPSERT: https://sqlite.org/lang_upsert.html + conn.execute( + """ + INSERT INTO pvs (id, data, account, peer) + VALUES (:name, :json_data, :account, :peer) + ON CONFLICT(id) + DO UPDATE SET data = :json_data, account = :account, peer = :peer; + """, + { + "name": self._pv_name, + "json_data": val_json, + "account": account, + "peer": peer, + }, + ) + conn.commit() + + +# Create an SQLite dayabase to function as our persistence store +conn = sqlite3.connect("persist_pvs.db", check_same_thread=False) +#conn.execute("DROP TABLE IF EXISTS pvs") +conn.execute( + "CREATE TABLE IF NOT EXISTS pvs (id VARCHAR(255), data JSON, account VARCHAR(30), peer VARCHAR(55), PRIMARY KEY (id));" +) # IPv6 addresses can be long and will contain port number as well! + +duplicate_pv = SharedPV( + nt=NTScalar("i"), handler=PersistHandler("demo:pv:int", conn), initial=12 +) +pvs = { + "demo:pv:optime": SharedPV( + nt=NTScalar("i"), + handler=PersistHandler("demo:pv:optime", conn), + initial=0, + ), # Operational time; total time running + "demo:pv:uptime": SharedPV( + nt=NTScalar("i"), + handler=PersistHandler("demo:pv:uptime", conn, open_restore=False), + timestamp=time.time(), + initial=0, + ), # Uptime since most recent (re)start + "demo:pv:int": duplicate_pv, + "demo:pv:float": SharedPV( + nt=NTScalar("d"), + handler=PersistHandler("demo:pv:float", conn), + initial=9.99, + ), + "demo:pv:string": SharedPV( + nt=NTScalar("s"), + handler=PersistHandler("demo:pv:string", conn), + initial="Hello!", + ), + "demo:pv:alias_int": duplicate_pv, # It works except for reporting its restore +} + + +# Make the uptime PV readonly; maybe we want to be able to update optime +# after major system upgrades? +uptime_pv = pvs["demo:pv:uptime"] + + +@uptime_pv.put +def read_only(pv: SharedPV, op: ServerOperation): + op.done(error="Read-only") + return + + +print(f"Starting server with the following PVs: {pvs}") + +server = None +try: + server = Server(providers=[pvs]) + while True: + # Every second increment the values of uptime and optime + time.sleep(1) + increment_value = pvs["demo:pv:uptime"].current().raw["value"] + 1 + pvs["demo:pv:uptime"].post(increment_value) + increment_value = pvs["demo:pv:optime"].current().raw["value"] + 1 + pvs["demo:pv:optime"].post(increment_value) +except KeyboardInterrupt: + pass +finally: + if server: + server.stop() + conn.close() diff --git a/src/p4p/server/raw.py b/src/p4p/server/raw.py index 2629f780..0bd0c296 100644 --- a/src/p4p/server/raw.py +++ b/src/p4p/server/raw.py @@ -39,6 +39,13 @@ class Handler(object): Use of this as a base class is optional. """ + def open(self, value, **kws): + """ + Called each time an Open operation is performed on this Channel + + :param value: A Value, or appropriate object (see nt= and wrap= of the constructor). + """ + pass def put(self, pv, op): """ @@ -50,6 +57,17 @@ def put(self, pv, op): """ op.done(error='Not supported') + def post(self, pv, value, **kws): + """ + Called each time a client issues a post + operation on this Channel. + + :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with. + :param value: A Value, or appropriate object (see nt= and wrap= of the constructor). + :param dict options: A dictionary of configuration options. + """ + pass + def rpc(self, pv, op): """ Called each time a client issues a Remote Procedure Call @@ -77,6 +95,14 @@ def onLastDisconnect(self, pv): pass + def close(self, pv): + """ + Called when the Channel is closed. + + :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with. + """ + pass + class SharedPV(_SharedPV): """Shared state Process Variable. Callback based implementation. @@ -158,6 +184,14 @@ def open(self, value, nt=None, wrap=None, unwrap=None, **kws): V = self._wrap(value, **kws) except: # py3 will chain automatically, py2 won't raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws)) + + # Guard goes here because we can have handlers that don't inherit from + # the Handler base class + try: + self._handler.open(V, **kws) + except AttributeError as err: + pass + _SharedPV.open(self, V) def post(self, value, **kws): @@ -174,8 +208,35 @@ def post(self, value, **kws): V = self._wrap(value, **kws) except: # py3 will chain automatically, py2 won't raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws)) + + # Guard goes here because we can have handlers that don't inherit from + # the Handler base class + try: + self._handler.post(self, V, **kws) + except AttributeError: + pass + _SharedPV.post(self, V) + def close(self, destroy=False, sync=False, timeout=None): + """Close PV, disconnecting any clients. + + :param bool destroy: Indicate "permanent" closure. Current clients will not see subsequent open(). + :param bool sync: When block until any pending onLastDisconnect() is delivered (timeout applies). + :param float timeout: Applies only when sync=True. None for no timeout, otherwise a non-negative floating point value. + + close() with destory=True or sync=True will not prevent clients from re-connecting. + New clients may prevent sync=True from succeeding. + Prevent reconnection by __first__ stopping the Server, removing with :py:meth:`StaticProvider.remove()`, + or preventing a :py:class:`DynamicProvider` from making new channels to this SharedPV. + """ + try: + self._handler.close(self) + except AttributeError: + pass + + _SharedPV.close(self) + def current(self): V = _SharedPV.current(self) try: @@ -208,6 +269,13 @@ def __init__(self, pv, real): self._pv = pv # this creates a reference cycle, which should be collectable since SharedPV supports GC self._real = real + def open(self, value, **kws): + _log.debug('OPEN %s %s', self._pv, value) + try: + self._pv._exec(None, self._real.open, value, **kws) + except AttributeError: + pass + def onFirstConnect(self): self._pv._exec(None, self._pv._onFirstConnect, None) try: # user handler may omit onFirstConnect() @@ -239,6 +307,20 @@ def rpc(self, op): except AttributeError: op.done(error="RPC not supported") + def post(self, value, **kws): + _log.debug('POST %s %s', self._pv, value) + try: + self._pv._exec(None, self._real.post, self._pv, value, **kws) + except AttributeError: + pass + + def close(self, pv): + _log.debug('CLOSE %s', self._pv) + try: + self._pv._exec(None, self._real.close, self._pv) + except AttributeError: + pass + @property def onFirstConnect(self): def decorate(fn): @@ -253,6 +335,20 @@ def decorate(fn): return fn return decorate + @property + def on_open(self): + def decorate(fn): + self._handler.open = fn + return fn + return decorate + + @property + def on_post(self): + def decorate(fn): + self._handler.post = fn + return fn + return decorate + @property def put(self): def decorate(fn): @@ -267,6 +363,13 @@ def decorate(fn): return fn return decorate + @property + def on_close(self): + def decorate(fn): + self._handler.close = fn + return fn + return decorate + def __repr__(self): if self.isOpen(): return '%s(value=%s)' % (self.__class__.__name__, repr(self.current())) From cfe99a10d3b08f59d65511ba714b1138f564f88a Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Wed, 28 May 2025 09:36:16 +0100 Subject: [PATCH 2/9] Small improvements to structure and commenting of example code --- example/persist.py | 194 +++++++++++++++++++++++++-------------------- 1 file changed, 106 insertions(+), 88 deletions(-) diff --git a/example/persist.py b/example/persist.py index 2607936f..d3c2d4cd 100644 --- a/example/persist.py +++ b/example/persist.py @@ -30,9 +30,11 @@ class PersistHandler(Handler): """ - A handler that will allow simple persistence of values and timestamps - across retarts. It requires a post handler in order to persist values - set within the program. + A handler that will allow simple persistence of values and timestamps using + SQLite3 across retarts. + + It requires the open handler in order to restore values from a previous run. + It requires a post handler in order to persist values set within the program. """ def __init__(self, pv_name: str, conn: sqlite3.Connection, open_restore=True): @@ -40,10 +42,15 @@ def __init__(self, pv_name: str, conn: sqlite3.Connection, open_restore=True): self._pv_name = pv_name self._open_restore = open_restore - def open(self, value, **kws): + def open(self, value: Value, **_kws): + """ + Restore values from the previous run, if possible. + If there isn't a previous value then we persist this initial value. + """ + # If there is a value already in the database we always use that # instead of the supplied initial value, unless the - # handler_open_restore flag indicates otherwise. + # _open_restore flag indicates otherwise. if not self._open_restore: return @@ -59,6 +66,7 @@ def open(self, value, **kws): # Override initial value value["value"] = json_val["value"] + # Override current timestamp (if any) with time the new value was set value["timeStamp.secondsPastEpoch"] = json_val["timeStamp"][ "secondsPastEpoch" ] @@ -67,29 +75,27 @@ def open(self, value, **kws): # We are using an initial value so persist it self._upsert(value) - def post( - self, - pv: SharedPV, - value: Value, - ): + def post(self, pv: SharedPV, value: Value, **_kws): + """Update timestamp of the PV and store its new value.""" self._update_timestamp(value) - self._upsert( - value, - ) + self._upsert(value) def put(self, pv: SharedPV, op: ServerOperation): - # The post does all the real work, we just add info only available - # from the ServerOperation + """ + Update timestamp of the PV and store its new value. Unlike the post() + we include the account and peer update in the data stored. + """ + self._update_timestamp(op.value()) - self._upsert( - op.value(), op.account(), op.peer() - ) + self._upsert(op.value(), op.account(), op.peer()) op.done() def _update_timestamp(self, value) -> None: + """Update the timestamp of the PV to the current time.""" + if not value.changed("timeStamp") or ( value["timeStamp.nanoseconds"] == value["timeStamp.nanoseconds"] == 0 ): @@ -102,7 +108,7 @@ def _upsert(self, value, account=None, peer=None) -> None: val_json = json.dumps(value.todict()) # Use UPSERT: https://sqlite.org/lang_upsert.html - conn.execute( + self._conn.execute( """ INSERT INTO pvs (id, data, account, peer) VALUES (:name, :json_data, :account, :peer) @@ -116,72 +122,84 @@ def _upsert(self, value, account=None, peer=None) -> None: "peer": peer, }, ) - conn.commit() - - -# Create an SQLite dayabase to function as our persistence store -conn = sqlite3.connect("persist_pvs.db", check_same_thread=False) -#conn.execute("DROP TABLE IF EXISTS pvs") -conn.execute( - "CREATE TABLE IF NOT EXISTS pvs (id VARCHAR(255), data JSON, account VARCHAR(30), peer VARCHAR(55), PRIMARY KEY (id));" -) # IPv6 addresses can be long and will contain port number as well! - -duplicate_pv = SharedPV( - nt=NTScalar("i"), handler=PersistHandler("demo:pv:int", conn), initial=12 -) -pvs = { - "demo:pv:optime": SharedPV( - nt=NTScalar("i"), - handler=PersistHandler("demo:pv:optime", conn), - initial=0, - ), # Operational time; total time running - "demo:pv:uptime": SharedPV( - nt=NTScalar("i"), - handler=PersistHandler("demo:pv:uptime", conn, open_restore=False), - timestamp=time.time(), - initial=0, - ), # Uptime since most recent (re)start - "demo:pv:int": duplicate_pv, - "demo:pv:float": SharedPV( - nt=NTScalar("d"), - handler=PersistHandler("demo:pv:float", conn), - initial=9.99, - ), - "demo:pv:string": SharedPV( - nt=NTScalar("s"), - handler=PersistHandler("demo:pv:string", conn), - initial="Hello!", - ), - "demo:pv:alias_int": duplicate_pv, # It works except for reporting its restore -} - - -# Make the uptime PV readonly; maybe we want to be able to update optime -# after major system upgrades? -uptime_pv = pvs["demo:pv:uptime"] - - -@uptime_pv.put -def read_only(pv: SharedPV, op: ServerOperation): - op.done(error="Read-only") - return - - -print(f"Starting server with the following PVs: {pvs}") - -server = None -try: - server = Server(providers=[pvs]) - while True: - # Every second increment the values of uptime and optime - time.sleep(1) - increment_value = pvs["demo:pv:uptime"].current().raw["value"] + 1 - pvs["demo:pv:uptime"].post(increment_value) - increment_value = pvs["demo:pv:optime"].current().raw["value"] + 1 - pvs["demo:pv:optime"].post(increment_value) -except KeyboardInterrupt: - pass -finally: - if server: - server.stop() - conn.close() + self._conn.commit() + + +def main() -> None: + """ + Create SQLite3 database and a set of PVs to demonstrate persistance + using a p4p Handler. + """ + + # Create an SQLite dayabase to function as our persistence store + conn = sqlite3.connect("persist_pvs.db", check_same_thread=False) + # conn.execute("DROP TABLE IF EXISTS pvs") + conn.execute( + "CREATE TABLE IF NOT EXISTS pvs (id VARCHAR(255), data JSON, account VARCHAR(30), peer VARCHAR(55), PRIMARY KEY (id));" + ) # IPv6 addresses can be long and will contain port number as well! + + # Create the example PVs. + # Note that we switch off restores for `demo:pv:uptime`. If we simply + # didn't use the PersistHandler the timestamp of the PV would not be + # updated. + duplicate_pv = SharedPV( + nt=NTScalar("i"), handler=PersistHandler("demo:pv:int", conn), initial=12 + ) + pvs = { + "demo:pv:optime": SharedPV( + nt=NTScalar("i"), + handler=PersistHandler("demo:pv:optime", conn), + initial=0, + ), # Operational time; total time running + "demo:pv:uptime": SharedPV( + nt=NTScalar("i"), + handler=PersistHandler("demo:pv:uptime", conn, open_restore=False), + timestamp=time.time(), + initial=0, + ), # Uptime since most recent (re)start + "demo:pv:int": duplicate_pv, + "demo:pv:float": SharedPV( + nt=NTScalar("d"), + handler=PersistHandler("demo:pv:float", conn), + initial=9.99, + ), + "demo:pv:string": SharedPV( + nt=NTScalar("s"), + handler=PersistHandler("demo:pv:string", conn), + initial="Hello!", + ), + "demo:pv:alias_int": duplicate_pv, # It works except for reporting its restore + } + + # Make the uptime PV readonly; maybe we want to be able to update optime + # after major system upgrades? + uptime_pv = pvs["demo:pv:uptime"] + + @uptime_pv.put + def read_only(_pv: SharedPV, op: ServerOperation): + """Don't allow put operations on this PV.""" + + op.done(error="Read-only") + + print(f"Starting server with the following PVs: {pvs}") + + server = None + try: + server = Server(providers=[pvs]) + while True: + # Every second increment the values of uptime and optime + time.sleep(1) + increment_value = pvs["demo:pv:uptime"].current().raw["value"] + 1 + pvs["demo:pv:uptime"].post(increment_value) + increment_value = pvs["demo:pv:optime"].current().raw["value"] + 1 + pvs["demo:pv:optime"].post(increment_value) + except KeyboardInterrupt: + pass + finally: + if server: + server.stop() + conn.close() + + +if __name__ == "__main__": + main() From e6c0df063b9931ecd81e520e24eead4e87ecb0d4 Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Thu, 29 May 2025 14:13:43 +0100 Subject: [PATCH 3/9] Implemented changes suggested in #172 --- example/persist.py | 3 +++ src/p4p/server/raw.py | 22 +++++++++++++--------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/example/persist.py b/example/persist.py index d3c2d4cd..73fec92c 100644 --- a/example/persist.py +++ b/example/persist.py @@ -4,6 +4,9 @@ program is rerun. The details of users (account name and IP address) are recorded for puts. +NOTE: This is intended as an example only. It has not been tested for + performance, reliability, or robustness. + Try monitoring the PV `demo:pv:optime` then quit, wait, and restart the program while continuing to monitor the PV. Compare with the value of `demo:pv:uptime` which resets on each program start. Try setting the value of diff --git a/src/p4p/server/raw.py b/src/p4p/server/raw.py index 0bd0c296..dcb9d75c 100644 --- a/src/p4p/server/raw.py +++ b/src/p4p/server/raw.py @@ -95,7 +95,7 @@ def onLastDisconnect(self, pv): pass - def close(self, pv): + def close(self): """ Called when the Channel is closed. @@ -188,9 +188,11 @@ def open(self, value, nt=None, wrap=None, unwrap=None, **kws): # Guard goes here because we can have handlers that don't inherit from # the Handler base class try: - self._handler.open(V, **kws) - except AttributeError as err: + open_fn = self._handler.open + except AttributeError: pass + else: + open_fn(V, **kws) _SharedPV.open(self, V) @@ -212,28 +214,30 @@ def post(self, value, **kws): # Guard goes here because we can have handlers that don't inherit from # the Handler base class try: - self._handler.post(self, V, **kws) + post_fn = self._handler.post except AttributeError: pass + else: + post_fn(self, V, **kws) _SharedPV.post(self, V) - def close(self, destroy=False, sync=False, timeout=None): + def close(self, destroy=False): """Close PV, disconnecting any clients. :param bool destroy: Indicate "permanent" closure. Current clients will not see subsequent open(). - :param bool sync: When block until any pending onLastDisconnect() is delivered (timeout applies). - :param float timeout: Applies only when sync=True. None for no timeout, otherwise a non-negative floating point value. close() with destory=True or sync=True will not prevent clients from re-connecting. New clients may prevent sync=True from succeeding. Prevent reconnection by __first__ stopping the Server, removing with :py:meth:`StaticProvider.remove()`, or preventing a :py:class:`DynamicProvider` from making new channels to this SharedPV. """ - try: - self._handler.close(self) + try: + close_fn = self._handler.close except AttributeError: pass + else: + close_fn(self) _SharedPV.close(self) From 6cb32f4d93ce307fc0e2173dc00b14de2fee1051 Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Thu, 29 May 2025 14:13:56 +0100 Subject: [PATCH 4/9] Added new example for #172 --- example/auditor.py | 109 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 example/auditor.py diff --git a/example/auditor.py b/example/auditor.py new file mode 100644 index 00000000..d2bc4693 --- /dev/null +++ b/example/auditor.py @@ -0,0 +1,109 @@ +""" +In this example we setup a simple auditing mechanism that reports information +about the last audited PV changed (which PV, when, and who by) to an auditor +channel. The record of changes is written to the file `audit.log` file. + +NOTE: This code is an example only. This design is not suitable for non-demo + use. + +The two Handlers below demonstrate many of the methods available to a handler. +The Audited handler records only information about (external) `put()` +operations, not (internal) `post()` operations. The Auditor Handler uses +`open()` to record the start of logging, `post()` to record value changes, +and `close()` to record when auditing finished and write its own value at that +time. + +Run this script and then make a change to one of its PVs, e.g. +`python -m p4p.client.cli put demo:pv:audited_d=8.8`. Then check the results of +the change, e.g. `python -m p4p.client.cli get demo:pv:auditor`. You should see +the name of the PV that was changed, the identity of the user that made the +change, and the time the change was made. + +After making changes you can inspect the log of changes in the "audit.log". +Use of open() and close() means that it's also possible to check when auditing +started and stopped. +""" + +import time + +from p4p import Value +from p4p.nt.scalar import NTScalar +from p4p.server import Server +from p4p.server.raw import Handler +from p4p.server.thread import SharedPV + + +class Auditor(Handler): + """Persist information to file so we can audit when the program is closed""" + + def open(self, value: Value, **_kws): + """Record the time the auditing PV was opened.""" + with open("audit.log", mode="a+", encoding="utf8") as f: + f.write(f"Auditing opened at {time.ctime()}\n") + + def post(self, pv, value: Value, **_kws): + """Record the time a change was to the auditing PV, and the change made.""" + with open("audit.log", mode="a+", encoding="utf8") as f: + f.write(f"Auditing updated at {time.ctime()}; {value['value']}\n") + + def close(self, pv: Value): + """Record the time the auditing PV was closed.""" + with open("audit.log", mode="a+", encoding="utf8") as f: + value = pv.current().raw["value"] + if value: + f.write(f"Auditing closed at {time.ctime()}; {value}\n") + else: + f.write(f"Auditing closed at {time.ctime()}; no changes made\n") + + +class Audited(Handler): + """Forward information about Put operations to the auditing PV.""" + + def __init__(self, pv: SharedPV): + self._audit_pv = pv + + def put(self, pv, op): + """Each time a Put operation is made we forward some information to the auditing PV.""" + pv.post(op.value()) + self._audit_pv.post( + f"Channel {op.name()} last updated by {op.account()} at {time.ctime()}" + ) + op.done() + + +def main(): + """ + Create a set of text PVs. The audited channels forward information about the user that + made a change and their new values to an auditor channel. + """ + + # Setup some PVs that will be audited and one (`demo:pv:unaudted_i`) that won't be + # Note that the audited handler does have a put so it can also be changed externally + auditor_pv = SharedPV(nt=NTScalar("s"), handler=Auditor(), initial="") + + pvs = { + "demo:pv:auditor": auditor_pv, + "demo:pv:audited_d": SharedPV( + nt=NTScalar("d"), handler=Audited(auditor_pv), initial=9.99 + ), + "demo:pv:audited_i": SharedPV( + nt=NTScalar("i"), handler=Audited(auditor_pv), initial=4 + ), + "demo:pv:audited_s": SharedPV( + nt=NTScalar("s"), handler=Audited(auditor_pv), initial="Testing" + ), + "demo:pv:unaudted_i": SharedPV(nt=NTScalar("i"), initial=-1), + } + + print(pvs.keys()) + try: + Server.forever(providers=[pvs]) + except KeyboardInterrupt: + pass + finally: + # We need to close the auditor PV manually, the server stop() won't do it for us + auditor_pv.close() + + +if __name__ == "__main__": + main() From c57723da47df764e0640f7be260824af141564bb Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Thu, 29 May 2025 14:40:34 +0100 Subject: [PATCH 5/9] Remove unnecessary kwargs --- src/p4p/server/raw.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/p4p/server/raw.py b/src/p4p/server/raw.py index dcb9d75c..034ed0bf 100644 --- a/src/p4p/server/raw.py +++ b/src/p4p/server/raw.py @@ -39,7 +39,7 @@ class Handler(object): Use of this as a base class is optional. """ - def open(self, value, **kws): + def open(self, value): """ Called each time an Open operation is performed on this Channel @@ -57,7 +57,7 @@ def put(self, pv, op): """ op.done(error='Not supported') - def post(self, pv, value, **kws): + def post(self, pv, value): """ Called each time a client issues a post operation on this Channel. @@ -95,7 +95,7 @@ def onLastDisconnect(self, pv): pass - def close(self): + def close(self, pv): """ Called when the Channel is closed. @@ -192,7 +192,7 @@ def open(self, value, nt=None, wrap=None, unwrap=None, **kws): except AttributeError: pass else: - open_fn(V, **kws) + open_fn(V) _SharedPV.open(self, V) @@ -218,7 +218,7 @@ def post(self, value, **kws): except AttributeError: pass else: - post_fn(self, V, **kws) + post_fn(self, V) _SharedPV.post(self, V) @@ -273,10 +273,10 @@ def __init__(self, pv, real): self._pv = pv # this creates a reference cycle, which should be collectable since SharedPV supports GC self._real = real - def open(self, value, **kws): + def open(self, value): _log.debug('OPEN %s %s', self._pv, value) try: - self._pv._exec(None, self._real.open, value, **kws) + self._pv._exec(None, self._real.open, value) except AttributeError: pass @@ -311,14 +311,14 @@ def rpc(self, op): except AttributeError: op.done(error="RPC not supported") - def post(self, value, **kws): + def post(self, value): _log.debug('POST %s %s', self._pv, value) try: - self._pv._exec(None, self._real.post, self._pv, value, **kws) + self._pv._exec(None, self._real.post, self._pv, value) except AttributeError: pass - def close(self, pv): + def close(self): _log.debug('CLOSE %s', self._pv) try: self._pv._exec(None, self._real.close, self._pv) From 2e9e6bc577bc1d500af4053ffdd695b230f192af Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Thu, 29 May 2025 14:41:20 +0100 Subject: [PATCH 6/9] Fix examples based on previous commits, fix persist.put --- example/auditor.py | 4 ++-- example/persist.py | 9 +++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/example/auditor.py b/example/auditor.py index d2bc4693..72a2d137 100644 --- a/example/auditor.py +++ b/example/auditor.py @@ -36,12 +36,12 @@ class Auditor(Handler): """Persist information to file so we can audit when the program is closed""" - def open(self, value: Value, **_kws): + def open(self, value: Value): """Record the time the auditing PV was opened.""" with open("audit.log", mode="a+", encoding="utf8") as f: f.write(f"Auditing opened at {time.ctime()}\n") - def post(self, pv, value: Value, **_kws): + def post(self, pv, value: Value): """Record the time a change was to the auditing PV, and the change made.""" with open("audit.log", mode="a+", encoding="utf8") as f: f.write(f"Auditing updated at {time.ctime()}; {value['value']}\n") diff --git a/example/persist.py b/example/persist.py index 73fec92c..e2fd1742 100644 --- a/example/persist.py +++ b/example/persist.py @@ -45,7 +45,7 @@ def __init__(self, pv_name: str, conn: sqlite3.Connection, open_restore=True): self._pv_name = pv_name self._open_restore = open_restore - def open(self, value: Value, **_kws): + def open(self, value: Value): """ Restore values from the previous run, if possible. If there isn't a previous value then we persist this initial value. @@ -78,7 +78,7 @@ def open(self, value: Value, **_kws): # We are using an initial value so persist it self._upsert(value) - def post(self, pv: SharedPV, value: Value, **_kws): + def post(self, pv: SharedPV, value: Value): """Update timestamp of the PV and store its new value.""" self._update_timestamp(value) @@ -89,10 +89,7 @@ def put(self, pv: SharedPV, op: ServerOperation): Update timestamp of the PV and store its new value. Unlike the post() we include the account and peer update in the data stored. """ - - self._update_timestamp(op.value()) - - self._upsert(op.value(), op.account(), op.peer()) + pv.post(op.value()) # timestamp and upsert handled by post() above op.done() From 09b11bab559d9d5c09ac1f32e3b3a812fd45196d Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Thu, 29 May 2025 16:54:26 +0100 Subject: [PATCH 7/9] Improve formating and status messages --- example/persist.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/example/persist.py b/example/persist.py index e2fd1742..6ae7b075 100644 --- a/example/persist.py +++ b/example/persist.py @@ -59,12 +59,16 @@ def open(self, value: Value): # We could, in theory, re-apply authentication here if we queried for # that information and then did something with it! - res = self._conn.execute("SELECT data FROM pvs WHERE id=?", [self._pv_name]) + res = self._conn.execute("SELECT * FROM pvs WHERE id=?", [self._pv_name]) query_val = res.fetchone() if query_val is not None: - json_val = json.loads(query_val[0]) - print(f"Will restore to {self._pv_name} value: {json_val['value']}") + json_val = json.loads(query_val[1]) + + # Report on restored values and setter (reports None if not known) + print( + f"Will restore to {self._pv_name} value: {json_val['value']}, set by {query_val[2]}" + ) # Override initial value value["value"] = json_val["value"] @@ -93,7 +97,7 @@ def put(self, pv: SharedPV, op: ServerOperation): op.done() - def _update_timestamp(self, value) -> None: + def _update_timestamp(self, value: Value) -> None: """Update the timestamp of the PV to the current time.""" if not value.changed("timeStamp") or ( @@ -103,7 +107,9 @@ def _update_timestamp(self, value) -> None: value["timeStamp.secondsPastEpoch"] = now // 1 value["timeStamp.nanoseconds"] = int((now % 1) * 1e9) - def _upsert(self, value, account=None, peer=None) -> None: + def _upsert( + self, value: Value, account: str | None = None, peer: str | None = None + ) -> None: # Persist the data; turn into JSON and write it to the DB val_json = json.dumps(value.todict()) @@ -135,7 +141,16 @@ def main() -> None: conn = sqlite3.connect("persist_pvs.db", check_same_thread=False) # conn.execute("DROP TABLE IF EXISTS pvs") conn.execute( - "CREATE TABLE IF NOT EXISTS pvs (id VARCHAR(255), data JSON, account VARCHAR(30), peer VARCHAR(55), PRIMARY KEY (id));" + """ + CREATE TABLE IF NOT EXISTS pvs + (id VARCHAR(255), + data JSON, + account VARCHAR(30), + peer VARCHAR(55), + PRIMARY KEY (id) + ) + ; + """ ) # IPv6 addresses can be long and will contain port number as well! # Create the example PVs. From 0601bcfd2c6a84927579a6be8ef1355bffe5bd95 Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Thu, 29 May 2025 16:55:07 +0100 Subject: [PATCH 8/9] Fix put so that account and peer info is recorded --- example/persist.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/example/persist.py b/example/persist.py index 6ae7b075..a00ba977 100644 --- a/example/persist.py +++ b/example/persist.py @@ -93,7 +93,10 @@ def put(self, pv: SharedPV, op: ServerOperation): Update timestamp of the PV and store its new value. Unlike the post() we include the account and peer update in the data stored. """ - pv.post(op.value()) # timestamp and upsert handled by post() above + value = op.value().raw + + self._update_timestamp(value) + self._upsert(value, op.account(), op.peer()) op.done() From 45b3df596a3f7eeacd8c9f58744e4d6956d21ca4 Mon Sep 17 00:00:00 2001 From: Ivan Finch Date: Tue, 26 Aug 2025 13:43:33 +0100 Subject: [PATCH 9/9] Add basic unit tests for new Handler and SharedPV open, close, and post --- src/p4p/test/test_sharedpv.py | 54 ++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/src/p4p/test/test_sharedpv.py b/src/p4p/test/test_sharedpv.py index a810ba32..dbb05258 100644 --- a/src/p4p/test/test_sharedpv.py +++ b/src/p4p/test/test_sharedpv.py @@ -17,7 +17,7 @@ from ..wrapper import Value, Type from ..client.thread import Context, Disconnected, TimeoutError, RemoteError from ..server import Server, StaticProvider -from ..server.thread import SharedPV, _defaultWorkQueue +from ..server.thread import Handler, SharedPV, _defaultWorkQueue from ..util import WorkQueue from ..nt import NTScalar, NTURI from .utils import RefTestCase @@ -420,3 +420,55 @@ def testPVClose(self): _log.debug('CLOSE') self.assertFalse(self.H.conn) + +class TestHandlerOpenPostClose(RefTestCase): + """ + Test Handler open(), post() and close() functions called correctly by SharedPV. Note that: + - TestRPC, TestFirstLast already test onFirstConnect() and onLastDisconnect(). + - TestGPM, TestPVRequestMask already test put(). + - TestRPC, TestRPC2 already test rpc(). + """ + + class TestHandler(Handler): + def __init__(self): + self.last_op = "init" + + def open(self, value): + self.last_op = "open" + value["value"] = 17 + + def post(self, pv, value): + self.last_op = "post" + value["value"] = value["value"] * 2 + + def close(self, pv): + self.last_op = "close" + + def setUp(self): + super(TestHandlerOpenPostClose, self).setUp() + self.handler = self.TestHandler() + self.pv = SharedPV(handler=self.handler, nt=NTScalar('d')) + + def test_open(self): + # Setup sets the initial value to 5, but the Handler open() overrides + self.pv.open(5) + self.assertEqual(self.handler.last_op, "open") + self.assertEqual(self.pv.current(), 17.0) + + def test_post(self): + self.pv.open(5) + self.pv.post(13.0) + self.assertEqual(self.handler.last_op, "post") + self.assertEqual(self.pv.current(), 26.0) + + def test_close(self): + self.pv.open(5) + self.pv.close(sync=True) + self.assertEqual(self.handler.last_op, "close") + + def tearDown(self): + self.pv.close(sync=True) + self.traceme(self.pv) + del self.pv + + super(TestHandlerOpenPostClose, self).tearDown()