Skip to content

Commit 5b46995

Browse files
committed
5.4.0 refactor codes to improve performance and fix bug.
1 parent 5d3a993 commit 5b46995

File tree

14 files changed

+198
-117
lines changed

14 files changed

+198
-117
lines changed

code/default/lib/noarch/front_base/boringssl_wrap.py

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def __init__(self, context, sock, ip_str=None, sni=None, on_close=None):
3535

3636
def wrap(self):
3737
ip, port = utils.get_ip_port(self.ip_str)
38+
self.ip = ip
3839
if isinstance(ip, str):
3940
ip = utils.to_bytes(ip)
4041

code/default/lib/noarch/front_base/config.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def set_default(self):
2121
self.set_var("dispather_score_factor", 1)
2222
self.set_var("dispather_max_idle_workers", 30)
2323
self.set_var("dispather_worker_max_continue_fail", 8)
24+
self.set_var("dispather_connect_all_workers_on_startup", 0)
2425

2526
self.set_var("max_task_num", 100)
2627

@@ -32,7 +33,7 @@ def set_default(self):
3233

3334
# http 2 worker
3435
self.set_var("http2_max_concurrent", 60)
35-
self.set_var("http2_target_concurrent", 60)
36+
self.set_var("http2_target_concurrent", 6)
3637
self.set_var("http2_max_timeout_tasks", 5)
3738
self.set_var("http2_max_process_tasks", 900) # Nginx will GoAway after 1000 tasks.
3839
self.set_var("http2_timeout_active", 15)
@@ -47,7 +48,7 @@ def set_default(self):
4748
# connect manager
4849
self.set_var("https_max_connect_thread", 1)
4950
self.set_var("max_connect_thread", 1)
50-
self.set_var("connect_create_interval", 1)
51+
self.set_var("connect_create_interval", 0.1)
5152
self.set_var("ssl_first_use_timeout", 10)
5253
self.set_var("connection_pool_min", 1)
5354
self.set_var("https_keep_alive", 15) # time to pass created link to worker

code/default/lib/noarch/front_base/connect_manager.py

