diff --git a/SmartApi/CONSTANTS.py b/SmartApi/CONSTANTS.py new file mode 100644 index 00000000..b688ede2 --- /dev/null +++ b/SmartApi/CONSTANTS.py @@ -0,0 +1,70 @@ +########################################################## smartWebSocketV2.py constants ########################################################## + + +ROOT_URI = "ws://smartapisocket.angelone.in/smart-stream" +HEART_BEAT_MESSAGE = "ping" +HEAR_BEAT_INTERVAL = 30 +LITTLE_ENDIAN_BYTE_ORDER = "<" +RESUBSCRIBE_FLAG = False +# HB_THREAD_FLAG = True +MAX_RETRY_ATTEMPT = 1 + +# Available Actions +SUBSCRIBE_ACTION = 1 +UNSUBSCRIBE_ACTION = 0 + +# Possible Subscription Mode +LTP_MODE = 1 +QUOTE = 2 +SNAP_QUOTE = 3 + +# Exchange Type +NSE_CM = 1 +NSE_FO = 2 +BSE_CM = 3 +BSE_FO = 4 +MCX_FO = 5 +NCX_FO = 7 +CDE_FO = 13 + +# Subscription Mode Map +SUBSCRIPTION_MODE_MAP = { + 1: "LTP", + 2: "QUOTE", + 3: "SNAP_QUOTE" +} + + +########################################################## smartConnect.py constants ########################################################## +_rootUrl="https://apiconnect.angelbroking.com" #prod endpoint +_login_url="https://apiconnect.angelbroking.com" #prod endpoint + +_default_timeout = 7 # In seconds + +_routes = { + "api.login":"/rest/auth/angelbroking/user/v1/loginByPassword", + "api.logout":"/rest/secure/angelbroking/user/v1/logout", + "api.token": "/rest/auth/angelbroking/jwt/v1/generateTokens", + "api.refresh": "/rest/auth/angelbroking/jwt/v1/generateTokens", + "api.user.profile": "/rest/secure/angelbroking/user/v1/getProfile", + + "api.order.place": "/rest/secure/angelbroking/order/v1/placeOrder", + "api.order.modify": "/rest/secure/angelbroking/order/v1/modifyOrder", + "api.order.cancel": "/rest/secure/angelbroking/order/v1/cancelOrder", + "api.order.book":"/rest/secure/angelbroking/order/v1/getOrderBook", + + "api.ltp.data": "/rest/secure/angelbroking/order/v1/getLtpData", + "api.trade.book": "/rest/secure/angelbroking/order/v1/getTradeBook", + "api.rms.limit": "/rest/secure/angelbroking/user/v1/getRMS", + "api.holding": "/rest/secure/angelbroking/portfolio/v1/getHolding", + "api.position": "/rest/secure/angelbroking/order/v1/getPosition", + "api.convert.position": "/rest/secure/angelbroking/order/v1/convertPosition", + + "api.gtt.create":"/gtt-service/rest/secure/angelbroking/gtt/v1/createRule", + "api.gtt.modify":"/gtt-service/rest/secure/angelbroking/gtt/v1/modifyRule", + "api.gtt.cancel":"/gtt-service/rest/secure/angelbroking/gtt/v1/cancelRule", + "api.gtt.details":"/rest/secure/angelbroking/gtt/v1/ruleDetails", + "api.gtt.list":"/rest/secure/angelbroking/gtt/v1/ruleList", + + "api.candle.data":"/rest/secure/angelbroking/historical/v1/getCandleData" +} diff --git a/SmartApi/__init__.py b/SmartApi/__init__.py index 720c8164..4f491061 100644 --- a/SmartApi/__init__.py +++ b/SmartApi/__init__.py @@ -1,11 +1,11 @@ -from __future__ import unicode_literals,absolute_import - -from smartapi.smartConnect import SmartConnect -# from smartapi.webSocket import WebSocket -from smartapi.smartApiWebsocket import SmartWebSocket - -__all__ = ["SmartConnect","SmartWebSocket"] - - - - +from __future__ import unicode_literals,absolute_import + +from SmartApi.smartConnect import SmartConnect +# from SmartApi.webSocket import WebSocket +from SmartApi.SmartApiWebsocket import SmartWebSocket + +__all__ = ["SmartConnect","SmartWebSocket"] + +from SmartApi.smartWebSocketV2 import SmartWebSocketV2 + + diff --git a/SmartApi/smartConnect.py b/SmartApi/smartConnect.py index 7ca26e42..3a1e6882 100644 --- a/SmartApi/smartConnect.py +++ b/SmartApi/smartConnect.py @@ -1,408 +1,385 @@ -from six.moves.urllib.parse import urljoin -import sys -import csv -import json -import dateutil.parser -import hashlib -import logging -import datetime -import smartapi.smartExceptions as ex -import requests -from requests import get -import re, uuid -import socket -import platform -from smartapi.version import __version__, __title__ - -log = logging.getLogger(__name__) -#user_sys=platform.system() -#print("the system",user_sys) - -class SmartConnect(object): - #_rootUrl = "https://openapisuat.angelbroking.com" - _rootUrl="https://apiconnect.angelbroking.com" #prod endpoint - #_login_url ="https://smartapi.angelbroking.com/login" - _login_url="https://smartapi.angelbroking.com/publisher-login" #prod endpoint - _default_timeout = 7 # In seconds - - _routes = { - "api.login":"/rest/auth/angelbroking/user/v1/loginByPassword", - "api.logout":"/rest/secure/angelbroking/user/v1/logout", - "api.token": "/rest/auth/angelbroking/jwt/v1/generateTokens", - "api.refresh": "/rest/auth/angelbroking/jwt/v1/generateTokens", - "api.user.profile": "/rest/secure/angelbroking/user/v1/getProfile", - - "api.order.place": "/rest/secure/angelbroking/order/v1/placeOrder", - "api.order.modify": "/rest/secure/angelbroking/order/v1/modifyOrder", - "api.order.cancel": "/rest/secure/angelbroking/order/v1/cancelOrder", - "api.order.book":"/rest/secure/angelbroking/order/v1/getOrderBook", - - "api.ltp.data": "/rest/secure/angelbroking/order/v1/getLtpData", - "api.trade.book": "/rest/secure/angelbroking/order/v1/getTradeBook", - "api.rms.limit": "/rest/secure/angelbroking/user/v1/getRMS", - "api.holding": "/rest/secure/angelbroking/portfolio/v1/getHolding", - "api.position": "/rest/secure/angelbroking/order/v1/getPosition", - "api.convert.position": "/rest/secure/angelbroking/order/v1/convertPosition", - - "api.gtt.create":"/gtt-service/rest/secure/angelbroking/gtt/v1/createRule", - "api.gtt.modify":"/gtt-service/rest/secure/angelbroking/gtt/v1/modifyRule", - "api.gtt.cancel":"/gtt-service/rest/secure/angelbroking/gtt/v1/cancelRule", - "api.gtt.details":"/rest/secure/angelbroking/gtt/v1/ruleDetails", - "api.gtt.list":"/rest/secure/angelbroking/gtt/v1/ruleList", - - "api.candle.data":"/rest/secure/angelbroking/historical/v1/getCandleData" - } - - - try: - clientPublicIp= " " + get('https://api.ipify.org').text - if " " in clientPublicIp: - clientPublicIp=clientPublicIp.replace(" ","") - hostname = socket.gethostname() - clientLocalIp=socket.gethostbyname(hostname) - except Exception as e: - print("Exception while retriving IP Address,using local host IP address",e) - finally: - clientPublicIp="106.193.147.98" - clientLocalIp="127.0.0.1" - clientMacAddress=':'.join(re.findall('..', '%012x' % uuid.getnode())) - accept = "application/json" - userType = "USER" - sourceID = "WEB" - - - def __init__(self, api_key=None, access_token=None, refresh_token=None,feed_token=None, userId=None, root=None, debug=False, timeout=None, proxies=None, pool=None, disable_ssl=False,accept=None,userType=None,sourceID=None,Authorization=None,clientPublicIP=None,clientMacAddress=None,clientLocalIP=None,privateKey=None): - self.debug = debug - self.api_key = api_key - self.session_expiry_hook = None - self.disable_ssl = disable_ssl - self.access_token = access_token - self.refresh_token = refresh_token - self.feed_token = feed_token - self.userId = userId - self.proxies = proxies if proxies else {} - self.root = root or self._rootUrl - self.timeout = timeout or self._default_timeout - self.Authorization= None - self.clientLocalIP=self.clientLocalIp - self.clientPublicIP=self.clientPublicIp - self.clientMacAddress=self.clientMacAddress - self.privateKey=api_key - self.accept=self.accept - self.userType=self.userType - self.sourceID=self.sourceID - - if pool: - self.reqsession = requests.Session() - reqadapter = requests.adapters.HTTPAdapter(**pool) - self.reqsession.mount("https://", reqadapter) - print("in pool") - else: - self.reqsession = requests - - # disable requests SSL warning - requests.packages.urllib3.disable_warnings() - def requestHeaders(self): - return{ - "Content-type":self.accept, - "X-ClientLocalIP": self.clientLocalIp, - "X-ClientPublicIP": self.clientPublicIp, - "X-MACAddress": self.clientMacAddress, - "Accept": self.accept, - "X-PrivateKey": self.privateKey, - "X-UserType": self.userType, - "X-SourceID": self.sourceID - } - - def setSessionExpiryHook(self, method): - if not callable(method): - raise TypeError("Invalid input type. Only functions are accepted.") - self.session_expiry_hook = method - - def getUserId(): - return userId - - def setUserId(self,id): - self.userId=id - - def setAccessToken(self, access_token): - - self.access_token = access_token - - def setRefreshToken(self, refresh_token): - - self.refresh_token = refresh_token - - def setFeedToken(self,feedToken): - - self.feed_token=feedToken - - def getfeedToken(self): - return self.feed_token - - - def login_url(self): - """Get the remote login url to which a user should be redirected to initiate the login flow.""" - return "%s?api_key=%s" % (self._login_url, self.api_key) - - def _request(self, route, method, parameters=None): - """Make an HTTP request.""" - params = parameters.copy() if parameters else {} - - uri =self._routes[route].format(**params) - url = urljoin(self.root, uri) - - - # Custom headers - headers = self.requestHeaders() - - if self.access_token: - # set authorization header - - auth_header = self.access_token - headers["Authorization"] = "Bearer {}".format(auth_header) - - if self.debug: - log.debug("Request: {method} {url} {params} {headers}".format(method=method, url=url, params=params, headers=headers)) - - try: - r = requests.request(method, - url, - data=json.dumps(params) if method in ["POST", "PUT"] else None, - params=json.dumps(params) if method in ["GET", "DELETE"] else None, - headers=headers, - verify=not self.disable_ssl, - allow_redirects=True, - timeout=self.timeout, - proxies=self.proxies) - - except Exception as e: - raise e - - if self.debug: - log.debug("Response: {code} {content}".format(code=r.status_code, content=r.content)) - - # Validate the content type. - if "json" in headers["Content-type"]: - try: - data = json.loads(r.content.decode("utf8")) - - except ValueError: - raise ex.DataException("Couldn't parse the JSON response received from the server: {content}".format( - content=r.content)) - - # api error - if data.get("error_type"): - # Call session hook if its registered and TokenException is raised - if self.session_expiry_hook and r.status_code == 403 and data["error_type"] == "TokenException": - self.session_expiry_hook() - - # native errors - exp = getattr(ex, data["error_type"], ex.GeneralException) - raise exp(data["message"], code=r.status_code) - - return data - elif "csv" in headers["Content-type"]: - return r.content - else: - raise ex.DataException("Unknown Content-type ({content_type}) with response: ({content})".format( - content_type=headers["Content-type"], - content=r.content)) - - def _deleteRequest(self, route, params=None): - """Alias for sending a DELETE request.""" - return self._request(route, "DELETE", params) - def _putRequest(self, route, params=None): - """Alias for sending a PUT request.""" - return self._request(route, "PUT", params) - def _postRequest(self, route, params=None): - """Alias for sending a POST request.""" - return self._request(route, "POST", params) - def _getRequest(self, route, params=None): - """Alias for sending a GET request.""" - return self._request(route, "GET", params) - - def generateSession(self,clientCode,password,totp): - - params={"clientcode":clientCode,"password":password,"totp":totp} - loginResultObject=self._postRequest("api.login",params) - - if loginResultObject['status']==True: - jwtToken=loginResultObject['data']['jwtToken'] - self.setAccessToken(jwtToken) - refreshToken=loginResultObject['data']['refreshToken'] - feedToken=loginResultObject['data']['feedToken'] - self.setRefreshToken(refreshToken) - self.setFeedToken(feedToken) - user=self.getProfile(refreshToken) - - id=user['data']['clientcode'] - #id='D88311' - self.setUserId(id) - user['data']['jwtToken']="Bearer "+jwtToken - user['data']['refreshToken']=refreshToken - - - return user - else: - return loginResultObject - def terminateSession(self,clientCode): - logoutResponseObject=self._postRequest("api.logout",{"clientcode":clientCode}) - return logoutResponseObject - - def generateToken(self,refresh_token): - response=self._postRequest('api.token',{"refreshToken":refresh_token}) - jwtToken=response['data']['jwtToken'] - feedToken=response['data']['feedToken'] - self.setFeedToken(feedToken) - self.setAccessToken(jwtToken) - - return response - - def renewAccessToken(self): - response =self._postRequest('api.refresh', { - "jwtToken": self.access_token, - "refreshToken": self.refresh_token, - - }) - - tokenSet={} - - if "jwtToken" in response: - tokenSet['jwtToken']=response['data']['jwtToken'] - tokenSet['clientcode']=self. userId - tokenSet['refreshToken']=response['data']["refreshToken"] - - return tokenSet - - def getProfile(self,refreshToken): - user=self._getRequest("api.user.profile",{"refreshToken":refreshToken}) - return user - - def placeOrder(self,orderparams): - - params=orderparams - - for k in list(params.keys()): - if params[k] is None : - del(params[k]) - - orderResponse= self._postRequest("api.order.place", params)['data']['orderid'] - - return orderResponse - - def modifyOrder(self,orderparams): - params = orderparams - - for k in list(params.keys()): - if params[k] is None: - del(params[k]) - - orderResponse= self._postRequest("api.order.modify", params) - return orderResponse - - def cancelOrder(self, order_id,variety): - orderResponse= self._postRequest("api.order.cancel", {"variety": variety,"orderid": order_id}) - return orderResponse - - def ltpData(self,exchange,tradingsymbol,symboltoken): - params={ - "exchange":exchange, - "tradingsymbol":tradingsymbol, - "symboltoken":symboltoken - } - ltpDataResponse= self._postRequest("api.ltp.data",params) - return ltpDataResponse - - def orderBook(self): - orderBookResponse=self._getRequest("api.order.book") - return orderBookResponse - - - def tradeBook(self): - tradeBookResponse=self._getRequest("api.trade.book") - return tradeBookResponse - - def rmsLimit(self): - rmsLimitResponse= self._getRequest("api.rms.limit") - return rmsLimitResponse - - def position(self): - positionResponse= self._getRequest("api.position") - return positionResponse - - def holding(self): - holdingResponse= self._getRequest("api.holding") - return holdingResponse - - def convertPosition(self,positionParams): - params=positionParams - for k in list(params.keys()): - if params[k] is None: - del(params[k]) - convertPositionResponse= self._postRequest("api.convert.position",params) - - return convertPositionResponse - - def gttCreateRule(self,createRuleParams): - params=createRuleParams - for k in list(params.keys()): - if params[k] is None: - del(params[k]) - - createGttRuleResponse=self._postRequest("api.gtt.create",params) - #print(createGttRuleResponse) - return createGttRuleResponse['data']['id'] - - def gttModifyRule(self,modifyRuleParams): - params=modifyRuleParams - for k in list(params.keys()): - if params[k] is None: - del(params[k]) - modifyGttRuleResponse=self._postRequest("api.gtt.modify",params) - #print(modifyGttRuleResponse) - return modifyGttRuleResponse['data']['id'] - - def gttCancelRule(self,gttCancelParams): - params=gttCancelParams - for k in list(params.keys()): - if params[k] is None: - del(params[k]) - - #print(params) - cancelGttRuleResponse=self._postRequest("api.gtt.cancel",params) - #print(cancelGttRuleResponse) - return cancelGttRuleResponse - - def gttDetails(self,id): - params={ - "id":id - } - gttDetailsResponse=self._postRequest("api.gtt.details",params) - return gttDetailsResponse - - def gttLists(self,status,page,count): - if type(status)== list: - params={ - "status":status, - "page":page, - "count":count - } - gttListResponse=self._postRequest("api.gtt.list",params) - #print(gttListResponse) - return gttListResponse - else: - message="The status param is entered as" +str(type(status))+". Please enter status param as a list i.e., status=['CANCELLED']" - return message - - def getCandleData(self,historicDataParams): - params=historicDataParams - for k in list(params.keys()): - if params[k] is None: - del(params[k]) - getCandleDataResponse=self._postRequest("api.candle.data",historicDataParams) - return getCandleDataResponse - - def _user_agent(self): - return (__title__ + "-python/").capitalize() + __version__ - +from six.moves.urllib.parse import urljoin +import sys +import csv +import json +import dateutil.parser +import hashlib +import logging +import datetime +import SmartApi.smartExceptions as ex +import requests +from requests import get +import re, uuid +import socket +import platform +from SmartApi.version import __version__, __title__ +from SmartApi import CONSTANTS +from logzero import logger + + + +log = logging.getLogger(__name__) + +class SmartConnect(object): + _rootUrl=CONSTANTS._rootUrl #prod endpoint + _login_url=CONSTANTS._login_url #prod endpoint + + _default_timeout = CONSTANTS._default_timeout # In seconds + + _routes = CONSTANTS._routes + + + try: + clientPublicIp= " " + get('https://api.ipify.org').text + if " " in clientPublicIp: + clientPublicIp=clientPublicIp.replace(" ","") + hostname = socket.gethostname() + clientLocalIp=socket.gethostbyname(hostname) + except Exception as e: + logger.error(f"Exception while retriving IP Address,using local host IP address",e) + finally: + clientPublicIp="106.193.147.98" + clientLocalIp="127.0.0.1" + clientMacAddress=':'.join(re.findall('..', '%012x' % uuid.getnode())) + accept = "application/json" + userType = "USER" + sourceID = "WEB" + + + def __init__(self, api_key=None, access_token=None, refresh_token=None,feed_token=None, userId=None, root=None, debug=False, timeout=None, proxies=None, pool=None, disable_ssl=False,accept=None,userType=None,sourceID=None,Authorization=None,clientPublicIP=None,clientMacAddress=None,clientLocalIP=None,privateKey=None): + self.debug = debug + self.api_key = api_key + self.session_expiry_hook = None + self.disable_ssl = disable_ssl + self.access_token = access_token + self.refresh_token = refresh_token + self.feed_token = feed_token + self.userId = userId + self.proxies = proxies if proxies else {} + self.root = root or self._rootUrl + self.timeout = timeout or self._default_timeout + self.Authorization= None + self.clientLocalIP=self.clientLocalIp + self.clientPublicIP=self.clientPublicIp + self.clientMacAddress=self.clientMacAddress + self.privateKey=api_key + self.accept=self.accept + self.userType=self.userType + self.sourceID=self.sourceID + + if pool: + self.reqsession = requests.Session() + reqadapter = requests.adapters.HTTPAdapter(**pool) + self.reqsession.mount("https://", reqadapter) + logger.info(f"in pool") + else: + self.reqsession = requests + + # disable requests SSL warning + requests.packages.urllib3.disable_warnings() + def requestHeaders(self): + return{ + "Content-type":self.accept, + "X-ClientLocalIP": self.clientLocalIp, + "X-ClientPublicIP": self.clientPublicIp, + "X-MACAddress": self.clientMacAddress, + "Accept": self.accept, + "X-PrivateKey": self.privateKey, + "X-UserType": self.userType, + "X-SourceID": self.sourceID + } + + def setSessionExpiryHook(self, method): + if not callable(method): + raise TypeError("Invalid input type. Only functions are accepted.") + self.session_expiry_hook = method + + def getUserId(): + return userId + + def setUserId(self,id): + self.userId=id + + def setAccessToken(self, access_token): + + self.access_token = access_token + + def setRefreshToken(self, refresh_token): + + self.refresh_token = refresh_token + + def setFeedToken(self,feedToken): + + self.feed_token=feedToken + + def getfeedToken(self): + return self.feed_token + + + def login_url(self): + """Get the remote login url to which a user should be redirected to initiate the login flow.""" + return "%s?api_key=%s" % (self._login_url, self.api_key) + + def _request(self, route, method, parameters=None): + """Make an HTTP request.""" + params = parameters.copy() if parameters else {} + + uri =self._routes[route].format(**params) + url = urljoin(self.root, uri) + + + # Custom headers + headers = self.requestHeaders() + + if self.access_token: + # set authorization header + + auth_header = self.access_token + headers["Authorization"] = "Bearer {}".format(auth_header) + + if self.debug: + log.debug("Request: {method} {url} {params} {headers}".format(method=method, url=url, params=params, headers=headers)) + + try: + r = requests.request(method, + url, + data=json.dumps(params) if method in ["POST", "PUT"] else None, + params=json.dumps(params) if method in ["GET", "DELETE"] else None, + headers=headers, + verify=not self.disable_ssl, + allow_redirects=True, + timeout=self.timeout, + proxies=self.proxies) + + except Exception as e: + logger.error(f"Exception while HTTP Request",e) + raise e + + if self.debug: + log.debug("Response: {code} {content}".format(code=r.status_code, content=r.content)) + + # Validate the content type. + if "json" in headers["Content-type"]: + try: + data = json.loads(r.content.decode("utf8")) + + except ValueError: + logger.error(f"Couldn't parse the JSON response received from the server: {content}".format(content=r.content)) + raise ex.DataException("Couldn't parse the JSON response received from the server: {content}".format( + content=r.content)) + + # api error + if data.get("error_type"): + # Call session hook if its registered and TokenException is raised + if self.session_expiry_hook and r.status_code == 403 and data["error_type"] == "TokenException": + self.session_expiry_hook() + + # native errors + exp = getattr(ex, data["error_type"], ex.GeneralException) + raise exp(data["message"], code=r.status_code) + + return data + elif "csv" in headers["Content-type"]: + return r.content + else: + raise ex.DataException("Unknown Content-type ({content_type}) with response: ({content})".format( + content_type=headers["Content-type"], + content=r.content)) + + def _deleteRequest(self, route, params=None): + """Alias for sending a DELETE request.""" + return self._request(route, "DELETE", params) + def _putRequest(self, route, params=None): + """Alias for sending a PUT request.""" + return self._request(route, "PUT", params) + def _postRequest(self, route, params=None): + """Alias for sending a POST request.""" + return self._request(route, "POST", params) + def _getRequest(self, route, params=None): + """Alias for sending a GET request.""" + return self._request(route, "GET", params) + + def generateSession(self,clientCode,password,totp): + + params={"clientcode":clientCode,"password":password,"totp":totp} + loginResultObject=self._postRequest("api.login",params) + + if loginResultObject['status']==True: + jwtToken=loginResultObject['data']['jwtToken'] + self.setAccessToken(jwtToken) + refreshToken=loginResultObject['data']['refreshToken'] + feedToken=loginResultObject['data']['feedToken'] + self.setRefreshToken(refreshToken) + self.setFeedToken(feedToken) + user=self.getProfile(refreshToken) + + id=user['data']['clientcode'] + #id='D88311' + self.setUserId(id) + user['data']['jwtToken']="Bearer "+jwtToken + user['data']['refreshToken']=refreshToken + + + return user + else: + return loginResultObject + def terminateSession(self,clientCode): + logoutResponseObject=self._postRequest("api.logout",{"clientcode":clientCode}) + return logoutResponseObject + + def generateToken(self,refresh_token): + response=self._postRequest('api.token',{"refreshToken":refresh_token}) + jwtToken=response['data']['jwtToken'] + feedToken=response['data']['feedToken'] + self.setFeedToken(feedToken) + self.setAccessToken(jwtToken) + + return response + + def renewAccessToken(self): + response =self._postRequest('api.refresh', { + "jwtToken": self.access_token, + "refreshToken": self.refresh_token, + + }) + + tokenSet={} + + if "jwtToken" in response: + tokenSet['jwtToken']=response['data']['jwtToken'] + tokenSet['clientcode']=self. userId + tokenSet['refreshToken']=response['data']["refreshToken"] + + return tokenSet + + def getProfile(self,refreshToken): + user=self._getRequest("api.user.profile",{"refreshToken":refreshToken}) + return user + + def placeOrder(self,orderparams): + + params=orderparams + + for k in list(params.keys()): + if params[k] is None : + del(params[k]) + + orderResponse= self._postRequest("api.order.place", params) + + return orderResponse + + def modifyOrder(self,orderparams): + params = orderparams + + for k in list(params.keys()): + if params[k] is None: + del(params[k]) + + orderResponse= self._postRequest("api.order.modify", params) + return orderResponse + + def cancelOrder(self, order_id,variety): + orderResponse= self._postRequest("api.order.cancel", {"variety": variety,"orderid": order_id}) + return orderResponse + + def ltpData(self,exchange,tradingsymbol,symboltoken): + params={ + "exchange":exchange, + "tradingsymbol":tradingsymbol, + "symboltoken":symboltoken + } + ltpDataResponse= self._postRequest("api.ltp.data",params) + return ltpDataResponse + + def orderBook(self): + orderBookResponse=self._getRequest("api.order.book") + return orderBookResponse + + + def tradeBook(self): + tradeBookResponse=self._getRequest("api.trade.book") + return tradeBookResponse + + def rmsLimit(self): + rmsLimitResponse= self._getRequest("api.rms.limit") + return rmsLimitResponse + + def position(self): + positionResponse= self._getRequest("api.position") + return positionResponse + + def holding(self): + holdingResponse= self._getRequest("api.holding") + return holdingResponse + + def convertPosition(self,positionParams): + params=positionParams + for k in list(params.keys()): + if params[k] is None: + del(params[k]) + convertPositionResponse= self._postRequest("api.convert.position",params) + + return convertPositionResponse + + def gttCreateRule(self,createRuleParams): + params=createRuleParams + for k in list(params.keys()): + if params[k] is None: + del(params[k]) + + createGttRuleResponse=self._postRequest("api.gtt.create",params) + logger.info(createGttRuleResponse) + return createGttRuleResponse['data']['id'] + + def gttModifyRule(self,modifyRuleParams): + params=modifyRuleParams + for k in list(params.keys()): + if params[k] is None: + del(params[k]) + modifyGttRuleResponse=self._postRequest("api.gtt.modify",params) + logger.info(modifyGttRuleResponse) + return modifyGttRuleResponse['data']['id'] + + def gttCancelRule(self,gttCancelParams): + params=gttCancelParams + for k in list(params.keys()): + if params[k] is None: + del(params[k]) + + logger.info(params) + cancelGttRuleResponse=self._postRequest("api.gtt.cancel",params) + logger.info(cancelGttRuleResponse) + return cancelGttRuleResponse + + def gttDetails(self,id): + params={ + "id":id + } + gttDetailsResponse=self._postRequest("api.gtt.details",params) + return gttDetailsResponse + + def gttLists(self,status,page,count): + if type(status)== list: + params={ + "status":status, + "page":page, + "count":count + } + gttListResponse=self._postRequest("api.gtt.list",params) + logger.info(gttListResponse) + return gttListResponse + else: + message="The status param is entered as" +str(type(status))+". Please enter status param as a list i.e., status=['CANCELLED']" + return message + + def getCandleData(self,historicDataParams): + params=historicDataParams + for k in list(params.keys()): + if params[k] is None: + del(params[k]) + getCandleDataResponse=self._postRequest("api.candle.data",historicDataParams) + return getCandleDataResponse + + def _user_agent(self): + return (__title__ + "-python/").capitalize() + __version__ + diff --git a/SmartApi/smartWebSocketV2.py b/SmartApi/smartWebSocketV2.py new file mode 100644 index 00000000..3f5269df --- /dev/null +++ b/SmartApi/smartWebSocketV2.py @@ -0,0 +1,346 @@ +""" + Created on Monday Jan 31 2022 + @author: Abhijeet Bote + :copyright: (c) 2022 by Angel One Limited +""" + +import struct +import ssl +import json + +import websocket +from SmartApi import CONSTANTS +from logzero import logger + + +class SmartWebSocketV2(object): + """ + SmartAPI Web Socket version 2 + """ + #Constants + RESUBSCRIBE_FLAG = CONSTANTS.RESUBSCRIBE_FLAG + + wsapp = None + input_request_dict = {} + current_retry_attempt = 0 + + def __init__(self, auth_token, api_key, client_code, feed_token): + """ + Initialise the SmartWebSocketV2 instance + Parameters + ------ + auth_token: string + jwt auth token received from Login API + api_key: string + api key from Smart API account + client_code: string + angel one account id + feed_token: string + feed token received from Login API + """ + self.auth_token = auth_token + self.api_key = api_key + self.client_code = client_code + self.feed_token = feed_token + + if not self._sanity_check(): + raise Exception("Provide valid value for all the tokens") + + def _sanity_check(self): + return True + + + def _on_data(self, wsapp, data, data_type, continue_flag): + + if data_type == 2: + parsed_message = self._parse_binary_data(data) + self.on_data(wsapp, parsed_message) + else: + self.on_data(wsapp, data) + + def _on_open(self, wsapp): + + if self.RESUBSCRIBE_FLAG: + self.resubscribe() + else: + self.RESUBSCRIBE_FLAG = True + self.on_open(wsapp) + + def _on_pong(self, wsapp, data): + logger.info("In on pong function==> {}".format(data)) + + def _on_ping(self, wsapp, data): + logger.info("In on ping function==> {}".format(data)) + + def subscribe(self, correlation_id, mode, token_list): + """ + This Function subscribe the price data for the given token + Parameters + ------ + correlation_id: string + A 10 character alphanumeric ID client may provide which will be returned by the server in error response + to indicate which request generated error response. + Clients can use this optional ID for tracking purposes between request and corresponding error response. + mode: integer + It denotes the subscription type + possible values -> 1, 2 and 3 + 1 -> LTP + 2 -> Quote + 3 -> Snap Quote + token_list: list of dict + Sample Value -> + [ + { "exchangeType": 1, "tokens": ["10626", "5290"]}, + {"exchangeType": 5, "tokens": [ "234230", "234235", "234219"]} + ] + exchangeType: integer + possible values -> + 1 -> nse_cm + 2 -> nse_fo + 3 -> bse_cm + 4 -> bse_fo + 5 -> mcx_fo + 7 -> ncx_fo + 13 -> cde_fo + tokens: list of string + """ + try: + request_data = { + "correlationID": correlation_id, + "action": CONSTANTS.SUBSCRIBE_ACTION, + "params": { + "mode": mode, + "tokenList": token_list + } + } + + if self.input_request_dict.get(mode, None) is None: + self.input_request_dict[mode] = {} + + for token in token_list: + if token['exchangeType'] in self.input_request_dict[mode]: + self.input_request_dict[mode][token['exchangeType']].extend(token["tokens"]) + else: + self.input_request_dict[mode][token['exchangeType']] = token["tokens"] + + self.wsapp.send(json.dumps(request_data)) + self.RESUBSCRIBE_FLAG = True + except Exception as e: + raise e + + def unsubscribe(self, correlation_id, mode, token_list): + """ + This function unsubscribe the data for given token + Parameters + ------ + correlation_id: string + A 10 character alphanumeric ID client may provide which will be returned by the server in error response + to indicate which request generated error response. + Clients can use this optional ID for tracking purposes between request and corresponding error response. + mode: integer + It denotes the subscription type + possible values -> 1, 2 and 3 + 1 -> LTP + 2 -> Quote + 3 -> Snap Quote + token_list: list of dict + Sample Value -> + [ + { "exchangeType": 1, "tokens": ["10626", "5290"]}, + {"exchangeType": 5, "tokens": [ "234230", "234235", "234219"]} + ] + exchangeType: integer + possible values -> + 1 -> nse_cm + 2 -> nse_fo + 3 -> bse_cm + 4 -> bse_fo + 5 -> mcx_fo + 7 -> ncx_fo + 13 -> cde_fo + tokens: list of string + """ + try: + request_data = { + "correlationID": correlation_id, + "action": CONSTANTS.UNSUBSCRIBE_ACTION, + "params": { + "mode": mode, + "tokenList": token_list + } + } + + self.input_request_dict.update(request_data) + self.input_request_dict.update(request_data) + self.wsapp.send(json.dumps(request_data)) + self.RESUBSCRIBE_FLAG = True + except Exception as e: + raise e + + def resubscribe(self): + try: + for key, val in self.input_request_dict.items(): + token_list = [] + for key1, val1 in val.items(): + temp_data = { + 'exchangeType': key1, + 'tokens': val1 + } + token_list.append(temp_data) + request_data = { + "action": CONSTANTS.SUBSCRIBE_ACTION, + "params": { + "mode": key, + "tokenList": token_list + } + } + self.wsapp.send(json.dumps(request_data)) + except Exception as e: + raise e + + def connect(self): + """ + Make the web socket connection with the server + """ + headers = { + "Authorization": self.auth_token, + "x-api-key": self.api_key, + "x-client-code": self.client_code, + "x-feed-token": self.feed_token + } + try: + self.wsapp = websocket.WebSocketApp(CONSTANTS.ROOT_URI, header=headers, on_open=self._on_open, + on_error=self._on_error, on_close=self._on_close, on_data=self._on_data, + on_ping=self._on_ping, on_pong=self._on_pong) + self.wsapp.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_interval=CONSTANTS.HEAR_BEAT_INTERVAL, + ping_payload=CONSTANTS.HEART_BEAT_MESSAGE) + except Exception as e: + raise e + + def close_connection(self): + """ + Closes the connection + """ + self.RESUBSCRIBE_FLAG = False + self.wsapp.close() + + def send_heart_beat(self): + try: + self.wsapp.send(CONSTANTS.HEART_BEAT_MESSAGE) + except Exception as e: + raise e + + def _on_error(self, wsapp, error): + self.HB_THREAD_FLAG = False + self.RESUBSCRIBE_FLAG = True + if self.current_retry_attempt < CONSTANTS.MAX_RETRY_ATTEMPT: + logger.error("Attempting to resubscribe/reconnect...") + self.current_retry_attempt += 1 + self.connect() + + def _on_close(self, wsapp): + self.on_close(wsapp) + + def _parse_binary_data(self, binary_data): + parsed_data = { + "subscription_mode": self._unpack_data(binary_data, 0, 1, byte_format="B")[0], + "exchange_type": self._unpack_data(binary_data, 1, 2, byte_format="B")[0], + "token": SmartWebSocketV2._parse_token_value(binary_data[2:27]), + "sequence_number": self._unpack_data(binary_data, 27, 35, byte_format="q")[0], + "exchange_timestamp": self._unpack_data(binary_data, 35, 43, byte_format="q")[0], + "last_traded_price": self._unpack_data(binary_data, 43, 51, byte_format="q")[0] + } + try: + parsed_data["subscription_mode_val"] = CONSTANTS.SUBSCRIPTION_MODE_MAP.get(parsed_data["subscription_mode"]) + + if parsed_data["subscription_mode"] in [CONSTANTS.QUOTE, CONSTANTS.SNAP_QUOTE]: + parsed_data["last_traded_quantity"] = self._unpack_data(binary_data, 51, 59, byte_format="q")[0] + parsed_data["average_traded_price"] = self._unpack_data(binary_data, 59, 67, byte_format="q")[0] + parsed_data["volume_trade_for_the_day"] = self._unpack_data(binary_data, 67, 75, byte_format="q")[0] + parsed_data["total_buy_quantity"] = self._unpack_data(binary_data, 75, 83, byte_format="d")[0] + parsed_data["total_sell_quantity"] = self._unpack_data(binary_data, 83, 91, byte_format="d")[0] + parsed_data["open_price_of_the_day"] = self._unpack_data(binary_data, 91, 99, byte_format="q")[0] + parsed_data["high_price_of_the_day"] = self._unpack_data(binary_data, 99, 107, byte_format="q")[0] + parsed_data["low_price_of_the_day"] = self._unpack_data(binary_data, 107, 115, byte_format="q")[0] + parsed_data["closed_price"] = self._unpack_data(binary_data, 115, 123, byte_format="q")[0] + + if parsed_data["subscription_mode"] == CONSTANTS.SNAP_QUOTE: + parsed_data["last_traded_timestamp"] = self._unpack_data(binary_data, 123, 131, byte_format="q")[0] + parsed_data["open_interest"] = self._unpack_data(binary_data, 131, 139, byte_format="q")[0] + parsed_data["open_interest_change_percentage"] = \ + self._unpack_data(binary_data, 139, 147, byte_format="q")[0] + parsed_data["upper_circuit_limit"] = self._unpack_data(binary_data, 347, 355, byte_format="q")[0] + parsed_data["lower_circuit_limit"] = self._unpack_data(binary_data, 355, 363, byte_format="q")[0] + parsed_data["52_week_high_price"] = self._unpack_data(binary_data, 363, 371, byte_format="q")[0] + parsed_data["52_week_low_price"] = self._unpack_data(binary_data, 371, 379, byte_format="q")[0] + best_5_buy_and_sell_data = self._parse_best_5_buy_and_sell_data(binary_data[147:347]) + parsed_data["best_5_buy_data"] = best_5_buy_and_sell_data["best_5_buy_data"] + parsed_data["best_5_sell_data"] = best_5_buy_and_sell_data["best_5_sell_data"] + + return parsed_data + except Exception as e: + raise e + + def _unpack_data(self, binary_data, start, end, byte_format="I"): + """ + Unpack Binary Data to the integer according to the specified byte_format. + This function returns the tuple + """ + return struct.unpack(CONSTANTS.LITTLE_ENDIAN_BYTE_ORDER + byte_format, binary_data[start:end]) + + @staticmethod + def _parse_token_value(binary_packet): + token = "" + for i in range(len(binary_packet)): + if chr(binary_packet[i]) == '\x00': + return token + token += chr(binary_packet[i]) + return token + + def _parse_best_5_buy_and_sell_data(self, binary_data): + + def split_packets(binary_packets): + packets = [] + + i = 0 + while i < len(binary_packets): + packets.append(binary_packets[i: i+20]) + i += 20 + return packets + + best_5_buy_sell_packets = split_packets(binary_data) + + best_5_buy_data = [] + best_5_sell_data = [] + + for packet in best_5_buy_sell_packets: + each_data = { + "flag": self._unpack_data(packet, 0, 2, byte_format="H")[0], + "quantity": self._unpack_data(packet, 2, 10, byte_format="q")[0], + "price": self._unpack_data(packet, 10, 18, byte_format="q")[0], + "no of orders": self._unpack_data(packet, 18, 20, byte_format="H")[0] + } + + if each_data["flag"] == 0: + best_5_buy_data.append(each_data) + else: + best_5_sell_data.append(each_data) + + return { + "best_5_buy_data": best_5_buy_data, + "best_5_sell_data": best_5_sell_data + } + + + + def on_data(self, wsapp, data): + pass + + def on_close(self, wsapp): + pass + + def on_open(self, wsapp): + pass + + def on_error(self): + pass \ No newline at end of file diff --git a/SmartApi/webSocket.py b/SmartApi/webSocket.py index c98b1ea2..f59a654f 100644 --- a/SmartApi/webSocket.py +++ b/SmartApi/webSocket.py @@ -1,448 +1,448 @@ - -import six -import sys -import time -import json -import struct -import logging -import threading -import base64 -import zlib -from datetime import datetime -from twisted.internet import reactor, ssl -from twisted.python import log as twisted_log -from twisted.internet.protocol import ReconnectingClientFactory -from autobahn.twisted.websocket import WebSocketClientProtocol, \ - WebSocketClientFactory, connectWS - -log = logging.getLogger(__name__) - -class SmartSocketClientProtocol(WebSocketClientProtocol): - - def __init__(self, *args, **kwargs): - super(SmartSocketClientProtocol,self).__init__(*args,**kwargs) - - def onConnect(self, response): # noqa - """Called when WebSocket server connection was established""" - self.factory.ws = self - - if self.factory.on_connect: - self.factory.on_connect(self, response) - - def onOpen(self): - if self.factory.on_open: - self.factory.on_open(self) - - - - def onMessage(self, payload, is_binary): # noqa - """Called when text or binary message is received.""" - if self.factory.on_message: - self.factory.on_message(self, payload, is_binary) - - - def onClose(self, was_clean, code, reason): # noqa - """Called when connection is closed.""" - if not was_clean: - if self.factory.on_error: - self.factory.on_error(self, code, reason) - - if self.factory.on_close: - self.factory.on_close(self, code, reason) - - -class SmartSocketClientFactory(WebSocketClientFactory,ReconnectingClientFactory): - protocol = SmartSocketClientProtocol - - maxDelay = 5 - maxRetries = 10 - - _last_connection_time = None - - def __init__(self, *args, **kwargs): - """Initialize with default callback method values.""" - self.debug = False - self.ws = None - self.on_open = None - self.on_error = None - self.on_close = None - self.on_message = None - self.on_connect = None - self.on_reconnect = None - self.on_noreconnect = None - - - super(SmartSocketClientFactory, self).__init__(*args, **kwargs) - - def startedConnecting(self, connector): # noqa - """On connecting start or reconnection.""" - if not self._last_connection_time and self.debug: - log.debug("Start WebSocket connection.") - - self._last_connection_time = time.time() - - def clientConnectionFailed(self, connector, reason): # noqa - """On connection failure (When connect request fails)""" - if self.retries > 0: - print("Retrying connection. Retry attempt count: {}. Next retry in around: {} seconds".format(self.retries, int(round(self.delay)))) - - # on reconnect callback - if self.on_reconnect: - self.on_reconnect(self.retries) - - # Retry the connection - self.retry(connector) - self.send_noreconnect() - - def clientConnectionLost(self, connector, reason): # noqa - """On connection lost (When ongoing connection got disconnected).""" - if self.retries > 0: - # on reconnect callback - if self.on_reconnect: - self.on_reconnect(self.retries) - - # Retry the connection - self.retry(connector) - self.send_noreconnect() - - def send_noreconnect(self): - """Callback `no_reconnect` if max retries are exhausted.""" - if self.maxRetries is not None and (self.retries > self.maxRetries): - if self.debug: - log.debug("Maximum retries ({}) exhausted.".format(self.maxRetries)) - - if self.on_noreconnect: - self.on_noreconnect() - -class WebSocket(object): - EXCHANGE_MAP = { - "nse": 1, - "nfo": 2, - "cds": 3, - "bse": 4, - "bfo": 5, - "bsecds": 6, - "mcx": 7, - "mcxsx": 8, - "indices": 9 - } - # Default connection timeout - CONNECT_TIMEOUT = 30 - # Default Reconnect max delay. - RECONNECT_MAX_DELAY = 60 - # Default reconnect attempts - RECONNECT_MAX_TRIES = 50 - - ROOT_URI='wss://wsfeeds.angelbroking.com/NestHtml5Mobile/socket/stream' - - # Flag to set if its first connect - _is_first_connect = True - - # Minimum delay which should be set between retries. User can't set less than this - _minimum_reconnect_max_delay = 5 - # Maximum number or retries user can set - _maximum_reconnect_max_tries = 300 - - feed_token=None - client_code=None - def __init__(self, FEED_TOKEN, CLIENT_CODE,debug=False, root=None,reconnect=True,reconnect_max_tries=RECONNECT_MAX_TRIES, reconnect_max_delay=RECONNECT_MAX_DELAY,connect_timeout=CONNECT_TIMEOUT): - - - self.root = root or self.ROOT_URI - self.feed_token= FEED_TOKEN - self.client_code= CLIENT_CODE - - # Set max reconnect tries - if reconnect_max_tries > self._maximum_reconnect_max_tries: - log.warning("`reconnect_max_tries` can not be more than {val}. Setting to highest possible value - {val}.".format( - val=self._maximum_reconnect_max_tries)) - self.reconnect_max_tries = self._maximum_reconnect_max_tries - else: - self.reconnect_max_tries = reconnect_max_tries - - # Set max reconnect delay - if reconnect_max_delay < self._minimum_reconnect_max_delay: - log.warning("`reconnect_max_delay` can not be less than {val}. Setting to lowest possible value - {val}.".format( - val=self._minimum_reconnect_max_delay)) - self.reconnect_max_delay = self._minimum_reconnect_max_delay - else: - self.reconnect_max_delay = reconnect_max_delay - - self.connect_timeout = connect_timeout - - # Debug enables logs - self.debug = debug - - # Placeholders for callbacks. - self.on_ticks = None - self.on_open = None - self.on_close = None - self.on_error = None - self.on_connect = None - self.on_message = None - self.on_reconnect = None - self.on_noreconnect = None - - - def _create_connection(self, url, **kwargs): - """Create a WebSocket client connection.""" - self.factory = SmartSocketClientFactory(url, **kwargs) - - # Alias for current websocket connection - self.ws = self.factory.ws - - self.factory.debug = self.debug - - # Register private callbacks - self.factory.on_open = self._on_open - self.factory.on_error = self._on_error - self.factory.on_close = self._on_close - self.factory.on_message = self._on_message - self.factory.on_connect = self._on_connect - self.factory.on_reconnect = self._on_reconnect - self.factory.on_noreconnect = self._on_noreconnect - - - self.factory.maxDelay = self.reconnect_max_delay - self.factory.maxRetries = self.reconnect_max_tries - - def connect(self, threaded=False, disable_ssl_verification=False, proxy=None): - #print("Connect") - self._create_connection(self.ROOT_URI) - - context_factory = None - #print(self.factory.isSecure,disable_ssl_verification) - if self.factory.isSecure and not disable_ssl_verification: - context_factory = ssl.ClientContextFactory() - #print("context_factory",context_factory) - connectWS(self.factory, contextFactory=context_factory, timeout=30) - - # Run in seperate thread of blocking - opts = {} - - # Run when reactor is not running - if not reactor.running: - if threaded: - #print("inside threaded") - # Signals are not allowed in non main thread by twisted so suppress it. - opts["installSignalHandlers"] = False - self.websocket_thread = threading.Thread(target=reactor.run, kwargs=opts) - self.websocket_thread.daemon = True - self.websocket_thread.start() - else: - reactor.run(**opts) - - - def is_connected(self): - #print("Check if WebSocket connection is established.") - if self.ws and self.ws.state == self.ws.STATE_OPEN: - return True - else: - return False - - def _close(self, code=None, reason=None): - #print("Close the WebSocket connection.") - if self.ws: - self.ws.sendClose(code, reason) - - def close(self, code=None, reason=None): - """Close the WebSocket connection.""" - self.stop_retry() - self._close(code, reason) - - def stop(self): - """Stop the event loop. Should be used if main thread has to be closed in `on_close` method.""" - #print("stop") - - reactor.stop() - - def stop_retry(self): - """Stop auto retry when it is in progress.""" - if self.factory: - self.factory.stopTrying() - - def _on_reconnect(self, attempts_count): - if self.on_reconnect: - return self.on_reconnect(self, attempts_count) - - def _on_noreconnect(self): - if self.on_noreconnect: - return self.on_noreconnect(self) - - def websocket_connection(self): - if self.client_code == None or self.feed_token == None: - return "client_code or feed_token or task is missing" - - request={"task":"cn","channel":"","token":self.feed_token,"user":self.client_code,"acctid":self.client_code} - self.ws.sendMessage( - six.b(json.dumps(request)) - ) - #print(request) - - threading.Thread(target=self.heartBeat,daemon=True).start() - - def send_request(self,token,task): - if task in ("mw","sfi","dp"): - strwatchlistscrips = token #dynamic call - - try: - request={"task":task,"channel":strwatchlistscrips,"token":self.feed_token,"user":self.client_code,"acctid":self.client_code} - - self.ws.sendMessage( - six.b(json.dumps(request)) - ) - return True - except Exception as e: - self._close(reason="Error while request sending: {}".format(str(e))) - raise - else: - print("The task entered is invalid, Please enter correct task(mw,sfi,dp) ") - - def _on_connect(self, ws, response): - #print("-----_on_connect-------") - self.ws = ws - if self.on_connect: - - print(self.on_connect) - self.on_connect(self, response) - #self.websocket_connection - - def _on_close(self, ws, code, reason): - """Call `on_close` callback when connection is closed.""" - log.debug("Connection closed: {} - {}".format(code, str(reason))) - - if self.on_close: - self.on_close(self, code, reason) - - def _on_error(self, ws, code, reason): - """Call `on_error` callback when connection throws an error.""" - log.debug("Connection error: {} - {}".format(code, str(reason))) - - if self.on_error: - self.on_error(self, code, reason) - - - - def _on_message(self, ws, payload, is_binary): - """Call `on_message` callback when text message is received.""" - if self.on_message: - self.on_message(self, payload, is_binary) - - # If the message is binary, parse it and send it to the callback. - if self.on_ticks and is_binary and len(payload) > 4: - self.on_ticks(self, self._parse_binary(payload)) - - # Parse text messages - if not is_binary: - self._parse_text_message(payload) - - def _on_open(self, ws): - if not self._is_first_connect: - self.connect() - - self._is_first_connect = False - - if self.on_open: - return self.on_open(self) - - - def heartBeat(self): - while True: - try: - request={"task":"hb","channel":"","token":self.feed_token,"user":self.client_code,"acctid":self.client_code} - self.ws.sendMessage( - six.b(json.dumps(request)) - ) - - except: - print("HeartBeats Failed") - time.sleep(60) - - - def _parse_text_message(self, payload): - """Parse text message.""" - # Decode unicode data - if not six.PY2 and type(payload) == bytes: - payload = payload.decode("utf-8") - - data =base64.b64decode(payload) - - try: - data = bytes((zlib.decompress(data)).decode("utf-8"), 'utf-8') - data = json.loads(data.decode('utf8').replace("'", '"')) - data = json.loads(json.dumps(data, indent=4, sort_keys=True)) - except ValueError: - return - - self.on_ticks(self, data) - - def _parse_binary(self, bin): - """Parse binary data to a (list of) ticks structure.""" - packets = self._split_packets(bin) # split data to individual ticks packet - data = [] - - for packet in packets: - instrument_token = self._unpack_int(packet, 0, 4) - segment = instrument_token & 0xff # Retrive segment constant from instrument_token - - divisor = 10000000.0 if segment == self.EXCHANGE_MAP["cds"] else 100.0 - - # All indices are not tradable - tradable = False if segment == self.EXCHANGE_MAP["indices"] else True - try: - last_trade_time = datetime.fromtimestamp(self._unpack_int(packet, 44, 48)) - except Exception: - last_trade_time = None - - try: - timestamp = datetime.fromtimestamp(self._unpack_int(packet, 60, 64)) - except Exception: - timestamp = None - - d["last_trade_time"] = last_trade_time - d["oi"] = self._unpack_int(packet, 48, 52) - d["oi_day_high"] = self._unpack_int(packet, 52, 56) - d["oi_day_low"] = self._unpack_int(packet, 56, 60) - d["timestamp"] = timestamp - - # Market depth entries. - depth = { - "buy": [], - "sell": [] - } - - # Compile the market depth lists. - for i, p in enumerate(range(64, len(packet), 12)): - depth["sell" if i >= 5 else "buy"].append({ - "quantity": self._unpack_int(packet, p, p + 4), - "price": self._unpack_int(packet, p + 4, p + 8) / divisor, - "orders": self._unpack_int(packet, p + 8, p + 10, byte_format="H") - }) - - d["depth"] = depth - - data.append(d) - - return data - - def _unpack_int(self, bin, start, end, byte_format="I"): - """Unpack binary data as unsgined interger.""" - return struct.unpack(">" + byte_format, bin[start:end])[0] - - def _split_packets(self, bin): - """Split the data to individual packets of ticks.""" - # Ignore heartbeat data. - if len(bin) < 2: - return [] - - number_of_packets = self._unpack_int(bin, 0, 2, byte_format="H") - packets = [] - - j = 2 - for i in range(number_of_packets): - packet_length = self._unpack_int(bin, j, j + 2, byte_format="H") - packets.append(bin[j + 2: j + 2 + packet_length]) - j = j + 2 + packet_length - - return packets + +import six +import sys +import time +import json +import struct +import logging +import threading +import base64 +import zlib +from datetime import datetime +from logzero import logger +from twisted.internet import reactor, ssl +from twisted.python import log as twisted_log +from twisted.internet.protocol import ReconnectingClientFactory +from autobahn.twisted.websocket import WebSocketClientProtocol, \ + WebSocketClientFactory, connectWS + +log = logging.getLogger(__name__) + +class SmartSocketClientProtocol(WebSocketClientProtocol): + + def __init__(self, *args, **kwargs): + super(SmartSocketClientProtocol,self).__init__(*args,**kwargs) + + def onConnect(self, response): # noqa + """Called when WebSocket server connection was established""" + self.factory.ws = self + + if self.factory.on_connect: + self.factory.on_connect(self, response) + + def onOpen(self): + if self.factory.on_open: + self.factory.on_open(self) + + + + def onMessage(self, payload, is_binary): # noqa + """Called when text or binary message is received.""" + if self.factory.on_message: + self.factory.on_message(self, payload, is_binary) + + + def onClose(self, was_clean, code, reason): # noqa + """Called when connection is closed.""" + if not was_clean: + if self.factory.on_error: + self.factory.on_error(self, code, reason) + + if self.factory.on_close: + self.factory.on_close(self, code, reason) + + +class SmartSocketClientFactory(WebSocketClientFactory,ReconnectingClientFactory): + protocol = SmartSocketClientProtocol + + maxDelay = 5 + maxRetries = 10 + + _last_connection_time = None + + def __init__(self, *args, **kwargs): + """Initialize with default callback method values.""" + self.debug = False + self.ws = None + self.on_open = None + self.on_error = None + self.on_close = None + self.on_message = None + self.on_connect = None + self.on_reconnect = None + self.on_noreconnect = None + + + super(SmartSocketClientFactory, self).__init__(*args, **kwargs) + + def startedConnecting(self, connector): # noqa + """On connecting start or reconnection.""" + if not self._last_connection_time and self.debug: + log.debug("Start WebSocket connection.") + + self._last_connection_time = time.time() + + def clientConnectionFailed(self, connector, reason): # noqa + """On connection failure (When connect request fails)""" + if self.retries > 0: + logger.info("Retrying connection. Retry attempt count: {}. Next retry in around: {} seconds".format(self.retries, int(round(self.delay)))) + + # on reconnect callback + if self.on_reconnect: + self.on_reconnect(self.retries) + + # Retry the connection + self.retry(connector) + self.send_noreconnect() + + def clientConnectionLost(self, connector, reason): # noqa + """On connection lost (When ongoing connection got disconnected).""" + if self.retries > 0: + # on reconnect callback + if self.on_reconnect: + self.on_reconnect(self.retries) + + # Retry the connection + self.retry(connector) + self.send_noreconnect() + + def send_noreconnect(self): + """Callback `no_reconnect` if max retries are exhausted.""" + if self.maxRetries is not None and (self.retries > self.maxRetries): + if self.debug: + log.debug("Maximum retries ({}) exhausted.".format(self.maxRetries)) + + if self.on_noreconnect: + self.on_noreconnect() + +class WebSocket(object): + EXCHANGE_MAP = { + "nse": 1, + "nfo": 2, + "cds": 3, + "bse": 4, + "bfo": 5, + "bsecds": 6, + "mcx": 7, + "mcxsx": 8, + "indices": 9 + } + # Default connection timeout + CONNECT_TIMEOUT = 30 + # Default Reconnect max delay. + RECONNECT_MAX_DELAY = 60 + # Default reconnect attempts + RECONNECT_MAX_TRIES = 50 + + ROOT_URI='wss://wsfeeds.angelbroking.com/NestHtml5Mobile/socket/stream' + + # Flag to set if its first connect + _is_first_connect = True + + # Minimum delay which should be set between retries. User can't set less than this + _minimum_reconnect_max_delay = 5 + # Maximum number or retries user can set + _maximum_reconnect_max_tries = 300 + + feed_token=None + client_code=None + def __init__(self, FEED_TOKEN, CLIENT_CODE,debug=False, root=None,reconnect=True,reconnect_max_tries=RECONNECT_MAX_TRIES, reconnect_max_delay=RECONNECT_MAX_DELAY,connect_timeout=CONNECT_TIMEOUT): + + + self.root = root or self.ROOT_URI + self.feed_token= FEED_TOKEN + self.client_code= CLIENT_CODE + + # Set max reconnect tries + if reconnect_max_tries > self._maximum_reconnect_max_tries: + log.warning("`reconnect_max_tries` can not be more than {val}. Setting to highest possible value - {val}.".format( + val=self._maximum_reconnect_max_tries)) + self.reconnect_max_tries = self._maximum_reconnect_max_tries + else: + self.reconnect_max_tries = reconnect_max_tries + + # Set max reconnect delay + if reconnect_max_delay < self._minimum_reconnect_max_delay: + log.warning("`reconnect_max_delay` can not be less than {val}. Setting to lowest possible value - {val}.".format( + val=self._minimum_reconnect_max_delay)) + self.reconnect_max_delay = self._minimum_reconnect_max_delay + else: + self.reconnect_max_delay = reconnect_max_delay + + self.connect_timeout = connect_timeout + + # Debug enables logs + self.debug = debug + + # Placeholders for callbacks. + self.on_ticks = None + self.on_open = None + self.on_close = None + self.on_error = None + self.on_connect = None + self.on_message = None + self.on_reconnect = None + self.on_noreconnect = None + + + def _create_connection(self, url, **kwargs): + """Create a WebSocket client connection.""" + self.factory = SmartSocketClientFactory(url, **kwargs) + + # Alias for current websocket connection + self.ws = self.factory.ws + + self.factory.debug = self.debug + + # Register private callbacks + self.factory.on_open = self._on_open + self.factory.on_error = self._on_error + self.factory.on_close = self._on_close + self.factory.on_message = self._on_message + self.factory.on_connect = self._on_connect + self.factory.on_reconnect = self._on_reconnect + self.factory.on_noreconnect = self._on_noreconnect + + + self.factory.maxDelay = self.reconnect_max_delay + self.factory.maxRetries = self.reconnect_max_tries + + def connect(self, threaded=False, disable_ssl_verification=False, proxy=None): + logger.info("Connect") + self._create_connection(self.ROOT_URI) + + context_factory = None + logger.info(self.factory.isSecure,disable_ssl_verification) + if self.factory.isSecure and not disable_ssl_verification: + context_factory = ssl.ClientContextFactory() + logger.info("context_factory",context_factory) + connectWS(self.factory, contextFactory=context_factory, timeout=30) + + # Run in seperate thread of blocking + opts = {} + + # Run when reactor is not running + if not reactor.running: + if threaded: + logger.info("inside threaded") + # Signals are not allowed in non main thread by twisted so suppress it. + opts["installSignalHandlers"] = False + self.websocket_thread = threading.Thread(target=reactor.run, kwargs=opts) + self.websocket_thread.daemon = True + self.websocket_thread.start() + else: + reactor.run(**opts) + + + def is_connected(self): + logger.info("Check if WebSocket connection is established.") + if self.ws and self.ws.state == self.ws.STATE_OPEN: + return True + else: + return False + + def _close(self, code=None, reason=None): + logger.info("Close the WebSocket connection.") + if self.ws: + self.ws.sendClose(code, reason) + + def close(self, code=None, reason=None): + """Close the WebSocket connection.""" + self.stop_retry() + self._close(code, reason) + + def stop(self): + """Stop the event loop. Should be used if main thread has to be closed in `on_close` method.""" + logger.info("stop") + + reactor.stop() + + def stop_retry(self): + """Stop auto retry when it is in progress.""" + if self.factory: + self.factory.stopTrying() + + def _on_reconnect(self, attempts_count): + if self.on_reconnect: + return self.on_reconnect(self, attempts_count) + + def _on_noreconnect(self): + if self.on_noreconnect: + return self.on_noreconnect(self) + + def websocket_connection(self): + if self.client_code == None or self.feed_token == None: + return "client_code or feed_token or task is missing" + + request={"task":"cn","channel":"","token":self.feed_token,"user":self.client_code,"acctid":self.client_code} + self.ws.sendMessage( + six.b(json.dumps(request)) + ) + logger.info(request) + + threading.Thread(target=self.heartBeat,daemon=True).start() + + def send_request(self,token,task): + if task in ("mw","sfi","dp"): + strwatchlistscrips = token #dynamic call + + try: + request={"task":task,"channel":strwatchlistscrips,"token":self.feed_token,"user":self.client_code,"acctid":self.client_code} + + self.ws.sendMessage( + six.b(json.dumps(request)) + ) + return True + except Exception as e: + self._close(reason="Error while request sending: {}".format(str(e))) + raise + else: + logger.info("The task entered is invalid, Please enter correct task(mw,sfi,dp) ") + + def _on_connect(self, ws, response): + logger.info("-----_on_connect-------") + self.ws = ws + if self.on_connect: + + logger.info(self.on_connect) + self.on_connect(self, response) + + def _on_close(self, ws, code, reason): + """Call `on_close` callback when connection is closed.""" + log.debug("Connection closed: {} - {}".format(code, str(reason))) + + if self.on_close: + self.on_close(self, code, reason) + + def _on_error(self, ws, code, reason): + """Call `on_error` callback when connection throws an error.""" + log.debug("Connection error: {} - {}".format(code, str(reason))) + + if self.on_error: + self.on_error(self, code, reason) + + + + def _on_message(self, ws, payload, is_binary): + """Call `on_message` callback when text message is received.""" + if self.on_message: + self.on_message(self, payload, is_binary) + + # If the message is binary, parse it and send it to the callback. + if self.on_ticks and is_binary and len(payload) > 4: + self.on_ticks(self, self._parse_binary(payload)) + + # Parse text messages + if not is_binary: + self._parse_text_message(payload) + + def _on_open(self, ws): + if not self._is_first_connect: + self.connect() + + self._is_first_connect = False + + if self.on_open: + return self.on_open(self) + + + def heartBeat(self): + while True: + try: + request={"task":"hb","channel":"","token":self.feed_token,"user":self.client_code,"acctid":self.client_code} + self.ws.sendMessage( + six.b(json.dumps(request)) + ) + + except: + logger.info("HeartBeats Failed") + time.sleep(60) + + + def _parse_text_message(self, payload): + """Parse text message.""" + # Decode unicode data + if not six.PY2 and type(payload) == bytes: + payload = payload.decode("utf-8") + + data =base64.b64decode(payload) + + try: + data = bytes((zlib.decompress(data)).decode("utf-8"), 'utf-8') + data = json.loads(data.decode('utf8').replace("'", '"')) + data = json.loads(json.dumps(data, indent=4, sort_keys=True)) + except ValueError: + return + + self.on_ticks(self, data) + + def _parse_binary(self, bin): + """Parse binary data to a (list of) ticks structure.""" + packets = self._split_packets(bin) # split data to individual ticks packet + data = [] + + for packet in packets: + instrument_token = self._unpack_int(packet, 0, 4) + segment = instrument_token & 0xff # Retrive segment constant from instrument_token + + divisor = 10000000.0 if segment == self.EXCHANGE_MAP["cds"] else 100.0 + + # All indices are not tradable + tradable = False if segment == self.EXCHANGE_MAP["indices"] else True + try: + last_trade_time = datetime.fromtimestamp(self._unpack_int(packet, 44, 48)) + except Exception: + last_trade_time = None + + try: + timestamp = datetime.fromtimestamp(self._unpack_int(packet, 60, 64)) + except Exception: + timestamp = None + + d["last_trade_time"] = last_trade_time + d["oi"] = self._unpack_int(packet, 48, 52) + d["oi_day_high"] = self._unpack_int(packet, 52, 56) + d["oi_day_low"] = self._unpack_int(packet, 56, 60) + d["timestamp"] = timestamp + + # Market depth entries. + depth = { + "buy": [], + "sell": [] + } + + # Compile the market depth lists. + for i, p in enumerate(range(64, len(packet), 12)): + depth["sell" if i >= 5 else "buy"].append({ + "quantity": self._unpack_int(packet, p, p + 4), + "price": self._unpack_int(packet, p + 4, p + 8) / divisor, + "orders": self._unpack_int(packet, p + 8, p + 10, byte_format="H") + }) + + d["depth"] = depth + + data.append(d) + + return data + + def _unpack_int(self, bin, start, end, byte_format="I"): + """Unpack binary data as unsgined interger.""" + return struct.unpack(">" + byte_format, bin[start:end])[0] + + def _split_packets(self, bin): + """Split the data to individual packets of ticks.""" + # Ignore heartbeat data. + if len(bin) < 2: + return [] + + number_of_packets = self._unpack_int(bin, 0, 2, byte_format="H") + packets = [] + + j = 2 + for i in range(number_of_packets): + packet_length = self._unpack_int(bin, j, j + 2, byte_format="H") + packets.append(bin[j + 2: j + 2 + packet_length]) + j = j + 2 + packet_length + + return packets \ No newline at end of file diff --git a/example/sample.py b/example/sample.py old mode 100644 new mode 100755 index 8143447d..c06698e0 --- a/example/sample.py +++ b/example/sample.py @@ -1,113 +1,196 @@ -# package import statement -from smartapi import SmartConnect #or from smartapi.smartConnect import SmartConnect -#import smartapi.smartExceptions(for smartExceptions) - -#create object of call -obj=SmartConnect(api_key="your api key") - -#login api call - -data = obj.generateSession("Your Client ID","Your Password","Your totp here") -refreshToken= data['data']['refreshToken'] - -#fetch the feedtoken -feedToken=obj.getfeedToken() - -#fetch User Profile -userProfile= obj.getProfile(refreshToken) -#place order -try: - orderparams = { - "variety": "NORMAL", - "tradingsymbol": "SBIN-EQ", - "symboltoken": "3045", - "transactiontype": "BUY", - "exchange": "NSE", - "ordertype": "LIMIT", - "producttype": "INTRADAY", - "duration": "DAY", - "price": "19500", - "squareoff": "0", - "stoploss": "0", - "quantity": "1" - } - orderId=obj.placeOrder(orderparams) - print("The order id is: {}".format(orderId)) -except Exception as e: - print("Order placement failed: {}".format(e.message)) -#gtt rule creation -try: - gttCreateParams={ - "tradingsymbol" : "SBIN-EQ", - "symboltoken" : "3045", - "exchange" : "NSE", - "producttype" : "MARGIN", - "transactiontype" : "BUY", - "price" : 100000, - "qty" : 10, - "disclosedqty": 10, - "triggerprice" : 200000, - "timeperiod" : 365 - } - rule_id=obj.gttCreateRule(gttCreateParams) - print("The GTT rule id is: {}".format(rule_id)) -except Exception as e: - print("GTT Rule creation failed: {}".format(e.message)) - -#gtt rule list -try: - status=["FORALL"] #should be a list - page=1 - count=10 - lists=obj.gttLists(status,page,count) -except Exception as e: - print("GTT Rule List failed: {}".format(e.message)) - -#Historic api -try: - historicParam={ - "exchange": "NSE", - "symboltoken": "3045", - "interval": "ONE_MINUTE", - "fromdate": "2021-02-08 09:00", - "todate": "2021-02-08 09:16" - } - obj.getCandleData(historicParam) -except Exception as e: - print("Historic Api failed: {}".format(e.message)) -#logout -try: - logout=obj.terminateSession('Your Client Id') - print("Logout Successfull") -except Exception as e: - print("Logout failed: {}".format(e.message)) - - - -## WebSocket -from smartapi import WebSocket - -FEED_TOKEN= "your feed token" -CLIENT_CODE="your client Id" -token="channel you want the information of" #"nse_cm|2885&nse_cm|1594&nse_cm|11536" -task="task" #"mw"|"sfi"|"dp" -ss = WebSocket(FEED_TOKEN, CLIENT_CODE) - -def on_tick(ws, tick): - print("Ticks: {}".format(tick)) - -def on_connect(ws, response): - ws.websocket_connection() # Websocket connection - ws.send_request(token,task) - -def on_close(ws, code, reason): - ws.stop() - -# Assign the callbacks. -ss.on_ticks = on_tick -ss.on_connect = on_connect -ss.on_close = on_close - -ss.connect() - - +# package import statement +from SmartApi import SmartConnect #or from smartapi.smartConnect import SmartConnect +from logzero import logger +# import SmartApi.smartExceptions #(for smartExceptions) + +apiKey = "" +clientId = "" +pin = "" +totp = "" +correlation_id = "" + + + +#create object of call +obj=SmartConnect(api_key=apiKey) + +#login api call + +data = obj.generateSession(clientId,pin,totp) + +authToken = data['data']['jwtToken'] +authToken = f'Bearer {authToken}' + +refreshToken= data['data']['refreshToken'] + +#fetch the feedtoken +feedToken=obj.getfeedToken() + +#fetch User Profile +userProfile= obj.getProfile(refreshToken) + + + +def error_json(correlation_id, code, msg): + errorJson = { + f"correlationID": correlation_id, + f"errorCode": code, + f"errorMessage": msg + } + return errorJson + + +# place order +try: + orderparams = { + "variety": "NORMAL", + "tradingsymbol": "SBIN-EQ", + "symboltoken": "3045", + "transactiontype": "BUY", + "exchange": "NSE", + "ordertype": "LIMIT", + "producttype": "INTRADAY", + "duration": "DAY", + "price": "19500", + "squareoff": "0", + "stoploss": "0", + "quantity": "1" + } + orderId=obj.placeOrder(orderparams) + logger.info("The order id is: {}".format(orderId)) +except TypeError: + code = "E1001" + msg = "Invalid Request Payload." + logger.error(error_json(correlation_id, code, msg)) +except Exception: + code = "E1002" + msg = "Invalid Request. Subscription Limit Exceeded." + logger.error(error_json(correlation_id, code, msg)) + +#gtt rule creation +try: + gttCreateParams={ + "tradingsymbol" : "SBIN-EQ", + "symboltoken" : "3045", + "exchange" : "NSE", + "producttype" : "MARGIN", + "transactiontype" : "BUY", + "price" : 100000, + "qty" : 10, + "disclosedqty": 10, + "triggerprice" : 200000, + "timeperiod" : 365 + } + rule_id=obj.gttCreateRule(gttCreateParams) + logger.info("The GTT rule id is: {}".format(rule_id)) +except TypeError: + code = "E1001" + msg = "Invalid Request Payload." + logger.error(error_json(correlation_id, code, msg)) +except Exception: + code = "E1002" + msg = "Invalid Request. Subscription Limit Exceeded." + logger.error(error_json(correlation_id, code, msg)) + +#gtt rule list +try: + status=["FORALL"] #should be a list + page=1 + count=10 + lists=obj.gttLists(status,page,count) + logger.info("The GTT List : {}".format(lists)) +except TypeError: + code = "E1001" + msg = "Invalid Request Payload." + logger.error(error_json(correlation_id, code, msg)) +except Exception: + code = "E1002" + msg = "Invalid Request. Subscription Limit Exceeded." + logger.error(error_json(correlation_id, code, msg)) + +#Historic api +try: + historicParam={ + "exchange": "NSE", + "symboltoken": "3045", + "interval": "ONE_MINUTE", + "fromdate": "2021-02-08 09:00", + "todate": "2021-02-08 09:16" + } + hist_param = obj.getCandleData(historicParam) + logger.info("The Historic Params : {}".format(hist_param)) +except TypeError: + code = "E1001" + msg = "Invalid Request Payload." + logger.error(error_json(correlation_id, code, msg)) +except Exception: + code = "E1002" + msg = "Invalid Request. Subscription Limit Exceeded." + logger.error(error_json(correlation_id, code, msg)) + +# logout +try: + logout=obj.terminateSession('Your Client Id') + logger.info("Logout Successfull") +except TypeError: + code = "E1001" + msg = "Invalid Request Payload." + logger.error(error_json(correlation_id, code, msg)) +except Exception: + code = "E1002" + msg = "Invalid Request. Subscription Limit Exceeded." + logger.error(error_json(correlation_id, code, msg)) + + + +from SmartApi.smartWebSocketV2 import SmartWebSocketV2 +''' +websocket +''' +AUTH_TOKEN = authToken +API_KEY = "" +CLIENT_CODE = "" +FEED_TOKEN = feedToken + +action = 1 +mode = 1 + +token_list = [ + { + "exchangeType": 5, + "tokens": [ + "244999", + "246083" + ] + }] + +# token_list = [{"exchangeType": 1, "tokens": ["26009"]}] + +sws = SmartWebSocketV2(AUTH_TOKEN, API_KEY, CLIENT_CODE, FEED_TOKEN) + +def on_data(wsapp, message): + logger.info("Ticks: {}".format(message)) + + +def on_open(wsapp): + logger.info("on open") + sws.subscribe(correlation_id, mode, token_list) + + +def on_error(wsapp, error): + logger.error(error) + + +def on_close(wsapp): + logger.info("Close") + + +# Assign the callbacks. +sws.on_open = on_open +sws.on_data = on_data +sws.on_error = on_error +sws.on_close = on_close + +sws.connect() + + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..80d8d20a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +setuptools~=44.1.1 +six~=1.16.0 +python-dateutil~=2.8.2 +requests~=2.27.1 +websocket-client~=0.59.0 +logzero~=1.7.0 \ No newline at end of file