Skip to content
109 changes: 109 additions & 0 deletions example/auditor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
In this example we setup a simple auditing mechanism that reports information
about the last audited PV changed (which PV, when, and who by) to an auditor
channel. The record of changes is written to the file `audit.log` file.

NOTE: This code is an example only. This design is not suitable for non-demo
use.

The two Handlers below demonstrate many of the methods available to a handler.
The Audited handler records only information about (external) `put()`
operations, not (internal) `post()` operations. The Auditor Handler uses
`open()` to record the start of logging, `post()` to record value changes,
and `close()` to record when auditing finished and write its own value at that
time.

Run this script and then make a change to one of its PVs, e.g.
`python -m p4p.client.cli put demo:pv:audited_d=8.8`. Then check the results of
the change, e.g. `python -m p4p.client.cli get demo:pv:auditor`. You should see
the name of the PV that was changed, the identity of the user that made the
change, and the time the change was made.

After making changes you can inspect the log of changes in the "audit.log".
Use of open() and close() means that it's also possible to check when auditing
started and stopped.
"""

import time

from p4p import Value
from p4p.nt.scalar import NTScalar
from p4p.server import Server
from p4p.server.raw import Handler
from p4p.server.thread import SharedPV


class Auditor(Handler):
"""Persist information to file so we can audit when the program is closed"""

def open(self, value: Value):
"""Record the time the auditing PV was opened."""
with open("audit.log", mode="a+", encoding="utf8") as f:
f.write(f"Auditing opened at {time.ctime()}\n")

def post(self, pv, value: Value):
"""Record the time a change was to the auditing PV, and the change made."""
with open("audit.log", mode="a+", encoding="utf8") as f:
f.write(f"Auditing updated at {time.ctime()}; {value['value']}\n")

def close(self, pv: Value):
"""Record the time the auditing PV was closed."""
with open("audit.log", mode="a+", encoding="utf8") as f:
value = pv.current().raw["value"]
if value:
f.write(f"Auditing closed at {time.ctime()}; {value}\n")
else:
f.write(f"Auditing closed at {time.ctime()}; no changes made\n")


class Audited(Handler):
"""Forward information about Put operations to the auditing PV."""

def __init__(self, pv: SharedPV):
self._audit_pv = pv

def put(self, pv, op):
"""Each time a Put operation is made we forward some information to the auditing PV."""
pv.post(op.value())
self._audit_pv.post(
f"Channel {op.name()} last updated by {op.account()} at {time.ctime()}"
)
op.done()


def main():
"""
Create a set of text PVs. The audited channels forward information about the user that
made a change and their new values to an auditor channel.
"""

# Setup some PVs that will be audited and one (`demo:pv:unaudted_i`) that won't be
# Note that the audited handler does have a put so it can also be changed externally
auditor_pv = SharedPV(nt=NTScalar("s"), handler=Auditor(), initial="")

pvs = {
"demo:pv:auditor": auditor_pv,
"demo:pv:audited_d": SharedPV(
nt=NTScalar("d"), handler=Audited(auditor_pv), initial=9.99
),
"demo:pv:audited_i": SharedPV(
nt=NTScalar("i"), handler=Audited(auditor_pv), initial=4
),
"demo:pv:audited_s": SharedPV(
nt=NTScalar("s"), handler=Audited(auditor_pv), initial="Testing"
),
"demo:pv:unaudted_i": SharedPV(nt=NTScalar("i"), initial=-1),
}

print(pvs.keys())
try:
Server.forever(providers=[pvs])
except KeyboardInterrupt:
pass
finally:
# We need to close the auditor PV manually, the server stop() won't do it for us
auditor_pv.close()


if __name__ == "__main__":
main()
223 changes: 223 additions & 0 deletions example/persist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
"""
Use a handler to automatically persist values to an SQLite3 file database.
Any values persisted this way will be automatically restored when the
program is rerun. The details of users (account name and IP address) are
recorded for puts.

NOTE: This is intended as an example only. It has not been tested for
performance, reliability, or robustness.

Try monitoring the PV `demo:pv:optime` then quit, wait, and restart the
program while continuing to monitor the PV. Compare with the value of
`demo:pv:uptime` which resets on each program start. Try setting the value of
demo:pv:optime while continuing to monitor it. It is recommended to
inspect the persisted file, e.g. `sqlite3 persist_pvs.db "select * from pvs"`.

There is an important caveat for this simple demo:
The `PersistHandler` will not work as expected if anything other than the
value of a field is changed, e.g. if a Control field was added to an NTScalar
if would not be persisted correctly. This could be resolved by correctly
merging the pv.current().raw and value.raw appropriately in the post().
"""

import json
import sqlite3
import time

from p4p import Value
from p4p.nt.scalar import NTScalar
from p4p.server import Server, ServerOperation
from p4p.server.raw import Handler
from p4p.server.thread import SharedPV


class PersistHandler(Handler):
"""
A handler that will allow simple persistence of values and timestamps using
SQLite3 across retarts.

