11import json
22from concurrent .futures import ThreadPoolExecutor
33from queue import Empty , Queue
4- from threading import BoundedSemaphore , Lock , Thread
4+ from threading import BoundedSemaphore , Condition , Lock , Thread
55from typing import List , Optional
66
77from fbclient .common_types import FBEvent
@@ -76,7 +76,7 @@ def stop(self):
7676 log .info ('FB Python SDK: event processor is stopping' )
7777 self .__closed = True
7878 self .__flush_task .stop ()
79- self .flush ( )
79+ self .__put_message_async ( MessageType . FLUSH )
8080 self .__put_message_and_wait_terminate (MessageType .SHUTDOWN )
8181
8282
@@ -94,6 +94,7 @@ def __init__(self, config: Config, sender: Sender, inbox: "Queue[EventMessage]")
9494 self .__events_buffer_to_next_flush = []
9595 self .__flush_workers = ThreadPoolExecutor (max_workers = self .__MAX_FLUSH_WORKERS_NUMBER )
9696 self .__permits = BoundedSemaphore (value = self .__MAX_FLUSH_WORKERS_NUMBER )
97+ self .__lock = Condition (Lock ())
9798
9899 # blocks until at least one message is available and then:
99100 # 1: transfer the events to event buffer
@@ -137,6 +138,11 @@ def __put_events_to_buffer(self, event: FBEvent):
137138 self .__events_buffer_to_next_flush .append (event )
138139
139140 def __trigger_flush (self ):
141+ def flush_payload_done (fn ):
142+ self .__permits .release ()
143+ with self .__lock :
144+ self .__lock .notify_all ()
145+
140146 if not self .__closed and len (self .__events_buffer_to_next_flush ) > 0 :
141147 log .debug ('trigger flush' )
142148 # get all the current events from event buffer
@@ -146,21 +152,27 @@ def __trigger_flush(self):
146152 # get an available flush worker to send events
147153 self .__flush_workers \
148154 .submit (FlushPayloadRunner (self .__config , self .__sender , payloads ).run ) \
149- .add_done_callback (lambda x : self . __permits . release () )
155+ .add_done_callback (flush_payload_done )
150156 # clear the buffer for the next flush
151157 self .__events_buffer_to_next_flush .clear ()
152158 # if no available flush worker, keep the events in the buffer
153159
154160 def __shutdown (self ):
155161 if not self .__closed :
156- try :
157- log .debug ('event dispatcher is cleaning up thread and conn pool' )
158- self .__closed = True
159- log .debug ('flush worker pool is stopping...' )
160- self .__flush_workers .shutdown (wait = True )
161- self .__sender .stop ()
162- except Exception as e :
163- log .exception ('FB Python SDK: unexpected error when closing event dispatcher: %s' % str (e ))
162+ with self .__lock :
163+ try :
164+ log .debug ('event dispatcher is cleaning up thread and conn pool' )
165+ self .__wait_until_flush_playload_worker_down ()
166+ self .__closed = True
167+ log .debug ('flush worker pool is stopping...' )
168+ self .__flush_workers .shutdown (wait = True )
169+ self .__sender .stop ()
170+ except Exception as e :
171+ log .exception ('FB Python SDK: unexpected error when closing event dispatcher: %s' % str (e ))
172+
173+ def __wait_until_flush_playload_worker_down (self ):
174+ while self .__permits ._value != self .__MAX_FLUSH_WORKERS_NUMBER :
175+ self .__lock .wait ()
164176
165177
166178class FlushPayloadRunner :
0 commit comments