-
Notifications
You must be signed in to change notification settings - Fork 0
/
obs-sessionize-updater.py
276 lines (228 loc) · 12.5 KB
/
obs-sessionize-updater.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import obspython as obs
import requests
from datetime import datetime
import json
import pytz
import hashlib
import os
# Properties
last_fetch_time = None
schedule_data = None
local_timezone = datetime.now(pytz.timezone('America/Phoenix')).strftime('%Z')
def update_source_dropdowns(props):
# Get a list of all sources
sources = obs.obs_enum_sources()
list_of_sources = [(obs.obs_source_get_name(source), obs.obs_source_get_name(source)) for source in sources]
obs.source_list_release(sources)
# Add dropdown lists for the source names
for source_name in ["current_title_source", "current_presenters_source", "next_title_source", "next_presenters_source"]:
prop = obs.obs_properties_get(props, source_name)
obs.obs_property_list_clear(prop)
for item in list_of_sources:
obs.obs_property_list_add_string(prop, *item)
def script_description():
return "Updates text sources with current and next session information."
def script_update(settings):
global url, current_title_source_name, current_presenters_source_name, next_title_source_name, next_presenters_source_name, fake_current_datetime, local_timezone, room_name, fetch_interval_minutes, enabled
url = obs.obs_data_get_string(settings, "url")
enabled = obs.obs_data_get_bool(settings, "enabled")
current_title_source_name = obs.obs_data_get_string(settings, "current_title_source")
current_presenters_source_name = obs.obs_data_get_string(settings, "current_presenters_source")
next_title_source_name = obs.obs_data_get_string(settings, "next_title_source")
next_presenters_source_name = obs.obs_data_get_string(settings, "next_presenters_source")
fake_current_datetime = obs.obs_data_get_string(settings, "fake_current_datetime")
local_timezone = obs.obs_data_get_string(settings, "local_timezone")
room_name = obs.obs_data_get_string(settings, "room_name")
fetch_interval_minutes = obs.obs_data_get_int(settings, "fetch_interval_minutes")
# Save settings to a file
with open('settings.json', 'w') as f:
json.dump({
"url": url,
"enabled": enabled,
"current_title_source": current_title_source_name,
"current_presenters_source": current_presenters_source_name,
"next_title_source": next_title_source_name,
"next_presenters_source": next_presenters_source_name,
"fake_current_datetime": fake_current_datetime,
"local_timezone": local_timezone,
"room_name": room_name,
"fetch_interval_minutes": fetch_interval_minutes
}, f)
def script_defaults(settings):
# Load settings from a file
try:
with open('settings.json', 'r') as f:
saved_settings = json.load(f)
except FileNotFoundError:
saved_settings = {}
obs.obs_data_set_default_bool(settings, "enabled", saved_settings.get("enabled", False))
obs.obs_data_set_default_string(settings, "url", saved_settings.get("url", ""))
obs.obs_data_set_default_string(settings, "current_title_source", saved_settings.get("current_title_source", ""))
obs.obs_data_set_default_string(settings, "current_presenters_source", saved_settings.get("current_presenters_source", ""))
obs.obs_data_set_default_string(settings, "next_title_source", saved_settings.get("next_title_source", ""))
obs.obs_data_set_default_string(settings, "next_presenters_source", saved_settings.get("next_presenters_source", ""))
obs.obs_data_set_default_string(settings, "fake_current_datetime", saved_settings.get("fake_current_datetime", ""))
obs.obs_data_set_default_string(settings, "local_timezone", saved_settings.get("local_timezone", local_timezone))
obs.obs_data_set_default_string(settings, "room_name", saved_settings.get("room_name", ""))
obs.obs_data_set_default_int(settings, "fetch_interval_minutes", saved_settings.get("fetch_interval_minutes", 5))
def script_properties():
props = obs.obs_properties_create()
obs.obs_properties_add_bool(props, "enabled", "Enabled")
# Get a list of all sources
sources = obs.obs_enum_sources()
list_of_sources = [(obs.obs_source_get_name(source), obs.obs_source_get_name(source)) for source in sources]
obs.source_list_release(sources)
# Add dropdown lists for the source names
for source_name in ["current_title_source", "current_presenters_source", "next_title_source", "next_presenters_source"]:
prop = obs.obs_properties_add_list(props, source_name, f"{source_name.replace('_', ' ').title()} Field", obs.OBS_COMBO_TYPE_LIST, obs.OBS_COMBO_FORMAT_STRING)
for item in list_of_sources:
obs.obs_property_list_add_string(prop, *item)
obs.obs_properties_add_int(props, "fetch_interval_minutes", "Fetch Interval (minutes)", 1, 60, 1)
# Get a list of all timezones
list_of_timezones = [(tz, tz) for tz in pytz.all_timezones]
# Add dropdown list for the local timezone
prop = obs.obs_properties_add_list(props, "local_timezone", "Local Timezone", obs.OBS_COMBO_TYPE_LIST, obs.OBS_COMBO_FORMAT_STRING)
for item in list_of_timezones:
obs.obs_property_list_add_string(prop, *item)
obs.obs_properties_add_text(props, "url", "URL", obs.OBS_TEXT_DEFAULT)
obs.obs_properties_add_text(props, "fake_current_datetime", "Fake Current LOCAL DateTime (format: YYYY-MM-DDTHH:MM:SS)", obs.OBS_TEXT_DEFAULT)
obs.obs_properties_add_text(props, "room_name", "Room Name", obs.OBS_TEXT_DEFAULT)
return props
def fetch_data_from_api(api_url):
# Send a GET request to the API
response = requests.get(api_url)
# Parse the JSON response
data = response.json()
# Compute a hash of the data for comparison with the old data
data_hash = hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest()
old_data_hash = None
try:
# Try to open the file containing the hash of the old data
with open('data_hash.txt', 'r') as f:
# Read the old data hash
old_data_hash = f.read().strip()
except FileNotFoundError:
# If the file does not exist, do nothing
obs.script_log(obs.LOG_INFO, "data_hash.txt not found, creating new one.")
pass
# If the old data hash does not match the new data hash
if old_data_hash != data_hash:
obs.script_log(obs.LOG_INFO, "Data hash has changed, updating the data.")
# Open the file to store the new data
with open('schedule_data.json', 'w') as f:
# Write the new data to the file
json.dump(data, f)
# Open the file to store the new data hash
with open('data_hash.txt', 'w') as f:
# Write the new data hash to the file
f.write(data_hash)
def get_data_from_local():
# Open and read the schedule data from the JSON file
with open('schedule_data.json', 'r') as f:
data = json.load(f)
# Set the current time in the local timezone
local_tz = pytz.timezone(local_timezone)
current_time = datetime.now(local_tz)
# If a fake current datetime is provided, use that instead
if fake_current_datetime:
current_time = datetime.strptime(fake_current_datetime, "%Y-%m-%dT%H:%M:%S")
current_time = local_tz.localize(current_time)
# Convert the current time to UTC
current_time = current_time.astimezone(pytz.utc).isoformat()
# Initialize variables for the current and next session titles and presenters
current_title = None
current_presenters = None
next_title = None
next_presenters = None
# Loop through each day in the schedule data
for day in data:
# Loop through each room, filtering for the specific room name
for room in [r for r in day['rooms'] if r['name'] == room_name]:
# Loop through each session in the room
for session in room['sessions']:
# If the current time falls within the session's start and end times, set the current session title and presenters
if session['startsAt'] <= current_time < session['endsAt']:
current_title = session['title']
current_presenters = ', '.join(speaker['name'] for speaker in session['speakers'])
obs.script_log(obs.LOG_INFO, f"Current session: {current_title}, Presenters: {current_presenters}")
# If the current time is before a session's start time and the next session title has not been set yet, set the next session title and presenters
elif session['startsAt'] > current_time and next_title is None:
next_title = session['title']
next_presenters = ', '.join(speaker['name'] for speaker in session['speakers'])
obs.script_log(obs.LOG_INFO, f"Next session: {next_title}, Presenters: {next_presenters}")
# Return the current and next session titles and presenters
return current_title, current_presenters, next_title, next_presenters
def set_text(source_name, text):
# Get the source by its name
source = obs.obs_get_source_by_name(source_name)
# If the source exists
if source is not None:
# Create a new OBS data instance
settings = obs.obs_data_create()
# Set the 'text' field of the OBS data instance to the provided text
obs.obs_data_set_string(settings, "text", text)
# Update the source with the new settings
obs.obs_source_update(source, settings)
# Log the update
obs.script_log(obs.LOG_INFO, f"Updated source {source_name} with text: {text}")
# Release the OBS data instance to free up memory
obs.obs_data_release(settings)
# Release the source to free up memory
obs.obs_source_release(source)
else:
# If the source does not exist, log a warning
obs.script_log(obs.LOG_WARNING, f"Source {source_name} does not exist.")
# Function to update the last fetch time
def update_last_fetch_time():
global last_fetch_time
# Get the current time in UTC and format it as an ISO 8601 string
last_fetch_time = datetime.utcnow().isoformat()
# Open the file 'last_fetch_time.txt' in write mode
with open('last_fetch_time.txt', 'w') as f:
# Write the last fetch time to the file
f.write(last_fetch_time)
obs.script_log(obs.LOG_INFO, f"Updated last fetch time: {last_fetch_time}")
def get_last_fetch_time():
try:
# Open the file 'last_fetch_time.txt' in read mode
with open('last_fetch_time.txt', 'r') as f:
# Read the last fetch time from the file
last_fetch_time_str = f.read()
# Convert the last fetch time from an ISO 8601 string to a datetime object and return it
return datetime.fromisoformat(last_fetch_time_str)
except FileNotFoundError:
# If the file 'last_fetch_time.txt' does not exist, return None
obs.script_log(obs.LOG_WARNING, "File 'last_fetch_time.txt' not found.")
return None
tick_counter = 0
def script_tick(seconds):
global last_fetch_time, tick_counter
# Increment the tick counter
tick_counter += 1
# If 10 seconds have not passed, just return
if tick_counter < 10:
return
# Reset the tick counter
tick_counter = 0
if not enabled:
return
# Fetch data every fetch_interval_minutes
if last_fetch_time is None or (datetime.utcnow() - last_fetch_time).total_seconds() >= fetch_interval_minutes * 60:
try:
# Fetch data from the API
fetch_data_from_api(url)
# Update last_fetch_time in the file
update_last_fetch_time()
# Get the updated last_fetch_time from the file
last_fetch_time = get_last_fetch_time()
obs.script_log(obs.LOG_INFO, "Data fetched from API.")
except requests.exceptions.RequestException:
# If fetching data from the API fails, log a warning and use the last fetched data
obs.script_log(obs.LOG_WARNING, "Failed to fetch data from API. Using the last fetched data.")
# Get the current and next session titles and presenters from the local data
current_title, current_presenters, next_title, next_presenters = get_data_from_local()
# Set the text of the current and next title and presenter sources
set_text(current_title_source_name, current_title)
set_text(current_presenters_source_name, current_presenters)
set_text(next_title_source_name, next_title)
set_text(next_presenters_source_name, next_presenters)