-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.py
76 lines (55 loc) · 2 KB
/
store.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from typing import Dict
import firebase_admin
import trio
from firebase_admin import firestore, credentials
from loguru import logger
# Module-level Firebase setup. Everything below runs once, at import time,
# so importing this module requires the credentials file to be readable.

# File name handed to credentials.Certificate below. It is a relative name,
# so presumably it is resolved against the process working directory --
# TODO confirm against the deployment layout.
FIREBASE_CREDENTIALS_FILE = "firebase_creds.json"
# Credential object built from the file above.
cred = credentials.Certificate(FIREBASE_CREDENTIALS_FILE)
# Initialise the Firebase app with that credential; the handle is kept at
# module level.
firebase_app = firebase_admin.initialize_app(cred)
# Shared Firestore client used by every helper in this module.
store = firestore.client()
async def get_document(collection: str, document: str):
    """Fetch one Firestore document without blocking the Trio event loop.

    The blocking ``get()`` call is handed to a worker thread through
    ``trio.to_thread.run_sync``.

    Args:
        collection: Name passed to ``store.collection``.
        document: Document ID looked up inside that collection.

    Returns:
        The snapshot returned by ``get()`` when its ``exists`` attribute is
        truthy, otherwise ``False``.
    """
    # TODO: no cache is consulted yet; every call goes to Firestore.
    snapshot = await trio.to_thread.run_sync(
        lambda: store.collection(collection).document(document).get()
    )
    return snapshot if snapshot.exists else False
async def get_device_document(device_id: str):
    """Look up a device's document in the ``devices`` collection.

    Thin asynchronous wrapper that delegates to ``get_document``.

    Args:
        device_id: Document ID of the device.

    Returns:
        Whatever ``get_document`` returns for ``("devices", device_id)``:
        the document snapshot, or ``False`` when it does not exist.
    """
    device_doc = await get_document(collection="devices", document=device_id)
    return device_doc
def get_device_document_sync(device_id: str):
    """Blocking lookup of a device's document in the ``devices`` collection.

    The Firestore call is made directly on the calling thread. Note that,
    unlike ``get_device_document``, a hit is returned as a plain dict
    rather than as the snapshot object.

    Args:
        device_id: Document ID of the device.

    Returns:
        The result of the snapshot's ``to_dict()`` when the document
        exists, otherwise ``False``.
    """
    device_ref = store.collection("devices").document(device_id)
    snapshot = device_ref.get()

    # Guard clause: nothing to log or convert for a missing document.
    if not snapshot.exists:
        return False

    logger.debug(f"Fetched {snapshot} from Firestore.")
    return snapshot.to_dict()
async def get_generated_data(device_id: str, count=5):
    """Fetch the most recent ``generatedData`` entries of a device.

    The query orders the device's ``generatedData`` sub-collection by
    ``creation_timestamp`` in descending order and is capped at ``count``
    results. It is executed in a worker thread through
    ``trio.to_thread.run_sync`` so the event loop is not blocked.

    Args:
        device_id: Document ID of the device in the ``devices`` collection.
        count: Value passed to the query's ``limit`` (defaults to 5).

    Returns:
        A list with the ``to_dict()`` result of every fetched document, in
        the order the query returned them.
    """

    def run_query():
        generated = (
            store.collection("devices")
            .document(device_id)
            .collection("generatedData")
        )
        newest_first = generated.order_by(
            "creation_timestamp", direction=firestore.Query.DESCENDING
        )
        return newest_first.limit(count).get()

    snapshots = await trio.to_thread.run_sync(run_query)

    # Plain loop rather than a comprehension: each document is logged
    # right before it is converted.
    records = []
    for snapshot in snapshots:
        logger.debug(f"Fetched {snapshot.id} document")
        records.append(snapshot.to_dict())
    return records
def get_all_rules():
    """Fetch every document in the ``rules`` collection (blocking).

    The Firestore call is made directly on the calling thread, and the
    number of fetched rules is logged at debug level.

    Returns:
        The object returned by ``get()`` on the ``rules`` collection,
        unmodified (the documents are not converted to dicts).
    """
    rules_collection = store.collection("rules")
    rule_docs = rules_collection.get()
    rule_count = len(rule_docs)
    logger.debug(f"Fetched {rule_count} rules from DB")
    return rule_docs
async def update_document(collection, document, data):
    """Apply ``data`` to a Firestore document via ``update()``.

    The blocking call is executed in a worker thread through
    ``trio.to_thread.run_sync``.

    Args:
        collection: Name passed to ``store.collection``.
        document: ID of the document to update.
        data: Value forwarded unchanged to the document reference's
            ``update`` method.

    Returns:
        None. The result of ``update()`` is discarded.
    """

    def apply_update():
        # Reference lookup and write both happen inside the worker thread.
        store.collection(collection).document(document).update(data)

    await trio.to_thread.run_sync(apply_update)