-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent_loop.py
247 lines (206 loc) · 8.05 KB
/
event_loop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import contextlib
import dataclasses
import datetime
import faulthandler
import logging
import threading
from typing import Optional
import events
import util.general
import util.print_parse
import util.ssh
import util.subsuming_queue
import util.threading
import webhook_listener
@dataclasses.dataclass
class WebhookConfig:
"""Configuration for webhooks."""
netloc_listen: util.print_parse.NetLoc
"""The local net location to listen at for webhook notifications."""
netloc_specify: util.print_parse.NetLoc
"""The net location to specify in the webhook configuration."""
secret_token: str
"""
Secret token to use for webhook authentication.
The value does not matter, but it should not be guessable.
"""
@dataclasses.dataclass
class CanvasSyncConfig:
"""Configuration for synchronizing teachers, students, and groups from Canvas to GitLab."""
labs_to_sync: tuple
"""Tuple of lab ids to synchronize."""
sync_interval: Optional[datetime.timedelta]
"""
How often to synchronize.
If not set, don't synchronize except potentially at the start.
"""
start_with_sync: bool
"""
Whether to start with a synchronization.
If false, the first synchronization occurs after sync_interval.
"""
logger = logging.getLogger(__name__)
def run(
courses,
run_time: Optional[datetime.timedelta] = None,
webhook_config: Optional[WebhookConfig] = None,
canvas_sync_config: Optional[CanvasSyncConfig] = None,
):
"""
Run the event loop.
This method only returns after an event of
kind TerminateProgram has been processed.
The event loop starts with processing of all labs.
So it is unnecessary to prefix it with a call to initial_run.
Arguments:
* courses:
Collection of instances of course.Course.
The courses to run the event loop for.
This function takes care of calling setup.
* run_time:
If set, the event loop will exit after this period has elapsed.
This is the only way for this function to return.
* webhook_config:
Configuration for webhook notifications from GitLab Chalmers.
Set to None to disable the webhook mechanism in the event loop.
* canvas_sync_config:
Configuration for the mechanism synchronizing graders, students, and groups from Canvas to GitLab.
Set to None to disable.
"""
# Resource management.
exit_stack = contextlib.ExitStack()
with exit_stack:
# The event queue.
event_queue = util.subsuming_queue.SubsumingQueue()
def shutdown():
event_queue.add((events.TerminateProgram(), None))
# Context managers for threads we create.
thread_managers = []
# Configure SSH multiplexers.
# Ideally, all courses connect to the same SSH server.
def f():
for ssh_netloc in set(c.config.gitlab_ssh.netloc for c in courses):
multiplexer = util.ssh.Multiplexer(ssh_netloc)
exit_stack.enter_context(contextlib.closing(multiplexer))
yield (ssh_netloc, multiplexer)
ssh_multiplexers = dict(f())
for c in courses:
c.ssh_multiplexer = ssh_multiplexers[c.config.gitlab_ssh.netloc]
# Set up courses.
for c in courses:
c.setup()
# Configure webhooks.
if webhook_config is not None:
for c in courses:
c.hooks_ensure(netloc=webhook_config.netloc_specify)
courses_by_groups_path = {c.config.path_course: c for c in courses}
def add_webhook_event(hook_event):
for result in webhook_listener.parse_hook_event(
courses_by_groups_path=courses_by_groups_path,
hook_event=hook_event,
strict=False,
):
event_queue.add(result)
webhook_listener_manager = webhook_listener.server_manager(
webhook_config.netloc_listen,
webhook_config.secret_token,
add_webhook_event,
)
webhook_server = exit_stack.enter_context(webhook_listener_manager)
def webhook_server_run():
try:
webhook_server.serve_forever()
finally:
shutdown()
def webhook_server_shutdown():
webhook_server.shutdown()
webhook_server_thread = threading.Thread(
target=webhook_server_run,
name="webhook-server-listener",
)
thread_managers.append(
util.general.add_cleanup(
util.threading.thread_manager(webhook_server_thread),
webhook_server_shutdown,
)
)
# Set up program termination timer.
if run_time is not None:
shutdown_timer = util.threading.Timer(
run_time,
shutdown,
name="shutdown-timer",
)
thread_managers.append(util.threading.timer_manager(shutdown_timer))
# Set up Canvas sync event timers and add potential initial sync.
def sync_from_canvas(course):
event_queue.add(
(
course.program_event(events.SyncFromCanvas()),
lambda: course.sync_teachers_and_lab_projects(
canvas_sync_config.labs_to_sync
),
)
)
if canvas_sync_config is not None:
for course in courses:
if canvas_sync_config.start_with_sync:
sync_from_canvas(course)
if canvas_sync_config.sync_interval is not None:
course.sync_timer = util.threading.Timer(
canvas_sync_config.sync_interval,
sync_from_canvas,
args=[course],
name=(
"course-sync-from-canvas-timer"
f"<{course.config.path_course}>"
),
repeat=True,
)
thread_managers.append(
util.threading.timer_manager(course.sync_timer)
)
# Set up lab refresh event timers and add initial lab refreshes.
def refresh_lab(lab):
event_queue.add(
(
lab.course.program_event(lab.course_event(events.RefreshLab())),
lab.refresh_lab,
)
)
delay = datetime.timedelta()
for c in courses:
for lab in c.labs.values():
refresh_lab(lab)
if lab.config.refresh_period is not None:
lab.refresh_timer = util.threading.Timer(
lab.config.refresh_period + delay,
refresh_lab,
args=[lab],
name=f"lab-refresh-timer<{c.config.path_course}, {lab.name}>",
repeat=True,
)
thread_managers.append(
util.threading.timer_manager(lab.refresh_timer)
)
delay += c.config.webhook.first_lab_refresh_delay
# Start the threads.
for manager in thread_managers:
exit_stack.enter_context(manager)
@contextlib.contextmanager
def print_stacks():
try:
yield
finally:
faulthandler.dump_traceback()
# Print stacks before cleanup to help debug stalling.
exit_stack.enter_context(print_stacks())
# The event loop.
while True:
logger.info("Waiting for event.")
(event, callback) = event_queue.remove()
if isinstance(event, events.TerminateProgram):
logger.info("Program termination event received, shutting down.")
return
logger.info(f"Handling event {event}")
callback()