1
1
#!/usr/bin/env python
2
2
# coding:utf-8
3
3
4
- import sys
5
4
import os
6
5
import errno
7
6
import binascii
8
7
import time
9
- import thread
10
8
import socket
11
9
import select
12
10
import Queue
@@ -155,8 +153,8 @@ def __init__(self):
155
153
self .max_retry = 3
156
154
self .timeout = 3
157
155
self .max_timeout = 5
158
- self .max_thread_num = 40
159
- self .connection_pool_num = 30
156
+ self .max_thread_num = 10
157
+ self .connection_pool_num = 20
160
158
161
159
self .conn_pool = Connect_pool () #Queue.PriorityQueue()
162
160
@@ -173,10 +171,15 @@ def save_ssl_connection_for_reuse(self, ssl_sock):
173
171
174
172
while self .conn_pool .qsize () > self .connection_pool_num :
175
173
t , ssl_sock = self .conn_pool .get_slowest ()
176
- if t < 300 :
174
+
175
+ if t < 500 :
177
176
#self.conn_pool.put( (ssl_sock.handshake_time, ssl_sock) )
177
+ ssl_sock .close ()
178
178
return
179
- ssl_sock .close ()
179
+ else :
180
+ ssl_sock .close ()
181
+
182
+
180
183
181
184
def create_ssl_connection (self ):
182
185
@@ -268,6 +271,7 @@ def connect_thread():
268
271
if ssl_sock :
269
272
ssl_sock .last_use_time = time .time ()
270
273
self .conn_pool .put ((ssl_sock .handshake_time , ssl_sock ))
274
+ time .sleep (1 )
271
275
finally :
272
276
self .thread_num_lock .acquire ()
273
277
self .thread_num -= 1
@@ -282,6 +286,7 @@ def create_more_connection():
282
286
p = threading .Thread (target = connect_thread )
283
287
p .daemon = True
284
288
p .start ()
289
+ time .sleep (0.5 )
285
290
286
291
287
292
while True :
@@ -320,14 +325,19 @@ def create_more_connection():
320
325
321
326
322
327
class Forward_connection_manager ():
323
- timeout = 3
324
- max_timeout = 5
328
+ timeout = 1
329
+ max_timeout = 10
325
330
tcp_connection_cache = Queue .PriorityQueue ()
331
+ thread_num_lock = threading .Lock ()
332
+ thread_num = 0
333
+ max_thread_num = 10
334
+
335
+ def create_connection (self , port = 443 , sock_life = 5 ):
336
+ if port != 443 :
337
+ logging .warn ("forward port %d not supported." , port )
338
+ return None
326
339
327
- def create_connection (self , sock_life = 5 ):
328
- def _create_connection (ip_port , queobj , delay = 0 ):
329
- if delay != 0 :
330
- time .sleep (delay )
340
+ def _create_connection (ip_port ):
331
341
ip = ip_port [0 ]
332
342
sock = None
333
343
# start connection time record
@@ -358,54 +368,52 @@ def _create_connection(ip_port, queobj, delay=0):
358
368
logging .debug ("tcp conn %s time:%d" , ip , conn_time * 1000 )
359
369
360
370
# put ssl socket object to output queobj
361
- queobj . put (sock )
371
+ self . tcp_connection_cache . put (( time . time (), sock ) )
362
372
except Exception as e :
363
- # any socket.error, put Excpetions to output queobj.
364
- queobj .put (e )
365
373
conn_time = int ((time .time () - start_time ) * 1000 )
366
374
logging .debug ("tcp conn %s fail t:%d" , ip , conn_time )
367
375
google_ip .report_connect_fail (ip )
368
376
#logging.info("create_tcp report fail ip:%s", ip)
369
377
if sock :
370
378
sock .close ()
379
+ finally :
380
+ self .thread_num_lock .acquire ()
381
+ self .thread_num -= 1
382
+ self .thread_num_lock .release ()
371
383
372
- def recycle_connection (count , queobj ):
373
- for i in range (count ):
374
- sock = queobj .get ()
375
- if sock and not isinstance (sock , Exception ):
376
- self .tcp_connection_cache .put ((time .time (), sock ))
377
-
378
- try :
379
- ctime , sock = self .tcp_connection_cache .get_nowait ()
380
- if time .time () - ctime < sock_life :
381
- return sock
382
- except Queue .Empty :
383
- pass
384
384
385
+ while True :
386
+ try :
387
+ ctime , sock = self .tcp_connection_cache .get_nowait ()
388
+ if time .time () - ctime < sock_life :
389
+ return sock
390
+ else :
391
+ sock .close ()
392
+ continue
393
+ except Queue .Empty :
394
+ break
385
395
386
- port = 443
387
396
start_time = time .time ()
388
- #while time.time() - start_time < self.max_timeout:
389
- for j in range (3 ):
390
- addresses = []
391
- for i in range (3 ):
397
+ while time .time () - start_time < self .max_timeout :
398
+
399
+ if self .thread_num < self .max_thread_num :
392
400
ip = google_ip .get_gws_ip ()
393
401
if not ip :
394
- logging .warning ("no gws ip." )
402
+ logging .error ("no gws ip." )
395
403
return
396
- addresses . append (( ip , port ) )
397
-
398
- addrs = addresses
399
- queobj = Queue . Queue ()
400
- delay = 0
401
- for addr in addrs :
402
- thread . start_new_thread ( _create_connection , ( addr , queobj , delay ) )
403
- #delay += 0.05
404
- for i in range ( len ( addrs )) :
405
- result = queobj . get ()
406
- if not isinstance ( result , ( socket . error , OSError , IOError )):
407
- thread . start_new_thread ( recycle_connection , ( len ( addrs ) - i - 1 , queobj ))
408
- return result
404
+ addr = ( ip , port )
405
+ self . thread_num_lock . acquire ()
406
+ self . thread_num += 1
407
+ self . thread_num_lock . release ()
408
+ p = threading . Thread ( target = _create_connection , args = ( addr ,))
409
+ p . daemon = True
410
+ p . start ( )
411
+
412
+ try :
413
+ ctime , sock = self . tcp_connection_cache . get (timeout = 0.4 )
414
+ return sock
415
+ except :
416
+ continue
409
417
logging .warning ('create tcp connection fail.' )
410
418
411
419
@@ -425,6 +433,10 @@ def forward_socket(self, local, remote, timeout=60, tick=2, bufsize=8192):
425
433
for sock in ins :
426
434
data = sock .recv (bufsize )
427
435
if not data :
436
+ if sock is remote :
437
+ logging .debug ("forward remote disconnected." )
438
+ else :
439
+ logging .debug ("forward local disconnected." )
428
440
return
429
441
430
442
if sock is remote :
@@ -433,15 +445,15 @@ def forward_socket(self, local, remote, timeout=60, tick=2, bufsize=8192):
433
445
else :
434
446
remote .sendall (data )
435
447
timecount = timeout
436
- except NetWorkIOError as e :
448
+ except Exception as e :
437
449
if e .args [0 ] not in (errno .ECONNABORTED , errno .ECONNRESET , errno .ENOTCONN , errno .EPIPE ):
438
- raise
450
+ logging . exception ( "forward except:%s." , e )
439
451
finally :
440
452
if local :
441
453
local .close ()
442
454
if remote :
443
455
remote .close ()
444
- logging . debug ( "forward closed." )
456
+
445
457
446
458
447
459
0 commit comments