 #include <stdexcept>
 #include <semaphore.h>
 #include "../../../../SDK/components/utilities/include/sample_log.h"
+#include "thread_safe_list.h"
 using namespace StackFlows;
 #ifdef ENABLE_BACKWARD
 #define BACKWARD_HAS_DW 1
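
The new `thread_safe_list.h` header is not part of this diff, so its exact contents are unknown. Judging from how `async_list_` is used below (a `put` that enqueues, a blocking `get` that the worker loop waits on, and a `size` used to cap the backlog), a minimal sketch of such a container might look like the following; the real header may differ:

```cpp
// Minimal sketch of a thread-safe list, assuming only the interface the diff
// uses on async_list_ (put/get/size). The actual thread_safe_list.h may differ.
#include <condition_variable>
#include <list>
#include <mutex>

namespace thread_safe {

template <typename T>
class list {
public:
    // Append an element and wake one waiting consumer.
    void put(const T &value)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            data_.push_back(value);
        }
        cond_.notify_one();
    }

    // Block until an element is available, then pop and return it.
    T get()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !data_.empty(); });
        T value = std::move(data_.front());
        data_.pop_front();
        return value;
    }

    // Number of queued elements.
    std::size_t size()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return data_.size();
    }

private:
    std::list<T> data_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

}  // namespace thread_safe
```

A blocking `get()` is what lets the worker thread sleep until work arrives instead of spinning, which is the behavior the rewritten `run()` below relies on.
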
@@ -58,11 +59,9 @@ class llm_task {
     task_callback_t out_callback_;
     bool enoutput_;
     bool enstream_;
-    sem_t inference_semaphore;
-    std::atomic_int inference_status_;
+
     std::unique_ptr<std::thread> inference_run_;
-    std::atomic_bool is_running_;
-    std::string _inference_msg;
+    thread_safe::list<std::string> async_list_;
 
     void set_output(task_callback_t out_callback)
     {
@@ -223,24 +222,26 @@ class llm_task {
 
     void run()
     {
-        sem_wait(&inference_semaphore);
-        while (is_running_) {
+        std::string par;
+        for (;;) {
             {
-                inference(_inference_msg);
-                inference_status_--;
-                sem_wait(&inference_semaphore);
+                par = async_list_.get();
+                if (par.empty()) break;
+                inference(par);
             }
         }
     }
 
     int inference_async(const std::string &msg)
     {
-        if (inference_status_ == INFERENCE_NONE) {
-            _inference_msg = msg;
-            inference_status_ = INFERENCE_RUNNING;
-            sem_post(&inference_semaphore);
+        if (msg.empty()) return -1;
+        if (async_list_.size() < 3) {
+            std::string par = msg;
+            async_list_.put(par);
+        } else {
+            SLOGE("inference list is full\n");
         }
-        return inference_status_;
+        return async_list_.size();
     }
 
     void inference(const std::string &msg)
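
Taken together, `inference_async()` and `run()` now form a small producer/consumer pair: callers enqueue at most three pending messages, the worker thread blocks on the list, and an empty string doubles as the shutdown sentinel, which is why empty messages are rejected with -1. A hedged usage sketch, assuming an already-constructed `llm_task` instance named `task` (not part of the diff):

```cpp
// Producer side, assuming `task` is a live llm_task. Non-empty messages are
// queued until three are pending; the return value is the current backlog.
int backlog = task.inference_async("{\"prompt\": \"hello\"}");
if (backlog < 0) {
    // Empty strings are reserved as the stop sentinel, so they are refused.
    SLOGE("empty message rejected\n");
}
// Internally, stop() pushes an empty string, which makes run() leave its loop.
```
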
@@ -286,7 +287,8 @@ class llm_task {
 
     bool pause()
     {
-        lLaMa_->Stop();
+        if (lLaMa_)
+            lLaMa_->Stop();
         return true;
     }
 
@@ -314,25 +316,38 @@ class llm_task {
 
     llm_task(const std::string &workid) : tokenizer_server_flage_(false), port_(getNextPort())
     {
-        inference_status_ = INFERENCE_NONE;
-        sem_init(&inference_semaphore, 0, 0);
-        is_running_ = true;
         inference_run_ = std::make_unique<std::thread>(std::bind(&llm_task::run, this));
     }
 
+    void start()
+    {
+        if (!inference_run_) {
+            inference_run_ = std::make_unique<std::thread>(std::bind(&llm_task::run, this));
+        }
+    }
+
+    void stop()
+    {
+        if (inference_run_) {
+            std::string par;
+            async_list_.put(par);
+            if (lLaMa_)
+                lLaMa_->Stop();
+            inference_run_->join();
+            inference_run_.reset();
+        }
+    }
+
     ~llm_task()
     {
-        is_running_ = false;
-        sem_post(&inference_semaphore);
-        if (inference_run_) inference_run_->join();
+        stop();
         if (tokenizer_pid_ != -1) {
             kill(tokenizer_pid_, SIGTERM);
             waitpid(tokenizer_pid_, nullptr, WNOHANG);
         }
         if (lLaMa_) {
             lLaMa_->Deinit();
         }
-        sem_destroy(&inference_semaphore);
     }
 };
 
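The semaphore/flag pair of the old version is replaced by an explicit lifecycle: the constructor still spawns the worker thread, `stop()` enqueues the empty-string sentinel, interrupts any in-flight generation via `lLaMa_->Stop()`, then joins and resets the thread, and `start()` respawns it only if it is not already running, so both calls are safe to repeat. A minimal lifecycle sketch (the work-id string and call sites are assumptions, not taken from the diff):

```cpp
// Lifecycle sketch, assuming a task is created the way llm_llm does elsewhere.
auto task = std::make_shared<llm_task>("llm.1000");  // ctor starts the worker thread
task->inference_async("first prompt");               // consumed by run()
task->stop();    // sentinel + join; a second stop() would be a no-op
task->start();   // respawns the worker after a stop()
                 // ~llm_task() calls stop() itself, so a final explicit stop() is optional
```
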
@@ -647,10 +662,9 @@ class llm_llm : public StackFlow {
             send("None", "None", error_body, work_id);
             return -1;
         }
-        task_pause(llm_task_[work_id_num], get_channel(work_id_num));
+        llm_task_[work_id_num]->stop();
         auto llm_channel = get_channel(work_id_num);
         llm_channel->stop_subscriber("");
-        llm_task_[work_id_num]->lLaMa_->Stop();
         llm_task_.erase(work_id_num);
         send("None", "None", LLM_NO_ERROR, work_id);
         return 0;
@@ -663,6 +677,7 @@ class llm_llm : public StackFlow {
         if (iteam == llm_task_.end()) {
             break;
         }
+        iteam->second->stop();
         get_channel(iteam->first)->stop_subscriber("");
         iteam->second.reset();
         llm_task_.erase(iteam->first);