It requires the open handler in order to restore values from a previous run.
It requires a post handler in order to persist values set within the program.
"""

def __init__(self, pv_name: str, conn: sqlite3.Connection, open_restore=True):
self._conn = conn
self._pv_name = pv_name
self._open_restore = open_restore

def open(self, value: Value):
"""
Restore values from the previous run, if possible.
If there isn't a previous value then we persist this initial value.
"""

# If there is a value already in the database we always use that
# instead of the supplied initial value, unless the
# _open_restore flag indicates otherwise.
if not self._open_restore:
return

# We could, in theory, re-apply authentication here if we queried for
# that information and then did something with it!
res = self._conn.execute("SELECT * FROM pvs WHERE id=?", [self._pv_name])
query_val = res.fetchone()

if query_val is not None:
json_val = json.loads(query_val[1])

# Report on restored values and setter (reports None if not known)
print(
f"Will restore to {self._pv_name} value: {json_val['value']}, set by {query_val[2]}"
)

# Override initial value
value["value"] = json_val["value"]

# Override current timestamp (if any) with time the new value was set
value["timeStamp.secondsPastEpoch"] = json_val["timeStamp"][
"secondsPastEpoch"
]
value["timeStamp.nanoseconds"] = json_val["timeStamp"]["nanoseconds"]
else:
# We are using an initial value so persist it
self._upsert(value)

def post(self, pv: SharedPV, value: Value):
"""Update timestamp of the PV and store its new value."""
self._update_timestamp(value)

self._upsert(value)

def put(self, pv: SharedPV, op: ServerOperation):
"""
Update timestamp of the PV and store its new value. Unlike the post()
we include the account and peer update in the data stored.
"""
value = op.value().raw

self._update_timestamp(value)
self._upsert(value, op.account(), op.peer())
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better way to do this? Originally I was using pv.post(op.value()) which is much neater, but discards the information about op.account() and op.peer(). Using op.value().raw feels brittle, and will discard any data apart from the value.


op.done()

def _update_timestamp(self, value: Value) -> None:
"""Update the timestamp of the PV to the current time."""

if not value.changed("timeStamp") or (
value["timeStamp.nanoseconds"] == value["timeStamp.nanoseconds"] == 0
):
now = time.time()
value["timeStamp.secondsPastEpoch"] = now // 1
value["timeStamp.nanoseconds"] = int((now % 1) * 1e9)

def _upsert(
self, value: Value, account: str | None = None, peer: str | None = None
) -> None:
# Persist the data; turn into JSON and write it to the DB
val_json = json.dumps(value.todict())

# Use UPSERT: https://sqlite.org/lang_upsert.html
self._conn.execute(
"""
INSERT INTO pvs (id, data, account, peer)
VALUES (:name, :json_data, :account, :peer)
ON CONFLICT(id)
DO UPDATE SET data = :json_data, account = :account, peer = :peer;
""",
{
"name": self._pv_name,
"json_data": val_json,
"account": account,
"peer": peer,
},
)
self._conn.commit()


def main() -> None:
"""
Create SQLite3 database and a set of PVs to demonstrate persistance
using a p4p Handler.
"""

# Create an SQLite dayabase to function as our persistence store
conn = sqlite3.connect("persist_pvs.db", check_same_thread=False)
# conn.execute("DROP TABLE IF EXISTS pvs")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS pvs
(id VARCHAR(255),
data JSON,
account VARCHAR(30),
peer VARCHAR(55),
PRIMARY KEY (id)
)
;
"""
) # IPv6 addresses can be long and will contain port number as well!

# Create the example PVs.
# Note that we switch off restores for `demo:pv:uptime`. If we simply
# didn't use the PersistHandler the timestamp of the PV would not be
# updated.
duplicate_pv = SharedPV(
nt=NTScalar("i"), handler=PersistHandler("demo:pv:int", conn), initial=12
)
pvs = {
"demo:pv:optime": SharedPV(
nt=NTScalar("i"),
handler=PersistHandler("demo:pv:optime", conn),
initial=0,
), # Operational time; total time running
"demo:pv:uptime": SharedPV(
nt=NTScalar("i"),
handler=PersistHandler("demo:pv:uptime", conn, open_restore=False),
timestamp=time.time(),
initial=0,
), # Uptime since most recent (re)start
"demo:pv:int": duplicate_pv,
"demo:pv:float": SharedPV(
nt=NTScalar("d"),
handler=PersistHandler("demo:pv:float", conn),
initial=9.99,
),
"demo:pv:string": SharedPV(
nt=NTScalar("s"),
handler=PersistHandler("demo:pv:string", conn),
initial="Hello!",
),
"demo:pv:alias_int": duplicate_pv, # It works except for reporting its restore
}

# Make the uptime PV readonly; maybe we want to be able to update optime
# after major system upgrades?
uptime_pv = pvs["demo:pv:uptime"]

@uptime_pv.put
def read_only(_pv: SharedPV, op: ServerOperation):
"""Don't allow put operations on this PV."""

op.done(error="Read-only")

print(f"Starting server with the following PVs: {pvs}")

server = None
try:
server = Server(providers=[pvs])
while True:
# Every second increment the values of uptime and optime
time.sleep(1)
increment_value = pvs["demo:pv:uptime"].current().raw["value"] + 1
pvs["demo:pv:uptime"].post(increment_value)
increment_value = pvs["demo:pv:optime"].current().raw["value"] + 1
pvs["demo:pv:optime"].post(increment_value)
except KeyboardInterrupt:
pass
finally:
if server:
server.stop()
conn.close()


if __name__ == "__main__":
main()
Loading