forked from ecostech/viatom-ble
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtmux_co2_display.py
69 lines (57 loc) · 1.91 KB
/
tmux_co2_display.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import os
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
# Connection settings. Each value is taken from the environment variable of
# the same name when it is set; otherwise the default given here is used.
INFLUXDB_URL = os.getenv("INFLUXDB_URL", "http://localhost:8086")
INFLUXDB_TOKEN_FILE = os.getenv(
    "INFLUXDB_TOKEN_FILE",
    "/home/chromebook/.secrets/influx.txt",
)
INFLUXDB_ORG = os.getenv("INFLUXDB_ORG", "chromebook")
INFLUXDB_BUCKET = os.getenv("INFLUXDB_BUCKET", "health_data")
# Load the InfluxDB API token from INFLUXDB_TOKEN_FILE, stripping surrounding
# whitespace. Any failure prints a one-line message on stdout and ends the
# script with exit status 1.
#
# SystemExit is raised directly instead of calling the `exit()` helper: that
# helper is injected by the `site` module for interactive sessions and is not
# available when the interpreter runs without `site` (e.g. `python -S`).
try:
    with open(INFLUXDB_TOKEN_FILE, 'r') as token_file:
        INFLUXDB_TOKEN = token_file.read().strip()
except FileNotFoundError:
    print(f"Error: Token file '{INFLUXDB_TOKEN_FILE}' not found")
    raise SystemExit(1)
except Exception as e:
    # Permission problems, decoding errors, a directory at that path, ...
    print(f"Error: Unable to read token file: {e}")
    raise SystemExit(1)

# A file that exists but is empty (or whitespace-only) yields an empty token
# after strip(); refuse to continue without a usable token.
if not INFLUXDB_TOKEN:
    print("Error: INFLUXDB_TOKEN is empty or not set")
    raise SystemExit(1)
# Create the InfluxDB client and obtain its query interface.
client = influxdb_client.InfluxDBClient(
    url=INFLUXDB_URL,
    token=INFLUXDB_TOKEN,
    org=INFLUXDB_ORG,
)
query_api = client.query_api()

# Flux query: the most recent "co2_ppm" value of the "ambient_sensors"
# measurement recorded within the last 10 minutes. The pieces are joined with
# newlines, with an empty first and last element so the text starts and ends
# with a newline.
query = "\n".join(
    (
        "",
        f'from(bucket: "{INFLUXDB_BUCKET}")',
        "|> range(start: -10m)",
        '|> filter(fn: (r) => r._measurement == "ambient_sensors" and (r._field == "co2_ppm"))',
        "|> last()",
        "",
    )
)

# Field name -> ASCII marker printed in front of that field's value.
ascii_icons = dict(co2_ppm="*")  # star
# Run the query and print one status line built from `ascii_icons`: each
# entry is rendered as "<icon><value>" when the field was returned, or
# "<icon> N/A" when it was not. Entries are separated by a single space.
#
# All messages go to stdout. Exit status is 1 when the query returns no
# tables or when any exception is raised while querying or formatting.
try:
    tables = query_api.query(query, org=INFLUXDB_ORG)
    if not tables:
        print("No data found.")
        # SystemExit is not an Exception subclass, so the handler below does
        # not intercept it; the `finally` clause still runs.
        raise SystemExit(1)

    # Collect field -> value. If several tables carry the same field, the
    # record seen last overwrites the earlier ones.
    stats = {}
    for table in tables:
        for record in table.records:
            stats[record.get_field()] = record.get_value()

    output = [
        f"{icon}{stats[key]}" if key in stats else f"{icon} N/A"
        for key, icon in ascii_icons.items()
    ]
    print(" ".join(output))
except Exception as e:
    print(f"Error querying InfluxDB: {e}")
    raise SystemExit(1)
finally:
    # Release the client's underlying HTTP resources on every exit path.
    client.close()