-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathglobalTimer.c
472 lines (403 loc) · 14.3 KB
/
globalTimer.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
#include "eventTimer.h"
#include "debug.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <unistd.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
/*
* Global Shared Timer support
*
* This timer uses shared memory and interprocess synchronization to
* maintain a single, global shared virtual timer. This is useful when running
* tests / simulations that involve multiple processes that need strict global
* ordering on their clocks. Since this is a more sophisticated timer, it
* is implemented in its own file and has a bit more documentation involved.
*
* The synchronization is based on a shared memory region accessible via
* a file in the filesystem. The region is laid out as follows:
* 1. Event Loop Mutex - A semaphore used to allow only one process to execute a non-debug event at a time.
* 2. Time Barrier - A semaphore used as a barrier to inform processes when the global time has advanced and give them the opportunity to compete for the event loop mutex, as appropriate.
* 3. Current Time - the current simulated global time
* 4. Number of processes - the number of entries in the process array
* 5. Process Info Array - An array of timer data, one for each process
* a. Next time - The next virtual time the process needs to run
* b. Active - A flag that marks active processes. Used to recover from processes that don't exit the system cleanly
* c. Mutex held - A boolean flag indicating if the process currently holds the event loop mutex
* d. process id
*
* A process must hold the event loop mutex prior to executing any global
* time-dependent event loop activities. This ensures only one time step is
* taken at a time. Additionally, the event loop mutex must be held to
* update any of the data in the shared memory segment except to set the
* active flag to 1.
*
* Initializing shared memory.
* Shared memory is initialized by the first process entering the global
* simulation. To avoid race conditions with two processes entering at nearly
* the same time, the posix advisory file locking mechanism (flock) is used.
*
* The per-process portion of the shared memory is initialized prior to
* entering the event loop. This requires obtaining the mutex, adding an entry
* for the new process, synchronizing the new process's virtual clock with the
* global clock, and advancing to the barrier.
*
* Barrier operation
* The barrier is implemented with a semaphore. Prior to releasing the mutex
* the current process updates the global time by iterating through the shared
* memory data structure to find the next smallest time. It also calls up() on
* the semaphore once for each active process, including itself. It marks the
* process with the smallest time as inactive to help cull processes that
* may have crashed. After releasing the mutex it waits at the barrier, which
* should result in being immediately let through the barrier.
*
* Once through the barrier all processes in the system compete for the mutex.
*/
// Capacity of the per-process array embedded in struct SharedState
#define MAX_PROCS 128
/*
 * Per-process entry stored in the shared memory region. One entry is
 * claimed by each process when it joins the global timer.
 */
struct SharedProcessState {
// Next virtual time at which this process needs to run
struct timeval next_time;
// Non-zero while the process is considered alive; cleared on the process
// chosen as "next" so that crashed processes can be detected
int active;
// Non-zero while this process holds the event loop mutex
int holds_mutex;
// Set in ET_gvirt_block when this process releases the mutex while it is
// the current time thief
int thief;
// Process id of the owner; set to 0 when the process leaves
pid_t pid;
};
/*
 * Layout of the shared memory region that backs the global timer. The
 * region is a MAP_SHARED mapping of the state file.
 */
struct SharedState {
// Event loop mutex: process-shared semaphore initialized to 1
sem_t evt_mutex;
// Time barrier: process-shared semaphore initialized to 0
sem_t barrier;
// Current simulated global time
struct timeval curr_time;
// Number of entries of procs[] that have been handed out
int num_procs;
// pid of the process that has stolen the clock (VIRT_CLK_STOLEN), or 0
int time_thief;
// Per-process timer data
struct SharedProcessState procs[MAX_PROCS];
};
/*
 * Process-local timer object. The embedded EventTimer is the first member
 * because the code in this file casts between struct EventTimer * and
 * struct VirtualGlobalEventTimer *.
 */
