Skip to content

Commit

Permalink
Merge pull request #1145 from Wilson-G/master
Browse files Browse the repository at this point in the history
添加新的select best ip的方法为默认选取ip的方法
  • Loading branch information
yutiansut authored Apr 30, 2019
2 parents 3d4d078 + a45f8c7 commit 663af33
Showing 1 changed file with 92 additions and 42 deletions.
134 changes: 92 additions & 42 deletions QUANTAXIS/QAFetch/QATdx.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from QUANTAXIS.QAUtil import Parallelism
from QUANTAXIS.QAUtil.QACache import QA_util_cache


def init_fetcher():
"""初始化获取
"""
Expand Down Expand Up @@ -100,8 +101,16 @@ def ping(ip, port=7709, type_='stock'):
return datetime.timedelta(9, 9, 0)


def select_best_ip():
QA_util_log_info('Selecting the Best Server IP of TDX')
def select_best_ip(fetch_method='tick', reget=0):
"""
可选择用哪种方法选择最优ip了,默认采用实际获取股票/期货的实际tick数据获取,
fetch_method=='code'代表获取服务器的code_list来计算耗时
在fetch_method==tick的时候,reget==1的话可以重新测试ip耗时
"""
if fetch_method == 'tick':
QA_util_log_info('Selecting the Best Server IP by tick data fetch')
else:
QA_util_log_info('Selecting the Best Server IP by code list fetch')

# 删除exclude ip
import json
Expand All @@ -117,15 +126,21 @@ def select_best_ip():
section='IPLIST', option='exclude', default_value=alist)

exclude_from_stock_ip_list(ipexclude)

ipdefault = qasetting.get_config(
section='IPLIST', option='default', default_value=default_ip)

ipdefault = eval(ipdefault) if isinstance(ipdefault, str) else ipdefault
ipdefault = dict(stock=dict(ip=None, port=None),
future=dict(ip=None, port=None))
if not reget:
ipdefault = qasetting.get_config(
section='IPLIST', option='default', default_value=default_ip)

ipdefault = eval(ipdefault) if isinstance(
ipdefault, str) else ipdefault
assert isinstance(ipdefault, dict)
# best_stock_ip
if ipdefault['stock']['ip'] == None:

best_stock_ip = get_ip_list_by_ping(stock_ip_list)
if fetch_method == 'code':
best_stock_ip = get_ip_list_by_ping(stock_ip_list)
else:
best_stock_ip = get_best_ip_by_real_data_fetch('stock')
else:
if ping(ipdefault['stock']['ip'], ipdefault['stock']['port'], 'stock') < datetime.timedelta(0, 1):
print('USING DEFAULT STOCK IP')
Expand All @@ -134,14 +149,19 @@ def select_best_ip():
print('DEFAULT STOCK IP is BAD, RETESTING')
best_stock_ip = get_ip_list_by_ping(stock_ip_list)
if ipdefault['future']['ip'] == None:
best_future_ip = get_ip_list_by_ping(future_ip_list, _type='future')
if fetch_method == 'code':
best_future_ip = get_ip_list_by_ping(
future_ip_list, _type='future')
elif fetch_method == 'tick':
best_future_ip = get_best_ip_by_real_data_fetch('future')
else:
if ping(ipdefault['future']['ip'], ipdefault['future']['port'], 'future') < datetime.timedelta(0, 1):
print('USING DEFAULT FUTURE IP')
best_future_ip = ipdefault['future']
else:
print('DEFAULT FUTURE IP {} is BAD, RETESTING'.format(ipdefault))
best_future_ip = get_ip_list_by_ping(future_ip_list, _type='future')
best_future_ip = get_ip_list_by_ping(
future_ip_list, _type='future')
ipbest = {'stock': best_stock_ip, 'future': best_future_ip}
qasetting.set_config(
section='IPLIST', option='default', default_value=ipbest)
Expand All @@ -155,56 +175,65 @@ def get_ip_list_by_ping(ip_list=[], _type='stock'):
best_ip = get_ip_list_by_multi_process_ping(ip_list, 1, _type)
return best_ip[0]


