@@ -17,23 +17,23 @@ void* worker_s(void *);
1717void * worker_c (void * );
1818
1919struct timeval tv1 , tv2 ;
20- lfqueue_t myq ;
21-
22- #define total_put 500
23- int nthreads = 4 ; //sysconf(_SC_NPROCESSORS_ONLN); // Linux
20+ #define total_put 50000
21+ int nthreads = 16 ; //sysconf(_SC_NPROCESSORS_ONLN); // Linux
2422int one_thread = 1 ;
2523int nthreads_exited = 0 ;
24+ lfqueue_t * myq ;
2625/** Worker Keep Consuming at the same time, do not try instensively **/
2726void * worker_c (void * arg ) {
2827 int i = 0 ;
2928 int * int_data ;
3029 int total_loop = total_put * (* (int * )arg );
3130 while (i ++ < total_loop ) {
3231 /*Dequeue*/
33- while ((int_data = lfqueue_deq (& myq )) == NULL ) {
34- // usleep(1);
32+ while ((int_data = lfqueue_deq (myq )) == NULL ) {
33+
3534 }
3635 // printf("%d\n", *int_data);
36+
3737 free (int_data );
3838 }
3939 __sync_add_and_fetch (& nthreads_exited , 1 );
@@ -51,11 +51,11 @@ void* worker_s(void *arg)
5151 * int_data = i ;
5252 /*Enqueue*/
5353
54- while (lfqueue_enq (& myq , int_data )) {
55- //printf("ENQ FULL?\n");
54+ while (lfqueue_enq (myq , int_data )) {
55+ // printf("ENQ FULL?\n");
5656 }
5757 }
58- __sync_add_and_fetch (& nthreads_exited , 1 );
58+ // __sync_add_and_fetch(&nthreads_exited, 1);
5959 return 0 ;
6060}
6161
@@ -69,13 +69,13 @@ void* worker_sc(void *arg)
6969 assert (int_data != NULL );
7070 * int_data = i ++ ;
7171 /*Enqueue*/
72- while (lfqueue_enq (& myq , int_data )) {
72+ while (lfqueue_enq (myq , int_data )) {
7373 printf ("ENQ FULL?\n" );
7474 }
7575
7676 /*Dequeue*/
77- while ((int_data = lfqueue_deq (& myq )) == NULL ) {
78- // printf("DEQ EMPTY? %zu\n", lfqueue_size(& myq));
77+ while ((int_data = lfqueue_deq (myq )) == NULL ) {
78+ // printf("DEQ EMPTY? %zu\n", lfqueue_size(myq));
7979 }
8080 // printf("%d\n", *int_data);
8181 free (int_data );
@@ -87,19 +87,20 @@ void* worker_sc(void *arg)
8787#define join_threads \
8888for (i = 0; i < nthreads; i++)\
8989pthread_join(threads[i], NULL);\
90- printf("current size= %d\n", (int) lfqueue_size(& myq) )
90+ printf("current size= %d\n", (int) lfqueue_size(myq) )
9191
9292#define detach_thread_and_loop \
9393for (i = 0; i < nthreads; i++)\
9494pthread_detach(threads[i]);\
95- while (nthreads_exited<= nthreads) \
96- ; \
97- if(lfqueue_size(& myq) != 0){\
98- usleep (2000);\
99- printf("current size= %zu\n", lfqueue_size(& myq) );\
95+ while ( nthreads_exited < nthreads ) \
96+ lfqueue_usleep(2000); \
97+ if(lfqueue_size(myq) != 0){\
98+ lfqueue_usleep (2000);\
99+ printf("current size= %zu\n", lfqueue_size(myq) );\
100100}
101101
102102void multi_enq_deq (pthread_t * threads ) {
103+ printf ("-----------%s---------------\n" , "multi_enq_deq" );
103104 int i ;
104105 for (i = 0 ; i < nthreads ; i ++ ) {
105106 pthread_create (threads + i , NULL , worker_sc , NULL );
@@ -109,6 +110,7 @@ void multi_enq_deq(pthread_t *threads) {
109110 // detach_thread_and_loop;
110111}
111112void one_deq_and_multi_enq (pthread_t * threads ) {
113+ printf ("-----------%s---------------\n" , "one_deq_and_multi_enq" );
112114 int i ;
113115 for (i = 0 ; i < nthreads ; i ++ )
114116 pthread_create (threads + i , NULL , worker_s , & one_thread );
@@ -120,27 +122,26 @@ void one_deq_and_multi_enq(pthread_t *threads) {
120122}
121123
122124void one_enq_and_multi_deq (pthread_t * threads ) {
125+ printf ("-----------%s---------------\n" , "one_enq_and_multi_deq" );
123126 int i ;
124127 for (i = 0 ; i < nthreads ; i ++ )
125128 pthread_create (threads + i , NULL , worker_c , & one_thread );
126129
127130 worker_s (& nthreads );
128131
129- //join_threads;
130132#pragma GCC diagnostic push
131133#pragma GCC diagnostic ignored "-Wimplicit-function-declaration"
132134 detach_thread_and_loop ;
133135#pragma GCC diagnostic pop
134136
135137}
136-
138+ int ri = 10 ;
137139int main (void )
138140{
139- const int total_run = 10 ;
140- int n ;
141- for (n = 0 ; n < total_run ; n ++ ) {
142- printf ("running count = %d\n" , n );
143- if (lfqueue_init (& myq , total_put , nthreads , 1 ) == -1 )
141+
142+ myq = malloc (sizeof (lfqueue_t ));
143+ nthreads_exited = 0 ;
144+ if (lfqueue_init (myq , 1024 ) == -1 )
144145 return -1 ;
145146
146147 /* Spawn threads. */
@@ -149,21 +150,32 @@ int main(void)
149150 printf ("Total requests %d \n" , total_put );
150151 gettimeofday (& tv1 , NULL );
151152
152- // one_enq_and_multi_deq(threads);
153+ one_enq_and_multi_deq (threads );
154+
155+ //one_deq_and_multi_enq(threads);
153156
154- // one_deq_and_multi_enq(threads);
157+ // multi_enq_deq(threads);
158+
159+
160+ // worker_s(&ri);
161+
162+
163+ // worker_c(&ri);
155164
156- multi_enq_deq (threads );
157165
158166 gettimeofday (& tv2 , NULL );
159167 printf ("Total time = %f seconds\n" ,
160168 (double ) (tv2 .tv_usec - tv1 .tv_usec ) / 1000000 +
161169 (double ) (tv2 .tv_sec - tv1 .tv_sec ));
162170
163171 //getchar();
164- assert ( 0 == lfqueue_size (& myq ) && "Error, all queue should be consumed but not" );
172+ lfqueue_usleep (1000 );
173+ assert ( 0 == lfqueue_size (myq ) && "Error, all queue should be consumed but not" );
174+
175+ lfqueue_destroy (myq );
176+ // sleep(3);
177+ free (myq );
178+
165179
166- lfqueue_destroy (& myq );
167- }
168180 return 0 ;
169181}
0 commit comments