struct VirtualGlobalEventTimer {
// Generic timer interface (function pointers filled in by ET_gvirt_init)
struct EventTimer et;
// This process's copy of the virtual time
struct timeval time;
// Mapping of the shared state file
struct SharedState *state;
// This process's entry inside state->procs[]
struct SharedProcessState *my_state;
// Descriptor of the open state file; also used for flock()
int state_fd;
// Heap-allocated copy of the state file path (freed during cleanup)
char *state_file;
// Pause mode; compared against VIRT_CLK_ACTIVE / VIRT_CLK_STOLEN
char paused;
};
// Forward declarations of the virtual clock accessors defined at the end of
// this file; ET_gvirt_block and ET_gvirt_init refer to them before their
// definitions.
char et_gvirt_get_pause(struct EventTimer *et);
void et_gvirt_set_pause(struct EventTimer *et, char pauseState);
int et_gvirt_get_time(struct EventTimer *et, struct timeval *time);
void et_gvirt_set_time(struct EventTimer *et, struct timeval *time);
void et_gvirt_inc_time(struct EventTimer *et, struct timeval *time);
static int setup_shared_state(struct VirtualGlobalEventTimer *self)
{
int res;
struct stat finfo;
int init = 0;
if (!self || !self->state_file)
return -1;
if ((res = open(self->state_file, O_RDWR | O_CREAT, 0666)) <= 0) {
ERRNO_WARN("Failed to open global state file");
return res;
}
self->state_fd = res;
while (-1 == flock(self->state_fd, LOCK_EX)) {
if (errno != EINTR) {
ERRNO_WARN("Failed to flock global state file");
close(self->state_fd);
return -1;
}
}
if (fstat(self->state_fd, &finfo) == -1) {
ERRNO_WARN("Failed to stat global state file");
close(self->state_fd);
return -1;
}
// Initialize the file if too small
if (finfo.st_size < sizeof(struct SharedState)) {
init = 1;
if (ftruncate(self->state_fd, sizeof(struct SharedState)) == -1) {
ERRNO_WARN("Failed to resize global state file");
close(self->state_fd);
return -1;
}
if (fstat(self->state_fd, &finfo) == -1) {
ERRNO_WARN("Failed to re-stat global state file");
close(self->state_fd);
close(self->state_fd);
return -1;
}
}
self->state = (struct SharedState*)mmap(NULL, finfo.st_size,
PROT_READ | PROT_WRITE, MAP_SHARED, self->state_fd, 0);
if (self->state == (void*)-1) {
ERRNO_WARN("Failed to map timer shared memory");
close(self->state_fd);
return -1;
}
if (init) {
if (-1 == sem_init(&self->state->evt_mutex, 1, 1)) {
ERRNO_WARN("Failed to initialize event mutex");
munmap(self->state, sizeof(*self->state));
close(self->state_fd);
return -1;
}
if (-1 == sem_init(&self->state->barrier, 1, 0)) {
ERRNO_WARN("Failed to initialize barrier");
munmap(self->state, sizeof(*self->state));
close(self->state_fd);
return -1;
}
self->state->num_procs = 0;
gettimeofday(&self->state->curr_time, NULL);
}
if (-1 == sem_wait(&self->state->evt_mutex)) {
ERRNO_WARN("Failed to get event mutex");
munmap(self->state, sizeof(*self->state));
close(self->state_fd);
return -1;
}
self->my_state = &self->state->procs[self->state->num_procs++];
self->my_state->next_time = self->state->curr_time;
self->my_state->active = 1;
self->my_state->holds_mutex = 1;
self->my_state->pid = getpid();
flock(self->state_fd, LOCK_UN);
return 0;
}
/**
 * Detach this process from the global shared state.
 *
 * Marks the process entry inactive and clears its pid. If the process holds
 * the event loop mutex it also gives up a stolen clock, posts the barrier
 * once for every other active process, releases the mutex and takes the
 * file lock. When no other active process was found in that case, the
 * semaphores are destroyed and the state file is removed. Finally the
 * region is unmapped, the descriptor is closed and the path copy is freed.
 *
 * @param self timer previously set up by setup_shared_state().
 *
 * @return always 0.
 */
