-
Notifications
You must be signed in to change notification settings - Fork 0
/
threadpool.c
240 lines (199 loc) · 6.42 KB
/
threadpool.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
/*
* membox Progetto del corso di LSO 2017/2018
*
* Dipartimento di Informatica Università di Pisa
* Docenti: Prencipe, Torquati
*
*/
/**
* @file threadpool.c
* @author Jacopo Massa 543870 \n( <mailto:[email protected]> )
* @brief Implementazione delle funzioni del file threadpool.h
* @copyright **Si dichiara che il contenuto di questo file è in ogni sua parte opera
originale dell'autore**
* @see threadpool.h
*/
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <threadpool.h>
#include <utility.h>
#include <string.h>
/**
* @function threadpool_thread
* @brief funzione eseguita da tutti i thread del pool
*
* @param[in] threadpool threadpool di cui fa parte il thread
*
* @return (void*)NULL.
*/
static void *threadpool_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;
for(;;)
{
// uso la lock per poter attendere sulla variabile di condizione
LOCK(pool->lock,"pool->lock in threadpool_thread");
/* Mi metto in attesa sulla variabile di condizione,
* controllando che nel frattempo il pool non stia terminando
*/
while((pool->count == 0) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->notify), &(pool->lock));
}
// ricontrollo un'eventuale terminazione dopo essere tornato dalla wait
if((pool->shutdown == immediate_shutdown) ||
((pool->shutdown == graceful_shutdown) &&
(pool->count == 0))) {
break;
}
// estraggo un lavoro dalla coda condivisa
task.function = pool->queue[pool->head].function;
task.arg = pool->queue[pool->head].arg;
pool->head = (pool->head + 1) % pool->queue_size;
pool->count -= 1;
UNLOCK(pool->lock,"pool->lock in threadpool_thread");
// eseguo il lavoro estratto dalla coda
(*(task.function))(task.arg);
}
pool->started--;
UNLOCK(pool->lock,"pool->lock in threadpool_thread");
pthread_exit(NULL);
}
threadpool_t *threadpool_create(int thread_count, int queue_size)
{
threadpool_t *pool;
int i;
if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE)
return NULL;
if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL)
goto err;
// inizializzo del pool
pool->thread_count = 0;
pool->queue_size = queue_size;
pool->head = pool->tail = pool->count = 0;
pool->shutdown = pool->started = 0;
// alloco memoria per i thread e per la coda condivisa
pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
pool->queue = (threadpool_task_t *)malloc
(sizeof(threadpool_task_t) * queue_size);
/* Inizializzo la mutex e la variabile di condizione */
if((pthread_mutex_init(&(pool->lock), NULL) != 0) ||
(pthread_cond_init(&(pool->notify), NULL) != 0) ||
(pool->threads == NULL) ||
(pool->queue == NULL)) {
goto err;
}
// faccio partire l'esecuzione di ogni thread con la funzione 'threadpool_thread'
for(i = 0; i < thread_count; i++) {
if(pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool) != 0)
{
threadpool_destroy(pool, 0);
return NULL;
}
pool->thread_count++;
pool->started++;
}
return pool;
err:
if(pool) //in caso di errore libero la memoria allocata
{
threadpool_free(pool);
}
return NULL;
}
int threadpool_add(threadpool_t *pool, void (*function)(void *), void *arg)
{
int err = 0;
int next;
if(pool == NULL || function == NULL)
return threadpool_invalid;
if(pthread_mutex_lock(&(pool->lock)) != 0)
return threadpool_lock_failure;
next = (pool->tail + 1) % pool->queue_size;
do
{
// controllo che la coda di lavori non sia piena
if(pool->count == pool->queue_size)
{
err = threadpool_queue_full;
break;
}
// controllo che il pool non stia terminando
if(pool->shutdown) {
err = threadpool_shutdown;
break;
}
// aggiungo il lavoro alla coda
pool->queue[pool->tail].function = function;
pool->queue[pool->tail].arg = arg;
pool->tail = next;
pool->count += 1;
// segnalo ai thread del pool che c'è qualcosa nella coda
if(pthread_cond_signal(&(pool->notify)) != 0)
{
err = threadpool_lock_failure;
break;
}
} while(0);
if(pthread_mutex_unlock(&pool->lock) != 0)
err = threadpool_lock_failure;
return err;
}
int threadpool_destroy(threadpool_t *pool, int flags)
{
int i, err = 0;
if(pool == NULL)
return threadpool_invalid;
if(pthread_mutex_lock(&(pool->lock)) != 0)
return threadpool_lock_failure;
do
{
// controllo che il pool non stia terminando
if(pool->shutdown)
{
err = threadpool_shutdown;
break;
}
pool->shutdown = (flags & threadpool_graceful) ?
graceful_shutdown : immediate_shutdown;
// risveglio tutti i thread del pool
if((pthread_cond_broadcast(&(pool->notify)) != 0) ||
(pthread_mutex_unlock(&(pool->lock)) != 0)) {
err = threadpool_lock_failure;
break;
}
// attendo la terminazione di tutti i thread
for(i = 0; i < pool->thread_count; i++) {
if(pthread_join(pool->threads[i], NULL) != 0) {
err = threadpool_thread_failure;
}
}
} while(0);
/* Se tutto è andato bene dealloco il threadpool */
if(!err) {
threadpool_free(pool);
}
return err;
}
int threadpool_free(threadpool_t *pool)
{
if(pool == NULL || pool->started > 0) {
return -1;
}
// controllo che effettivamente sono riuscito ad allocare il pool
if(pool->threads) {
free(pool->threads);
free(pool->queue);
/* Avendo allocato i thread dopo aver inizializzato
* la mutex e la variabile di condizione, sono sicuro che esse sono
* inizializzate.
*
* Effettuo una LOCK sulla mutex solo per sicurezza */
LOCK(pool->lock,"pool->lock in threadpool free");
pthread_mutex_destroy(&(pool->lock));
pthread_cond_destroy(&(pool->notify));
}
free(pool);
return 0;
}