Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 51 additions & 90 deletions SmartApi/smartWebSocketV2.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import json
import logging
import ssl
import struct
import threading
import time
import ssl
import json

import websocket
from datetime import datetime, timedelta
from threading import Timer


class SmartWebSocketV2(object):
Expand Down Expand Up @@ -82,38 +81,33 @@ def _sanity_check(self):
# return False
# return True

def _on_message(self, wsapp, message):
print("message--->", message)
if message != "pong":
parsed_message = self._parse_binary_data(message)
self.on_message(wsapp, parsed_message)
else:
self.on_message(wsapp, message)

def _on_data(self, wsapp, data, data_type, continue_flag):

if data_type == 2:
parsed_message = self._parse_binary_data(data)
self.on_data(wsapp, parsed_message)
else:
self.on_data(wsapp, data)
# else:
# self.on_data(wsapp, data)

def _on_open(self, wsapp):
if self.RESUBSCRIBE_FLAG:
self.resubscribe()
self.RESUBSCRIBE_FLAG = False # Add this line to prevent resubscription on subsequent reconnects
else:
self.on_open(wsapp)
try:
self.resubscribe()
self.RESUBSCRIBE_FLAG = False # Add this line to prevent resubscription on subsequent reconnects
except Exception as e:
logging.exception("exception while resubscribing")
self.on_error(wsapp, e)
self.on_open(wsapp)

def _on_pong(self, wsapp, data):
if data == self.HEART_BEAT_MESSAGE:
timestamp = time.time()
formatted_timestamp = time.strftime("%d-%m-%y %H:%M:%S", time.localtime(timestamp))
print(f"In on pong function ==> {data}, Timestamp: {formatted_timestamp}")
self.last_pong_timestamp = timestamp
else:
# Handle the received feed data here
self.on_data(wsapp, data)
# else:
# Handle the received feed data here
# self.on_data(wsapp, data)

def _on_ping(self, wsapp, data):
timestamp = time.time()
Expand All @@ -123,28 +117,11 @@ def _on_ping(self, wsapp, data):

def check_connection_status(self):
current_time = time.time()
if self.last_pong_timestamp is not None and current_time - self.last_pong_timestamp > 2*self.HEART_BEAT_MESSAGE:
if self.last_pong_timestamp is not None and current_time - self.last_pong_timestamp > 2 * self.HEART_BEAT_MESSAGE:
# Stale connection detected, take appropriate action
self.close_connection()
self.connect()

def start_ping_timer(self):
def send_ping():
try:
current_time = datetime.now()
if self.last_pong_timestamp is None or self.last_pong_timestamp < current_time - timedelta(self.HEART_BEAT_MESSAGE):
# print("stale connection detected")
# self.wsapp.close()
self.connect()
else:
self.last_ping_timestamp = time.time()
except Exception as e:
self.wsapp.close()
self.resubscribe()

ping_timer = Timer(5, send_ping)
ping_timer.start()

def subscribe(self, correlation_id, mode, token_list):
"""
This Function subscribe the price data for the given token
Expand Down Expand Up @@ -186,8 +163,12 @@ def subscribe(self, correlation_id, mode, token_list):
"tokenList": token_list
}
}
if mode == 4:
for token in token_list:
if token.get('exchangeType') != 1:
raise ValueError("Invalid ExchangeType: Please check the exchange type and try again")

if self.input_request_dict.get(mode) is None:
if self.input_request_dict.get(mode, None) is None:
self.input_request_dict[mode] = {}

for token in token_list:
Expand Down Expand Up @@ -242,33 +223,19 @@ def unsubscribe(self, correlation_id, mode, token_list):
tokens: list of string
"""
try:
total_tokens = sum(len(token["tokens"]) for token in token_list)
quota_limit = 50
if total_tokens > quota_limit:
raise Exception("Quota exceeded: You can subscribe to a maximum of {} tokens.".format(quota_limit))
else:
request_data = {
"correlationID": correlation_id,
"action": self.SUBSCRIBE_ACTION,
"params": {
"mode": mode,
"tokenList": token_list
}
request_data = {
"correlationID": correlation_id,
"action": self.UNSUBSCRIBE_ACTION,
"params": {
"mode": mode,
"tokenList": token_list
}
}

if self.input_request_dict.get(mode, None) is None:
self.input_request_dict[mode] = {}

for token in token_list:
if token['exchangeType'] in self.input_request_dict[mode]:
self.input_request_dict[mode][token['exchangeType']].extend(token["tokens"])
else:
self.input_request_dict[mode][token['exchangeType']] = token["tokens"]
self.wsapp.send(json.dumps(request_data))
self.RESUBSCRIBE_FLAG = True

self.input_request_dict.update(request_data)
self.wsapp.send(json.dumps(request_data))
self.RESUBSCRIBE_FLAG = True
except Exception as e:
print("Error:", e)
raise e

def resubscribe(self):
Expand Down Expand Up @@ -310,51 +277,42 @@ def connect(self):
on_pong=self._on_pong)
self.wsapp.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_interval=self.HEART_BEAT_INTERVAL,
ping_payload=self.HEART_BEAT_MESSAGE)
# self.start_ping_timer()
except Exception as e:
raise e