static int cleanup_shared_state(struct VirtualGlobalEventTimer *self)
{
    int i, cnt = 0, del_file = 0;

    if (!self)
        return 0;

    // Nothing was ever mapped: only the path copy needs releasing
    if (!self->state || !self->my_state) {
        free(self->state_file);
        self->state_file = NULL;
        return 0;
    }

    self->my_state->active = 0;

    // release the barrier if we hold the event mutex
    if (self->my_state->holds_mutex) {
        if (self->my_state->pid == self->state->time_thief &&
                self->state->time_thief)
            self->state->time_thief = 0;
        self->my_state->pid = 0;
        del_file = 1;
        for (i = 0; i < self->state->num_procs; i++) {
            if (self->state->procs[i].pid > 0 && self->state->procs[i].active) {
                sem_post(&self->state->barrier);
                cnt++;
            }
        }
        self->my_state->holds_mutex = 0;
        sem_post(&self->state->evt_mutex);
        flock(self->state_fd, LOCK_EX);
    }
    self->my_state->pid = 0;

    if (cnt == 0 && del_file) {
        sem_destroy(&self->state->evt_mutex);
        sem_destroy(&self->state->barrier);
        // Remove the file while the descriptor is still open, i.e. while the
        // file lock taken above is still held.
        // NOTE(review): a process that registered between sem_post() and
        // flock() above is not reflected in cnt - confirm whether a recount
        // is needed here.
        unlink(self->state_file);
    }

    munmap(self->state, sizeof(*self->state));
    self->state = NULL;
    self->my_state = NULL;

    // close() also releases any flock() held through this descriptor, so no
    // explicit unlock is needed (and none may be issued on a closed fd)
    close(self->state_fd);
    self->state_fd = -1;

    free(self->state_file);
    self->state_file = NULL;
    return 0;
}
/**
 * Block until the next event, keeping the virtual clock in step with the
 * other processes attached to the shared state.
 *
 * Three cases are handled:
 *  - nextAwake set and pauseWhileBlocking non-zero: the shared state is not
 *    touched; the callback is given the time remaining until nextAwake
 *    according to this timer's own clock (clamped to zero).
 *  - nextAwake set and the clock is VIRT_CLK_ACTIVE or VIRT_CLK_STOLEN: the
 *    caller must hold the event loop mutex. Its next wake time is
 *    published, the global time is advanced (unless the clock is stolen),
 *    the mutex is released, and the process waits at the barrier until the
 *    global time reaches nextAwake. It then returns from the callback with
 *    the mutex held again. The time thief skips the barrier.
 *  - otherwise: the callback is invoked with a NULL timeout.
 *
 * @param e the global virtual EventTimer.
 * @param nextAwake time of the caller's next event, or NULL.
 * @param pauseWhileBlocking non-zero to leave the global clock untouched.
 * @param blockcb callback that performs the actual blocking.
 * @param arg opaque argument handed to blockcb.
 *
 * @return the value returned by blockcb.
 */