+31-38
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,11 @@ def __init__(self, logger, config, connect_creator, ip_manager, check_local_netw
156156

157157
self.thread_num_lock = threading.Lock()
158158
self.timeout = 4
159-
self.max_timeout = 60
160159
self.thread_num = 0
161160
self.running = True
162-
self.connect_counter = 0
163161

164-
self.get_num_lock = threading.Lock()
165-
self.https_get_num = 0
162+
self._waiting_num_lock = threading.Lock()
163+
self._connection_waiting_num = 0
166164
self.no_ip_lock = threading.Lock()
167165

168166
# after new created ssl_sock timeout(50 seconds)
@@ -185,8 +183,6 @@ def __init__(self, logger, config, connect_creator, ip_manager, check_local_netw
185183
else:
186184
self.keep_conn_th = None
187185

188-
self.create_more_connection()
189-
190186
def stop(self):
191187
self.running = False
192188

@@ -219,71 +215,68 @@ def keep_connection_daemon(self):
219215
time.sleep(5)
220216
continue
221217

222-
self.connect_process()
218+
self._connect_process()
223219

224220
def _need_more_ip(self):
225-
if self.https_get_num:
221+
if self._connection_waiting_num:
226222
return True
227223
else:
228224
return False
229225

230-
def create_more_connection(self):
226+
def _create_more_connection(self):
231227
if not self.connecting_more_thread:
232228
with self.thread_num_lock:
233-
self.connecting_more_thread = threading.Thread(target=self.create_more_connection_worker)
229+
self.connecting_more_thread = threading.Thread(target=self._create_more_connection_worker)
234230
self.connecting_more_thread.start()
235231

236-
def create_more_connection_worker(self):
237-
while self.thread_num < self.config.https_max_connect_thread and \
238-
self._need_more_ip():
232+
def _create_more_connection_worker(self):
233+
while self.thread_num < self.config.https_max_connect_thread and self._need_more_ip():
239234

240235
self.thread_num_lock.acquire()
241236
self.thread_num += 1
242237
self.thread_num_lock.release()
243-
p = threading.Thread(target=self.connect_thread)
238+
p = threading.Thread(target=self._connect_thread)
244239
p.start()
245-
time.sleep(0.5)
240+
time.sleep(self.config.connect_create_interval)
246241

247242
with self.thread_num_lock:
248243
self.connecting_more_thread = None
249244

250-
def connect_thread(self, sleep_time=0):
245+
def _connect_thread(self, sleep_time=0):
251246
time.sleep(sleep_time)
252247
try:
253248
while self.running and self._need_more_ip():
254249
if self.new_conn_pool.qsize() > self.config.https_connection_pool_max:
255250
break
256251

257-
self.connect_process()
252+
self._connect_process()
258253
finally:
259254
self.thread_num_lock.acquire()
260255
self.thread_num -= 1
261256
self.thread_num_lock.release()
262257

263-
def connect_process(self):
258+
def _connect_process(self):
264259
try:
265260
ip_str, sni, host = self.ip_manager.get_ip_sni_host()
266261
if not ip_str:
267262
with self.no_ip_lock:
268263
# self.logger.warning("not enough ip")
269264
time.sleep(10)
270-
return
265+
return None
271266

272267
# self.logger.debug("create ssl conn %s", ip_str)
273268
ssl_sock = self._create_ssl_connection(ip_str, sni, host)
274269
if not ssl_sock:
275270
time.sleep(1)
276-
return
271+
return None
277272

278273
self.new_conn_pool.put((ssl_sock.handshake_time, ssl_sock))
279-
self.connect_counter += 1
280274

281275
if self.config.connect_create_interval > 0:
282-
if self.connect_counter >= 2:
283-
sleep = random.randint(self.config.connect_create_interval, self.config.connect_create_interval*2)
284-
time.sleep(sleep)
285-
else:
286-
time.sleep(1)
276+
sleep = random.uniform(self.config.connect_create_interval, self.config.connect_create_interval*2)
277+
time.sleep(sleep)
278+
279+
return ssl_sock
287280
except Exception as e:
288281
self.logger.exception("connect_process except:%r", e)
289282

@@ -303,8 +296,8 @@ def _create_ssl_connection(self, ip_str, sni, host):
303296
self.logger.debug("connect %s network fail, %r", ip_str, e)
304297
time.sleep(1)
305298
else:
306-
self.logger.debug("connect %s network fail:%r", ip_str, e)
307-
self.ip_manager.report_connect_fail(ip_str, sni, str(e))
299+
self.logger.debug("connect %s fail:%r", ip_str, e)
300+
self.ip_manager.report_connect_fail(ip_str, sni, str(e))
308301
except NoRescourceException as e:
309302
self.logger.warning("create ssl for %s except:%r", ip_str, e)
310303
self.ip_manager.report_connect_fail(ip_str, sni, str(e))
@@ -318,12 +311,11 @@ def _create_ssl_connection(self, ip_str, sni, host):
318311
self.logger.exception("connect %s fail:%r", ip_str, e)
319312
time.sleep(1)
320313

321-
def get_ssl_connection(self):
322-
with self.get_num_lock:
323-
self.https_get_num += 1
314+
def get_ssl_connection(self, timeout=30):
315+
with self._waiting_num_lock:
316+
self._connection_waiting_num += 1
324317

325-
start_time = time.time()
326-
self.create_more_connection()
318+
end_time = time.time() + timeout
327319
try:
328320
while self.running:
329321
ret = self.new_conn_pool.get(block=True, timeout=1)
@@ -336,11 +328,12 @@ def get_ssl_connection(self):
336328
# self.logger.debug("new_conn_pool.get:%s handshake:%d timeout.", ssl_sock.ip, handshake_time)
337329
self.ip_manager.report_connect_closed(ssl_sock.ip_str, ssl_sock.sni, "get_timeout")
338330
ssl_sock.close()
339-
continue
340331
else:
341-
if time.time() - start_time > self.max_timeout:
342-
self.logger.debug("create ssl timeout fail.")
332+
if time.time() > end_time:
333+
self.logger.debug("get_ssl_connection timeout")
343334
return None
335+
336+
self._create_more_connection()
344337
finally:
345-
with self.get_num_lock:
346-
self.https_get_num -= 1
338+
with self._waiting_num_lock:
339+
self._connection_waiting_num -= 1

code/default/lib/noarch/front_base/http1.py

-2
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,6 @@ def close(self, reason=""):
272272
# Notify loop to exit
273273
# This function may be call by out side http2
274274
# When gae_proxy found the appid or ip is wrong
275-
self.accept_task = False
276-
self.keep_running = False
277275
self.task_queue.put(None)
278276

279277
if self.task is not None:

code/default/lib/noarch/front_base/http2_connection.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import time
2+
13
from six.moves import queue
24
import threading
35
import socket
@@ -220,8 +222,6 @@ def get_rtt_rate(self):
220222
return self.rtt + len(self.streams) * 3000
221223

222224
def close(self, reason="conn close"):
223-
self.keep_running = False
224-
self.accept_task = False
225225
# Notify loop to exit
226226
# This function may be call by out side http2
227227
# When gae_proxy found the appid or ip is wrong
@@ -286,9 +286,9 @@ def increase_remote_window_size(self, inc_size):
286286
self.idle_cb()
287287

288288
def _send_cb(self, frame):
289-
# can called by stream
289+
# can be called by stream
290290
# put to send_blocked if connection window not allow,
291-
if frame.type == DataFrame.type:
291+
if frame.type in [HeadersFrame.type, DataFrame.type]:
292292
if len(frame.data) > self.remote_window_size:
293293
self.blocked_send_frames.append(frame)
294294
self.accept_task = False
@@ -512,9 +512,11 @@ def _update_settings(self, frame):
512512
stream.max_frame_size += new_size
513513

514514
def get_trace(self):
515+
now = time.time()
515516
out_list = []
516517
out_list.append(" continue_timeout:%d" % self.continue_timeout)
517518
out_list.append(" processed:%d" % self.processed_tasks)
519+
out_list.append(" inactive:%d, %d" % (now - self.last_send_time, now - self.last_recv_time))
518520
out_list.append(" h2.stream_num:%d" % len(self.streams))
519521
out_list.append(" sni:%s, host:%s" % (self.ssl_sock.sni, self.ssl_sock.host))
520522
return ",".join(out_list)

code/default/lib/noarch/front_base/http2_stream.py

+6
Original file line numberDiff line numberDiff line change
@@ -341,12 +341,18 @@ def send_response(self):
341341

342342
def close(self, reason="close"):
343343
if not self.task.responsed:
344+
# self.task.set_state("stream close: %s, call retry" % reason)
344345
self.connection.retry_task_cb(self.task, reason)
345346
else:
347+
# self.task.set_state("stream close: %s, finished" % reason)
346348
self.task.finish()
347349
# empty block means fail or closed.
350+
348351
self._close_remote()
352+
# self.task.set_state("stream close: %s, closed remote" % reason)
353+
349354
self._close_cb(self.stream_id, reason)
355+
# self.task.set_state("stream close: %s, called close_cb" % reason)
350356

351357
@property
352358
def _local_closed(self):

code/default/lib/noarch/front_base/http_common.py

+17-8
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,7 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb
184184
self.ip_manager = ip_manager
185185
self.config = config
186186
self.ssl_sock = ssl_sock
187-
self.init_rtt = ssl_sock.handshake_time / 3
188-
self.rtt = self.init_rtt
187+
self.rtt = ssl_sock.handshake_time
189188
self.speed = 200
190189
self.ip_str = ssl_sock.ip_str
191190
self.close_cb = close_cb
@@ -218,14 +217,19 @@ def __str__(self):
218217

219218
def update_debug_data(self, rtt, sent, received, speed):
220219
self.rtt = rtt
221-
self.speed_history.append(speed)
222-
if len(self.speed_history) > 10:
223-
self.speed_history.pop(0)
224-
self.speed = sum(self.speed_history) / len(self.speed_history)
220+
if sent + received > 10000:
221+
self.speed_history.append(speed)
222+
if len(self.speed_history) > 10:
223+
self.speed_history.pop(0)
224+
self.speed = sum(self.speed_history) / len(self.speed_history)
225225

226226
self.log_debug_data(rtt, sent, received)
227227

228228
def close(self, reason):
229+
if not self.keep_running:
230+
self.logger.warn("worker already closed %s", self.ip_str)
231+
return
232+
229233
self.accept_task = False
230234
self.keep_running = False
231235
self.ssl_sock.close()
@@ -238,9 +242,14 @@ def close(self, reason):
238242
self.close_cb(self)
239243

240244
def get_score(self):
241-
score = (50 - (self.speed/6.0)) + (self.rtt/30.0)
245+
# The smaller, the better
246+
score = (50 - (self.speed/6.0)) + (self.rtt/20.0)
242247
if self.version != "1.1":
243-
score += len(self.streams) * 5
248+
score += len(self.streams) * 3
249+
250+
if self.config.show_state_debug:
251+
self.logger.debug("get_score %s, speed:%d rtt:%d stream_num:%d score:%d", self.ip_str,
252+
self.speed, self.rtt, len(self.streams), score)
244253

245254
return score
246255

0 commit comments

Comments
 (0)