def close_connection(self):
"""
Closes the connection
"""
self.RESUBSCRIBE_FLAG = False
# self.RESUBSCRIBE_FLAG = False
self.DISCONNECT_FLAG = True
# self.HB_THREAD_FLAG = False
if self.wsapp:
self.wsapp.close()

# def run(self):
# while True:
# if not self.HB_THREAD_FLAG:
# break
# self.send_heart_beat()
# time.sleep(self.HEAR_BEAT_INTERVAL)

def send_heart_beat(self):
try:
self.wsapp.send(self.HEART_BEAT_MESSAGE)
except Exception as e:
raise e

def _on_error(self, wsapp, error):
# self.HB_THREAD_FLAG = False
self.on_error(wsapp, error)
self.RESUBSCRIBE_FLAG = True
if self.current_retry_attempt < self.MAX_RETRY_ATTEMPT:
print("Attempting to resubscribe/reconnect...")
self.current_retry_attempt += 1
sleep_seconds = self.current_retry_attempt * 10
logging.info("Attempting to resubscribe/reconnect... sleeping for %d ", sleep_seconds)
time.sleep(sleep_seconds)
logging.info("coming back from sleep")
try:
self.close_connection()
self.connect()
except Exception as e:
print("Error occurred during resubscribe/reconnect:", str(e))
logging.exception("Error occurred during resubscribe/reconnect")

else:
self.close_connection()

def _on_close(self, wsapp):
def _on_close(self, wsapp, close_status_code, close_msg):
# self.HB_THREAD_FLAG = False
# print(self.wsapp.close_frame)
self.on_close(wsapp)
self.on_close(wsapp,close_status_code,close_msg)

def _parse_binary_data(self, binary_data):
parsed_data = {
Expand Down Expand Up @@ -382,7 +340,8 @@ def _parse_binary_data(self, binary_data):
if parsed_data["subscription_mode"] == self.SNAP_QUOTE:
parsed_data["last_traded_timestamp"] = self._unpack_data(binary_data, 123, 131, byte_format="q")[0]
parsed_data["open_interest"] = self._unpack_data(binary_data, 131, 139, byte_format="q")[0]
parsed_data["open_interest_change_percentage"] = self._unpack_data(binary_data, 139, 147, byte_format="q")[0]
parsed_data["open_interest_change_percentage"] = \
self._unpack_data(binary_data, 139, 147, byte_format="q")[0]
parsed_data["upper_circuit_limit"] = self._unpack_data(binary_data, 347, 355, byte_format="q")[0]
parsed_data["lower_circuit_limit"] = self._unpack_data(binary_data, 355, 363, byte_format="q")[0]
parsed_data["52_week_high_price"] = self._unpack_data(binary_data, 363, 371, byte_format="q")[0]
Expand All @@ -395,7 +354,7 @@ def _parse_binary_data(self, binary_data):
parsed_data.pop("sequence_number", None)
parsed_data.pop("last_traded_price", None)
parsed_data.pop("subscription_mode_val", None)
parsed_data["packet_received_time"]=self._unpack_data(binary_data, 35, 43, byte_format="q")[0]
parsed_data["packet_received_time"] = self._unpack_data(binary_data, 35, 43, byte_format="q")[0]
depth_data_start_index = 43
depth_20_data = self._parse_depth_20_buy_and_sell_data(binary_data[depth_data_start_index:])
parsed_data["depth_20_buy_data"] = depth_20_data["depth_20_buy_data"]
Expand Down Expand Up @@ -467,14 +426,16 @@ def _parse_depth_20_buy_and_sell_data(self, binary_data):
buy_packet_data = {
"quantity": self._unpack_data(binary_data, buy_start_idx, buy_start_idx + 4, byte_format="i")[0],
"price": self._unpack_data(binary_data, buy_start_idx + 4, buy_start_idx + 8, byte_format="i")[0],
"num_of_orders": self._unpack_data(binary_data, buy_start_idx + 8, buy_start_idx + 10, byte_format="h")[0],
"num_of_orders": self._unpack_data(binary_data, buy_start_idx + 8, buy_start_idx + 10, byte_format="h")[
0],
}

# Parse sell data
sell_packet_data = {
"quantity": self._unpack_data(binary_data, sell_start_idx, sell_start_idx + 4, byte_format="i")[0],
"price": self._unpack_data(binary_data, sell_start_idx + 4, sell_start_idx + 8, byte_format="i")[0],
"num_of_orders": self._unpack_data(binary_data, sell_start_idx + 8, sell_start_idx + 10, byte_format="h")[0],
"num_of_orders":
self._unpack_data(binary_data, sell_start_idx + 8, sell_start_idx + 10, byte_format="h")[0],
}

depth_20_buy_data.append(buy_packet_data)
Expand All @@ -491,11 +452,11 @@ def _parse_depth_20_buy_and_sell_data(self, binary_data):
def on_data(self, wsapp, data):
pass

def on_close(self, wsapp):
def on_close(self, wsapp,close_status_code,close_msg):
pass

def on_open(self, wsapp):
pass

def on_error(self):
def on_error(self, wsapp, error):
pass