int ET_gvirt_block(struct EventTimer *e, struct timeval *nextAwake,
        int pauseWhileBlocking, ET_block_cb blockcb, void *arg)
{
    struct timeval diffTime, curTime, *blockTime = NULL;
    struct VirtualGlobalEventTimer *et = (struct VirtualGlobalEventTimer *)e;

    // Set amount of time to block on select.
    // When time is "paused" we don't attempt to advance the virtual clock.
    // This is the case for internal debugging events.
    if (nextAwake && pauseWhileBlocking) {
        et->et.get_monotonic_time(&et->et, &curTime);
        timersub(nextAwake, &curTime, &diffTime);
        if (diffTime.tv_sec < 0 || diffTime.tv_usec < 0) {
            memset(&diffTime, 0, sizeof(struct timeval));
        }
        blockTime = &diffTime;
    }
    // Block on select indefinitely if virtual clock is paused
    else if (nextAwake && (et->paused == VIRT_CLK_ACTIVE ||
                et->paused == VIRT_CLK_STOLEN) ) {
        assert(et->my_state->holds_mutex);

        // Finished with a trip through the global loop.
        // Update local time
        et->my_state->next_time = *nextAwake;

        // Compute next global time
        int smallest = -1;
        int proc_cnt = 0;
        for (int i = 0; i < et->state->num_procs; i++) {
            if (et->state->procs[i].pid > 0 && et->state->procs[i].active) {
                proc_cnt++;
                if (smallest < 0 || timercmp(&et->state->procs[i].next_time,
                            &et->state->procs[smallest].next_time, <))
                    smallest = i;
            }
        }
        assert(smallest >= 0);

        if (!et->state->time_thief) {
            // Update global state with next time
            et->state->curr_time = et->state->procs[smallest].next_time;

            // Mark next process as inactive to detect crashes
            et->state->procs[smallest].active = 0;

            // Release the barrier
            for (int i = 0; i < proc_cnt; i++)
                sem_post(&et->state->barrier);
        }

        // Release the event mutex for competition
        et->my_state->thief = 0;
        if (et->state->time_thief && et->state->time_thief == et->my_state->pid)
            et->my_state->thief = 1;
        et->my_state->holds_mutex = 0;
        sem_post(&et->state->evt_mutex);

        // If we are stealing time then we skip the barrier and just block
        // without updating the global clock's idea of time
        if (et->my_state->thief) {
            int res;

            et->et.get_monotonic_time(&et->et, &curTime);
            timersub(nextAwake, &curTime, &diffTime);
            if (diffTime.tv_sec < 0 || diffTime.tv_usec < 0) {
                memset(&diffTime, 0, sizeof(struct timeval));
            }
            blockTime = &diffTime;
            res = blockcb(&et->et, blockTime, arg);

            // re-lock the event mutex prior to returning to the event loop
            while (-1 == sem_wait(&et->state->evt_mutex))
                ;
            // Record ownership again: the next call asserts on this flag,
            // and set_pause / cleanup only act while it is set
            et->my_state->holds_mutex = 1;

            return res;
        }

        // Wait at the barrier
        while(1) {
            int res = sem_wait(&et->state->barrier);
            if (res < 0)
                continue;

            // Through the barrier. Get the event loop mutex
            while (-1 == sem_wait(&et->state->evt_mutex))
                ;
            et_gvirt_set_time(&et->et, &et->state->curr_time);
            assert(timercmp(nextAwake, &et->my_state->next_time, ==));

            // Check to see if it is time to run through our loop
            if (timercmp(nextAwake, &et->state->curr_time, >)) {
                // Not our turn yet: hand the mutex back and wait again
                sem_post(&et->state->evt_mutex);
                continue;
            }

            et->my_state->active = 1;
            et->my_state->holds_mutex = 1;
            break;
        }

        // At this point we hold the event loop mutex and can take a turn through
        // the loop
        memset(&diffTime, 0, sizeof(struct timeval));
        blockTime = &diffTime;
    }

    return blockcb(&et->et, blockTime, arg);
}
/**
 * Tear down a global virtual EventTimer: detach it from the shared state
 * and release the timer object itself. A NULL timer is ignored.
 *
 * @param et timer returned by ET_gvirt_init, or NULL.
 */
void ET_gvirt_cleanup(struct EventTimer *et)
{
    struct VirtualGlobalEventTimer *timer;

    if (!et)
        return;

    timer = (struct VirtualGlobalEventTimer *)et;
    cleanup_shared_state(timer);
    free(timer);
}
/**
 * Create a global virtual EventTimer backed by the given shared state file.
 *
 * The timer object is zero-initialized, its method table is filled in and
 * the process is registered in the shared state.
 *
 * @param shared_state path of the file backing the shared state.
 * @param pause_mode initial pause state of the virtual clock.
 *
 * @return the new timer, or NULL if shared_state is NULL, allocation fails
 *    or the shared state cannot be set up.
 */
struct EventTimer *ET_gvirt_init(const char *shared_state, char pause_mode)
{
    struct VirtualGlobalEventTimer *timer;