def get_best_ip_by_real_data_fetch(_type='stock'):
"""
用特定的数据获取函数测试数据获得的时间,从而选择下载数据最快的服务器ip
默认使用特定品种1min的方式的获取
"""
from QUANTAXIS.QAUtil.QADate import QA_util_today_str
import time
#找到前两天的有效交易日期
pre_trade_date=QA_util_get_real_date(QA_util_today_str())
pre_trade_date=QA_util_get_real_date(pre_trade_date)

# 找到前两天的有效交易日期
pre_trade_date = QA_util_get_real_date(QA_util_today_str())
pre_trade_date = QA_util_get_real_date(pre_trade_date)

# 某个函数获取的耗时测试
def get_stock_data_by_ip(ips):
start=time.time()
api = TdxHq_API()
start = time.time()
try:
QA_fetch_get_stock_transaction('000001',pre_trade_date,pre_trade_date,2,ips['ip'],ips['port'])
end=time.time()
return end-start
with api.connect(ips['ip'], ips['port'], time_out=0.7):
# 加个可以timeout的方法(但是期货好像不适用)
res = QA_fetch_get_stock_transaction(
'000001', pre_trade_date, pre_trade_date, 2, ips['ip'], ips['port'])
end = time.time()
return 9999 if res is None else end-start
except:
return 9999

def get_future_data_by_ip(ips):
start=time.time()
apix = TdxExHaq_API()
start = time.time()
try:
QA_fetch_get_future_transaction('RBL8',pre_trade_date,pre_trade_date,2,ips['ip'],ips['port'])
end=time.time()
return end-start
res = QA_fetch_get_future_transaction(
'RBL8', pre_trade_date, pre_trade_date, 2, ips['ip'], ips['port'])
end = time.time()
return 9999 if res is None else end-start
except:
return 9999

func,ip_list=0,0
if _type=='stock':
func,ip_list=get_stock_data_by_ip,stock_ip_list
func, ip_list = 0, 0
if _type == 'stock':
func, ip_list = get_stock_data_by_ip, stock_ip_list
else:
func,ip_list=get_future_data_by_ip,future_ip_list
func, ip_list = get_future_data_by_ip, future_ip_list
from pathos.multiprocessing import Pool
def multiMap(func,sequence):
res=[]
pool=Pool(4)

def multiMap(func, sequence):
res = []
pool = Pool(4)
for i in sequence:
res.append(pool.apply_async(func,(i,)))
res.append(pool.apply_async(func, (i,)))
pool.close()
pool.join()
return list(map(lambda x:x.get(),res))
res=multiMap(func,ip_list)
index=res.index(min(res))
return list(map(lambda x: x.get(), res))

res = multiMap(func, ip_list)
index = res.index(min(res))
return ip_list[index]


