33#include  <dirent.h> 
44#include  <fcntl.h> 
55#include  <lzma.h> 
6+ #include  <pthread.h> 
7+ #include  <sys/socket.h> 
68#include  <sys/stat.h> 
79#include  <unistd.h> 
810
11+ #include  "affinity-count.h" 
912#include  "castore.h" 
1013#include  "def.h" 
1114#include  "dirent-util.h" 
1922/* #undef EBADMSG */ 
2023/* #define EBADMSG __LINE__ */ 
2124
25+ #define  WORKER_THREADS_MAX  64U
26+ 
2227struct  CaStore  {
2328        char  * root ;
2429        bool  is_cache :1 ;
@@ -34,6 +39,10 @@ struct CaStore {
3439
3540        uint64_t  n_requests ;
3641        uint64_t  n_request_bytes ;
42+ 
43+         pthread_t  worker_threads [WORKER_THREADS_MAX ];
44+         size_t  n_worker_threads , n_worker_threads_max ;
45+         int  worker_thread_socket [2 ];
3746};
3847
3948struct  CaStoreIterator  {
@@ -47,28 +56,36 @@ struct CaStoreIterator {
4756CaStore *  ca_store_new (void ) {
4857        CaStore  * store ;
4958
50-         store  =  new0 (CaStore , 1 );
59+         store  =  new (CaStore , 1 );
5160        if  (!store )
5261                return  NULL ;
5362
54-         store -> digest_type  =  _CA_DIGEST_TYPE_INVALID ;
55- 
56-         store -> compression  =  CA_CHUNK_COMPRESSED ;
57-         store -> compression_type  =  CA_COMPRESSION_DEFAULT ;
63+         * store  =  (CaStore ) {
64+                 .digest_type  =  _CA_DIGEST_TYPE_INVALID ,
65+                 .compression  =  CA_CHUNK_COMPRESSED ,
66+                 .compression_type  =  CA_COMPRESSION_DEFAULT ,
67+                 .worker_thread_socket  =  { -1 , -1 },
68+                 .n_worker_threads_max  =  (size_t ) -1 ,
69+         };
5870
5971        return  store ;
6072}
6173
6274CaStore  * ca_store_new_cache (void ) {
6375        CaStore  * s ;
6476
65-         s  =  new0 (CaStore , 1 );
77+         s  =  new (CaStore , 1 );
6678        if  (!s )
6779                return  NULL ;
6880
69-         s -> is_cache  =  true;
70-         s -> compression  =  CA_CHUNK_AS_IS ;
71-         s -> compression_type  =  CA_COMPRESSION_DEFAULT ;
81+         * s  =  (CaStore ) {
82+                 .is_cache  =  true,
83+                 .compression  =  CA_CHUNK_AS_IS ,
84+                 .compression_type  =  CA_COMPRESSION_DEFAULT ,
85+ 
86+                 .worker_thread_socket  =  { -1 , -1  },
87+                 .n_worker_threads_max  =  (size_t ) -1 ,
88+         };
7289
7390        return  s ;
7491}
@@ -77,6 +94,8 @@ CaStore* ca_store_unref(CaStore *store) {
7794        if  (!store )
7895                return  NULL ;
7996
97+         (void ) ca_store_finalize (store );
98+ 
8099        if  (store -> is_cache  &&  store -> root )
81100                (void ) rm_rf (store -> root , REMOVE_ROOT |REMOVE_PHYSICAL );
82101
@@ -240,6 +259,203 @@ int ca_store_has(CaStore *store, const CaChunkID *chunk_id) {
240259        return  ca_chunk_file_test (AT_FDCWD , store -> root , chunk_id );
241260}
242261
/* One work item passed from the submitting thread to a worker thread, sent
 * as a single SOCK_SEQPACKET datagram over worker_thread_socket. Ownership
 * of 'data' (a heap copy of the chunk payload) transfers to the receiving
 * worker, which frees it after saving. Passing raw pointers this way is safe
 * only because both ends live in the same process. */
struct queue_entry {
        CaChunkID chunk_id;                        /* ID of the chunk to store */
        CaChunkCompression effective_compression;  /* compression state of 'data' as handed in */
        void *data;                                /* heap-allocated payload, owned by receiver */
        size_t size;                               /* byte length of 'data' */
};
268+ 
269+ static  void *  worker_thread (void  * p ) {
270+         CaStore  * store  =  p ;
271+         int  ret  =  0 , r ;
272+ 
273+         assert (store );
274+         assert (store -> worker_thread_socket [1 ] >= 0 );
275+ 
276+         (void ) pthread_setname_np (pthread_self (), "worker-thread" );
277+ 
278+         for  (;;) {
279+                 struct  queue_entry  e ;
280+                 ssize_t  n ;
281+ 
282+                 n  =  recv (store -> worker_thread_socket [0 ], & e , sizeof (e ), 0 );
283+                 if  (n  <  0 ) {
284+                         if  (errno  ==  EINTR )
285+                                 continue ;
286+ 
287+                         log_debug_errno (errno , "Failed to read from thread pool socket: %m" );
288+                         return  INT_TO_PTR (errno );
289+                 }
290+                 if  (n  ==  0 ) /* Either EOF or zero-sized datagram (Linux doesn't really allow us to 
291+                              * distinguish that), we take both as an indication to exit the worker thread. */ 
292+                         break ;
293+ 
294+                 assert (n  ==  sizeof (e ));
295+ 
296+                 r  =  ca_chunk_file_save (
297+                                 AT_FDCWD , store -> root ,
298+                                 & e .chunk_id ,
299+                                 e .effective_compression , store -> compression ,
300+                                 store -> compression_type ,
301+                                 e .data , e .size );
302+                 free (e .data );
303+ 
304+                 if  (r  <  0 ) {
305+                         log_debug_errno (r , "Failed to store chunk in store: %m" );
306+ 
307+                         if  (r  !=  - EEXIST )
308+                                 ret  =  r ;
309+                 }
310+         }
311+ 
312+         return  INT_TO_PTR (ret );
313+ }
314+ 
315+ static  int  determine_worker_threads_max (CaStore  * store ) {
316+         const  char  * e ;
317+         int  r ;
318+ 
319+         assert (store );
320+ 
321+         if  (store -> n_worker_threads_max  !=  (size_t ) -1 )
322+                 return  0 ;
323+ 
324+         e  =  getenv ("CASYNC_WORKER_THREADS" );
325+         if  (e ) {
326+                 unsigned  u ;
327+ 
328+                 r  =  safe_atou (e , & u );
329+                 if  (r  <  0 )
330+                         log_debug_errno (r , "Failed to parse $CASYNC_WORKER_THREADS, ignoring: %s" , e );
331+                 else  if  (u  >  WORKER_THREADS_MAX ) {
332+                         log_debug ("$CASYNC_WORKER_THREADS out of range, clamping to %zu: %s" , (size_t ) WORKER_THREADS_MAX , e );
333+                         store -> n_worker_threads_max  =  WORKER_THREADS_MAX ;
334+                 } else  {
335+                         store -> n_worker_threads_max  =  u ;
336+                         return  0 ;
337+                 }
338+         }
339+ 
340+         r  =  cpus_in_affinity_mask ();
341+         if  (r  <  0 )
342+                 return  log_debug_errno (r , "Failed to determine CPUs in affinity mask: %m" );
343+ 
344+         store -> n_worker_threads_max  =  MIN ((size_t ) r , WORKER_THREADS_MAX );
345+         return  0 ;
346+ }
347+ 
348+ static  int  start_worker_thread (CaStore  * store ) {
349+         int  r ;
350+ 
351+         assert (store );
352+ 
353+         r  =  determine_worker_threads_max (store );
354+         if  (r  <  0 )
355+                 return  r ;
356+ 
357+         if  (store -> n_worker_threads  >= (size_t ) store -> n_worker_threads_max )
358+                 return  0 ;
359+ 
360+         if  (store -> worker_thread_socket [0 ] <  0 )
361+                 if  (socketpair (AF_UNIX , SOCK_SEQPACKET |SOCK_CLOEXEC , 0 , store -> worker_thread_socket ) <  0 )
362+                         return  - errno ;
363+ 
364+         r  =  pthread_create (store -> worker_threads  +  store -> n_worker_threads , NULL , worker_thread , store );
365+         if  (r  !=  0 )
366+                 return  - r ;
367+ 
368+         store -> n_worker_threads ++ ;
369+ 
370+         log_debug ("Started store worker thread %zu." , store -> n_worker_threads );
371+         return  0 ;
372+ }
373+ 
374+ static  int  submit_to_worker_thread (
375+                 CaStore  * store ,
376+                 const  CaChunkID  * chunkid ,
377+                 CaChunkCompression  effective_compression ,
378+                 const  void  * p ,
379+                 uint64_t  l ) {
380+ 
381+         struct  queue_entry  e ;
382+         void  * copy  =  NULL ;
383+         ssize_t  n ;
384+         int  r ;
385+ 
386+         assert (store );
387+ 
388+         /* If there's no need to compress/decompress, then let's do things client side, since the operation 
389+          * is likely IO bound, not CPU bound */ 
390+         if  (store -> compression  ==  CA_CHUNK_AS_IS  || 
391+             store -> compression  ==  effective_compression )
392+                 return  - ENOANO ;
393+ 
394+         /* Before we submit the chunk for compression, let's see if it exists already. If so, let's return 
395+          * -EEXIST right away, so that the caller can count reused chunks. Note that this is a bit racy 
396+          * currently, as submitted but not yet processed chunks are not considered. */ 
397+         r  =  ca_store_has (store , chunkid );
398+         if  (r  <  0 )
399+                 return  r ;
400+         if  (r  >  0 )
401+                 return  - EEXIST ;
402+ 
403+         /* Let's start a new worker thread each time we have a new job to process, until we reached all 
404+          * worker threads we need */ 
405+         (void ) start_worker_thread (store );
406+ 
407+         /* If there are no worker threads, do things client side */ 
408+         if  (store -> n_worker_threads  <= 0  || 
409+             store -> worker_thread_socket [1 ] <  0 )
410+                 return  - ENETDOWN ;
411+ 
412+         copy  =  memdup (p , l );
413+         if  (!copy )
414+                 return  - ENOMEM ;
415+ 
416+         e  =  (struct  queue_entry ) {
417+                 .chunk_id  =  * chunkid ,
418+                 .effective_compression  =  effective_compression ,
419+                 .data  =  copy ,
420+                 .size  =  l ,
421+         };
422+ 
423+         n  =  send (store -> worker_thread_socket [1 ], & e , sizeof (e ), 0 );
424+         if  (n  <  0 ) {
425+                 free (copy );
426+                 return  - errno ;
427+         }
428+ 
429+         assert (n  ==  sizeof (e ));
430+         return  0 ;
431+ }
432+ 
/* Shut down the store's worker-thread pool, waiting for all queued chunks to
 * be written. Closing the write side of the socket pair first makes every
 * worker's recv() return 0 (EOF) once the queue drains, so they exit their
 * loops; only then do we join them. Safe to call multiple times and on
 * stores that never started threads (sockets are -1, count is 0).
 *
 * Returns 0 on success, or a negative error code if joining failed or any
 * worker reported a failure (NOTE(review): the negation of the thread return
 * value assumes workers return non-negative codes via INT_TO_PTR — verify
 * against worker_thread's return convention). */
int ca_store_finalize(CaStore *store) {
        int ret = 0, r;
        size_t i;

        assert(store);

        /* Trigger EOF in all worker threads */
        store->worker_thread_socket[1] = safe_close(store->worker_thread_socket[1]);

        for (i = 0; i < store->n_worker_threads; i++) {
                void *p;
                /* pthread_join() returns positive error codes, hence the negation. */
                r = pthread_join(store->worker_threads[i], &p);
                if (r != 0)
                        ret = -r;
                /* Non-NULL thread result encodes an error from the worker. */
                if (p != NULL)
                        ret = -PTR_TO_INT(p);
        }

        /* Pool is fully drained; reset so the store could start threads again. */
        store->n_worker_threads = 0;
        store->worker_thread_socket[0] = safe_close(store->worker_thread_socket[0]);

        /* Propagate errors we ran into while processing store requests. This is useful for callers to
         * determine whether the worker threads ran into any problems. */
        return ret;
}
458+ 
243459int  ca_store_put (
244460                CaStore  * store ,
245461                const  CaChunkID  * chunk_id ,
@@ -273,6 +489,14 @@ int ca_store_put(
273489                store -> mkdir_done  =  true;
274490        }
275491
492+         r  =  submit_to_worker_thread (
493+                         store ,
494+                         chunk_id ,
495+                         effective_compression ,
496+                         data , size );
497+         if  (r  >= 0 )
498+                 return  0 ;
499+ 
276500        return  ca_chunk_file_save (
277501                        AT_FDCWD , store -> root ,
278502                        chunk_id ,
0 commit comments