    if (shared_state == NULL)
        return NULL;

    timer = calloc(1, sizeof(struct VirtualGlobalEventTimer));
    if (timer == NULL)
        return NULL;

    timer->state_file = strdup(shared_state);
    timer->paused = pause_mode;

    // Blocking and teardown entry points
    timer->et.block = &ET_gvirt_block;
    timer->et.cleanup = &ET_gvirt_cleanup;

    // Both clock queries are served by the virtual clock
    timer->et.get_gmt_time = &et_gvirt_get_time;
    timer->et.get_monotonic_time = &et_gvirt_get_time;

    // Virtual clock controls
    timer->et.virt_inc_time = &et_gvirt_inc_time;
    timer->et.virt_set_time = &et_gvirt_set_time;
    timer->et.virt_get_time = &et_gvirt_get_time;
    timer->et.virt_set_pause = &et_gvirt_set_pause;
    timer->et.virt_get_pause = &et_gvirt_get_pause;

    if (setup_shared_state(timer) >= 0)
        return (struct EventTimer *)timer;

    free(timer->state_file);
    free(timer);
    return NULL;
}
/**
 * Advance this timer's virtual clock by the given amount.
 *
 * @param et a global virtual EventTimer object.
 * @param time amount to add to the clock.
 */
void et_gvirt_inc_time(struct EventTimer *et, struct timeval *time)
{
    struct VirtualGlobalEventTimer *timer =
        (struct VirtualGlobalEventTimer *)et;
    struct timeval advanced;

    assert(et);

    timeradd(time, &timer->time, &advanced);
    timer->time = advanced;
}
/**
 * Overwrite this timer's virtual clock with the given time.
 *
 * @param et a global virtual EventTimer object.
 * @param time new value of the clock.
 */
void et_gvirt_set_time(struct EventTimer *et, struct timeval *time)
{
    struct VirtualGlobalEventTimer *timer =
        (struct VirtualGlobalEventTimer *)et;

    assert(et);

    timer->time.tv_sec = time->tv_sec;
    timer->time.tv_usec = time->tv_usec;
}
/**
* Get virtual clock time. Advanced usage only. Most applications
* should use 'EVT_get_gmt_time' instead.
*
* @param clk a Virtual EventTimer object.
*/
int et_gvirt_get_time(struct EventTimer *et, struct timeval *time)
{
struct VirtualGlobalEventTimer *clk = (struct VirtualGlobalEventTimer *)et;
assert(et);
*time = clk->time;
return 0;
}
/**
 * Change the pause state of the virtual clock.
 *
 * The request is ignored unless this process holds the event loop mutex.
 * Requesting VIRT_CLK_STOLEN records this process as the time thief and is
 * ignored when a thief is already recorded. Any other state gives up the
 * thief role if this process currently has it.
 *
 * @param et a global virtual EventTimer object.
 * @param pauseState the new pause state.
 */
void et_gvirt_set_pause(struct EventTimer *et, char pauseState)
{
    struct VirtualGlobalEventTimer *timer =
        (struct VirtualGlobalEventTimer *)et;

    assert(et);

    // Shared state may only be modified while holding the mutex
    if (!timer->my_state->holds_mutex)
        return;

    if (pauseState != VIRT_CLK_STOLEN) {
        // Leaving the stolen state: drop the thief role if it is ours
        if (timer->state->time_thief &&
                timer->state->time_thief == timer->my_state->pid)
            timer->state->time_thief = 0;
    }
    else {
        // Only one thief at a time
        if (timer->state->time_thief)
            return;
        timer->state->time_thief = timer->my_state->pid;
    }

    timer->paused = pauseState;
}
/**
* Get Virtual EventTimer pause state
*
* @param clk a Virtual EventTimer object.
*
* @return VIRT_CLK_PAUSED or VIRT_CLK_ACTIVE.
*/
char et_gvirt_get_pause(struct EventTimer *et)
{
struct VirtualGlobalEventTimer *clk = (struct VirtualGlobalEventTimer *)et;
assert(et);
return clk->paused;
}