def get_ip_list_by_multi_process_ping(ip_list=[], n=0, _type='stock'):
''' 根据ping排序返回可用的ip列表
2019 03 31 取消参数filename
Expand Down Expand Up @@ -234,8 +263,8 @@ def get_ip_list_by_multi_process_ping(ip_list=[], n=0, _type='stock'):
results = [x[1] for x in sorted(results, key=lambda x: x[0])]
if _type:
# store the data as binary data stream
cache.set(_type, results, age=86400)
print('saving ip list to {} cache {}.'.format(_type, len(results)))
cache.set(_type, results, age=86400)
print('saving ip list to {} cache {}.'.format(_type, len(results)))
if len(results) > 0:
if n == 0 and len(results) > 0:
return results
Expand All @@ -245,6 +274,7 @@ def get_ip_list_by_multi_process_ping(ip_list=[], n=0, _type='stock'):
print('ALL IP PING TIMEOUT!')
return [{'ip': None, 'port': None}]


global best_ip
best_ip = {
'stock': {
Expand All @@ -262,7 +292,7 @@ def get_ip_list_by_multi_process_ping(ip_list=[], n=0, _type='stock'):
def get_extensionmarket_ip(ip, port):
global best_ip
if ip is None and port is None and best_ip['future']['ip'] is None and best_ip['future']['port'] is None:
best_ip = select_best_ip()
best_ip = select_best_ip(reget=1)
ip = best_ip['future']['ip']
port = best_ip['future']['port']
elif ip is None and port is None and best_ip['future']['ip'] is not None and best_ip['future']['port'] is not None:
Expand All @@ -286,7 +316,7 @@ def get_mainmarket_ip(ip, port):

global best_ip
if ip is None and port is None and best_ip['stock']['ip'] is None and best_ip['stock']['port'] is None:
best_ip = select_best_ip()
best_ip = select_best_ip(reget=1)
ip = best_ip['stock']['ip']
port = best_ip['stock']['port']
elif ip is None and port is None and best_ip['stock']['ip'] is not None and best_ip['stock']['port'] is not None:
Expand All @@ -296,6 +326,7 @@ def get_mainmarket_ip(ip, port):
pass
return ip, port


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_security_bars(code, _type, lens, ip=None, port=None):
"""按bar长度推算数据
Expand Down Expand Up @@ -332,6 +363,7 @@ def QA_fetch_get_security_bars(code, _type, lens, ip=None, port=None):
else:
return None


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_stock_day(code, start_date, end_date, if_fq='00', frequence='day', ip=None, port=None):
"""获取日线及以上级别的数据
Expand Down Expand Up @@ -408,6 +440,7 @@ def QA_fetch_get_stock_day(code, start_date, end_date, if_fq='00', frequence='da
else:
print(e)


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_stock_min(code, start, end, frequence='1min', ip=None, port=None):
ip, port = get_mainmarket_ip(ip, port)
Expand Down Expand Up @@ -448,6 +481,7 @@ def QA_fetch_get_stock_min(code, start, end, frequence='1min', ip=None, port=Non
type=type_).set_index('datetime', drop=False, inplace=False)[start:end]
return data.assign(datetime=data['datetime'].apply(lambda x: str(x)))


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_stock_latest(code, frequence='day', ip=None, port=None):
ip, port = get_mainmarket_ip(ip, port)
Expand Down Expand Up @@ -485,6 +519,7 @@ def QA_fetch_get_stock_latest(code, frequence='day', ip=None, port=None):
.set_index('date', drop=False) \
.drop(['year', 'month', 'day', 'hour', 'minute', 'datetime'], axis=1)


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_stock_realtime(code=['000001', '000002'], ip=None, port=None):
ip, port = get_mainmarket_ip(ip, port)
Expand Down Expand Up @@ -512,6 +547,7 @@ def QA_fetch_get_stock_realtime(code=['000001', '000002'], ip=None, port=None):
'ask_vol4', 'bid4', 'bid_vol4', 'ask5', 'ask_vol5', 'bid5', 'bid_vol5']]
return data.set_index(['datetime', 'code'])


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_depth_market_data(code=['000001', '000002'], ip=None, port=None):
ip, port = get_mainmarket_ip(ip, port)
Expand Down Expand Up @@ -631,6 +667,7 @@ def for_sh(code):
else:
return 'undefined'


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_stock_list(type_='stock', ip=None, port=None):
ip, port = get_mainmarket_ip(ip, port)
Expand Down Expand Up @@ -668,6 +705,7 @@ def QA_fetch_get_stock_list(type_='stock', ip=None, port=None):
# .assign(szm=data['name'].apply(lambda x: ''.join([y[0] for y in lazy_pinyin(x)])))\
# .assign(quanpin=data['name'].apply(lambda x: ''.join(lazy_pinyin(x))))


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_index_list(ip=None, port=None):
"""获取指数列表
Expand Down Expand Up @@ -696,6 +734,7 @@ def QA_fetch_get_index_list(ip=None, port=None):
return pd.concat([sz, sh]).query('sec=="index_cn"').sort_index().assign(
name=data['name'].apply(lambda x: str(x)[0:6]))


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_bond_list(ip=None, port=None):
"""bond
Expand All @@ -719,6 +758,7 @@ def QA_fetch_get_bond_list(ip=None, port=None):
return pd.concat([sz, sh]).query('sec=="bond_cn"').sort_index().assign(
name=data['name'].apply(lambda x: str(x)[0:6]))


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_bond_day(code, start_date, end_date, frequence='day', ip=None, port=None):
ip, port = get_mainmarket_ip(ip, port)
Expand Down Expand Up @@ -768,6 +808,7 @@ def QA_fetch_get_bond_day(code, start_date, end_date, frequence='day', ip=None,
'minute', 'datetime'], axis=1)[start_date:end_date]
return data.assign(date=data['date'].apply(lambda x: str(x)[0:10]))


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_index_day(code, start_date, end_date, frequence='day', ip=None, port=None):
"""指数日线
Expand Down Expand Up @@ -822,6 +863,7 @@ def QA_fetch_get_index_day(code, start_date, end_date, frequence='day', ip=None,
'minute', 'datetime'], axis=1)[start_date:end_date]
return data.assign(date=data['date'].apply(lambda x: str(x)[0:10]))


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_index_min(code, start, end, frequence='1min', ip=None, port=None):
'指数分钟线'
Expand Down Expand Up @@ -871,6 +913,7 @@ def QA_fetch_get_index_min(code, start, end, frequence='1min', ip=None, port=Non
# data
return data.assign(datetime=data['datetime'].apply(lambda x: str(x)))


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_index_latest(code, frequence='day', ip=None, port=None):
ip, port = get_mainmarket_ip(ip, port)
Expand Down Expand Up @@ -902,9 +945,11 @@ def QA_fetch_get_index_latest(code, frequence='day', ip=None, port=None):
data = []
for item in code:
if str(item)[0] in ['5', '1']: # ETF
data.append(api.to_df(api.get_security_bars(frequence, 1 if str(item)[0] in ['0', '8', '9', '5'] else 0, item, 0, 1)).assign(code=item))
data.append(api.to_df(api.get_security_bars(frequence, 1 if str(item)[0] in [
'0', '8', '9', '5'] else 0, item, 0, 1)).assign(code=item))
else:
data.append(api.to_df(api.get_index_bars(frequence, 1 if str(item)[0] in ['0', '8', '9', '5'] else 0, item, 0, 1)).assign(code=item))
data.append(api.to_df(api.get_index_bars(frequence, 1 if str(item)[0] in [
'0', '8', '9', '5'] else 0, item, 0, 1)).assign(code=item))
data = pd.concat(data, axis=0)
return data \
.assign(date=pd.to_datetime(data['datetime']
Expand Down Expand Up @@ -937,6 +982,7 @@ def __QA_fetch_get_stock_transaction(code, day, retry, api):
.assign(code=str(code)).assign(order=range(len(data_.index))).set_index('datetime', drop=False,
inplace=False)


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_stock_transaction(code, start, end, retry=2, ip=None, port=None):
'''
Expand Down Expand Up @@ -979,6 +1025,7 @@ def QA_fetch_get_stock_transaction(code, start, end, retry=2, ip=None, port=None
else:
return None


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_stock_transaction_realtime(code, ip=None, port=None):
'实时分笔成交 包含集合竞价 buyorsell 1--sell 0--buy 2--盘前'
Expand All @@ -1000,6 +1047,7 @@ def QA_fetch_get_stock_transaction_realtime(code, ip=None, port=None):
except:
return None


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_stock_xdxr(code, ip=None, port=None):
'除权除息'
Expand All @@ -1026,6 +1074,7 @@ def QA_fetch_get_stock_xdxr(code, ip=None, port=None):
else:
return None


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_stock_info(code, ip=None, port=None):
'股票基本信息'
Expand All @@ -1035,6 +1084,7 @@ def QA_fetch_get_stock_info(code, ip=None, port=None):
with api.connect(ip, port):
return api.to_df(api.get_finance_info(market_code, code))


@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def QA_fetch_get_stock_block(ip=None, port=None):
'板块数据'
Expand Down

0 comments on commit 663af33

Please sign in to comment.