Skip to content

Commit

Permalink
init commit for v3 websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
KetanGupta12 committed Nov 27, 2024
1 parent 0d68f01 commit dd596cd
Show file tree
Hide file tree
Showing 8 changed files with 431 additions and 1 deletion.
46 changes: 46 additions & 0 deletions test/sdk_tests/market_basic_v3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import time

import upstox_client
import data_token

configuration = upstox_client.Configuration()
configuration.access_token = data_token.access_token
streamer = upstox_client.MarketDataStreamerV3(
upstox_client.ApiClient(configuration), instrumentKeys=["NSE_FO|53023", "MCX_FO|428750"], mode="full_d3")

streamer.auto_reconnect(True, 5, 10)


def on_open():
print("on open message")


def close(a, b):
print(f"on close message {a}")


def message(data):
print(f"on message message{data}")


def error(er):
print(f"on error message= {er}")


def reconnecting(data):
print(f"reconnecting event= {data}")


streamer.on("open", on_open)
streamer.on("message", message)
streamer.on("close", close)
streamer.on("reconnecting", reconnecting)
streamer.on("error", error)
streamer.connect()
time.sleep(10)
print("changing mode to full_d3")
streamer.change_mode(["MCX_FO|428750"], "full_d3")
time.sleep(10)
print("changing mode to ltpc")
streamer.change_mode(["MCX_FO|428750"], "ltpc")

1 change: 1 addition & 0 deletions upstox_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from upstox_client.api.websocket_api import WebsocketApi
# import websocket interfaces into sdk package
from upstox_client.feeder.market_data_streamer import MarketDataStreamer
from upstox_client.feeder.market_data_streamer_v3 import MarketDataStreamerV3
from upstox_client.feeder.portfolio_data_streamer import PortfolioDataStreamer
# import ApiClient
from upstox_client.api_client import ApiClient
Expand Down
1 change: 1 addition & 0 deletions upstox_client/feeder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@

# import websocket interfaces into feeder package
from upstox_client.feeder.market_data_streamer import MarketDataStreamer
from upstox_client.feeder.market_data_streamer_v3 import MarketDataStreamerV3
from upstox_client.feeder.portfolio_data_streamer import PortfolioDataStreamer
92 changes: 92 additions & 0 deletions upstox_client/feeder/market_data_feeder_v3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import websocket
import json
import uuid
import threading
import ssl
from .feeder import Feeder


class MarketDataFeederV3(Feeder):
Mode = {
"LTPC": "ltpc",
"FULL": "full",
"OPTION": "option_greeks",
"D30": "full_d30"
}

Method = {
"SUBSCRIBE": "sub",
"CHANGE_METHOD": "change_mode",
"UNSUBSCRIBE": "unsub",
}

def __init__(self, api_client=None, instrumentKeys=[], mode="full", on_open=None, on_message=None, on_error=None, on_close=None):
super().__init__(api_client=api_client)
self.api_client = api_client
self.instrumentKeys = instrumentKeys
self.mode = mode
self.on_open = on_open
self.on_message = on_message
self.on_error = on_error
self.on_close = on_close
self.ws = None
self.closingCode = -1

def connect(self):
if self.ws and self.ws.sock:
return

sslopt = {
"cert_reqs": ssl.CERT_NONE,
"check_hostname": False,
}
ws_url = "wss://api.upstox.com/v3/feed/market-data-feed"
headers = {'Authorization': self.api_client.configuration.auth_settings().get("OAUTH2")[
"value"]}
self.ws = websocket.WebSocketApp(ws_url,
header=headers,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)

threading.Thread(target=self.ws.run_forever,
kwargs={"sslopt": sslopt}).start()

def subscribe(self, instrumentKeys, mode=None):
if self.ws and self.ws.sock:
request = self.build_request(
instrumentKeys, self.Method["SUBSCRIBE"], mode)
self.ws.send(request, opcode=websocket.ABNF.OPCODE_BINARY)
else:
raise Exception("WebSocket is not open.")

def unsubscribe(self, instrumentKeys):
if self.ws and self.ws.sock:
request = self.build_request(
instrumentKeys, self.Method["UNSUBSCRIBE"])
self.ws.send(request, opcode=websocket.ABNF.OPCODE_BINARY)
else:
raise Exception("WebSocket is not open.")

def change_mode(self, instrumentKeys, newMode):

if self.ws and self.ws.sock:
request = self.build_request(
instrumentKeys, self.Method["CHANGE_METHOD"], newMode)
self.ws.send(request, opcode=websocket.ABNF.OPCODE_BINARY)
else:
raise Exception("WebSocket is not open.")

def build_request(self, instrumentKeys, method, mode=None):
requestObj = {
"guid": str(uuid.uuid4()),
"method": method,
"data": {
"instrumentKeys": instrumentKeys,
},
}
if mode is not None:
requestObj["data"]["mode"] = mode

return json.dumps(requestObj).encode('utf-8')
97 changes: 97 additions & 0 deletions upstox_client/feeder/market_data_streamer_v3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from .market_data_feeder_v3 import MarketDataFeederV3
from .streamer import Streamer
from .proto import MarketDataFeedV3_pb2
from google.protobuf import json_format


class MarketDataStreamerV3(Streamer):
Mode = {
"LTPC": "ltpc",
"FULL": "full",
"OPTION": "option_greeks",
"D30": "full_d30"
}
def __init__(self, api_client=None, instrumentKeys=[], mode="ltpc"):
super().__init__(api_client)
self.protobufRoot = MarketDataFeedV3_pb2
self.api_client = api_client
self.instrumentKeys = instrumentKeys
self.mode = mode
self.subscriptions = {
self.Mode["LTPC"]: set(),
self.Mode["FULL"]: set(),
self.Mode["OPTION"]: set(),
self.Mode["D30"]: set(),
}
if mode not in self.Mode.values():
raise Exception(f"Invalid mode provided {mode}")
# Populate initial subscriptions if provided
for key in instrumentKeys:
self.subscriptions[mode].add(key)

def connect(self):
self.feeder = MarketDataFeederV3(
api_client=self.api_client, instrumentKeys=self.instrumentKeys, mode=self.mode, on_open=self.handle_open, on_message=self.handle_message, on_error=self.handle_error, on_close=self.handle_close)
self.feeder.connect()

def subscribe_to_initial_keys(self):
for mode, keys in self.subscriptions.items():
if keys:
self.feeder.subscribe(list(keys), mode)

def subscribe(self, instrumentKeys, mode):
if not self.feeder:
raise Exception("WebSocket is not open.")

if self.is_invalid_mode(mode):
return

self.feeder.subscribe(instrumentKeys, mode)
self.subscriptions[mode].update(instrumentKeys)

def unsubscribe(self, instrumentKeys):
self.feeder.unsubscribe(instrumentKeys)
for mode_keys in self.subscriptions.values():
mode_keys.difference_update(instrumentKeys)

def change_mode(self, instrumentKeys, newMode):
if not self.feeder:
raise Exception("WebSocket is not open.")

if self.is_invalid_mode(newMode):
return

oldMode = self.mode
self.feeder.change_mode(instrumentKeys, newMode)
# Remove keys from the old mode
self.subscriptions[oldMode].difference_update(instrumentKeys)
# Add keys to the new mode
self.subscriptions[newMode].update(instrumentKeys)

self.mode = newMode

def clear_subscriptions(self):
for mode_keys in self.subscriptions.values():
mode_keys.clear()

def decode_protobuf(self, buffer):
FeedResponse = self.protobufRoot.FeedResponse
return FeedResponse.FromString(buffer)

def handle_open(self, ws):
self.disconnect_valid = False
self.reconnect_in_progress = False
self.reconnect_attempts = 0
self.subscribe_to_initial_keys()
self.emit(self.Event["OPEN"])

def handle_message(self, ws, message):
decoded_data = self.decode_protobuf(message)
data_dict = json_format.MessageToDict(decoded_data)
self.emit(self.Event["MESSAGE"], data_dict)

def is_invalid_mode(self, mode):
if mode not in self.Mode.values():
self.emit(self.Event["ERROR"], f"Invalid mode provided {mode}")
return True
return False
119 changes: 119 additions & 0 deletions upstox_client/feeder/proto/MarketDataFeedV3.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
syntax = "proto3";
package com.upstox.marketdatafeederv3udapi.rpc.proto;

message LTPC {
double ltp = 1;
int64 ltt = 2;
double cp = 3; //close price
}

message MarketLevel {
repeated Quote bidAskQuote = 1;
}

message MarketOHLC {
repeated OHLC ohlc = 1;
}

message Quote {
int64 bidQ = 1;
double bidP = 2;
int64 askQ = 3;
double askP = 4;
}

message OptionGreeks {
double delta = 1;
double theta = 2;
double gamma = 3;
double vega = 4;
double rho = 5;
}

message OHLC {
string interval = 1;
double open = 2;
double high = 3;
double low = 4;
double close = 5;
int64 vol = 6;
int64 ts = 7;
}

enum Type{
initial_feed = 0;
live_feed = 1;
market_info = 2;
}

message MarketFullFeed{
LTPC ltpc = 1;
MarketLevel marketLevel = 2;
OptionGreeks optionGreeks = 3;
MarketOHLC marketOHLC = 4;
double atp = 5; //avg traded price
int64 vtt = 6; //volume traded today
double oi = 7; //open interest
double iv = 8; //implied volatility
double tbq =9; //total buy quantity
double tsq = 10; //total sell quantity
}

message IndexFullFeed{
LTPC ltpc = 1;
MarketOHLC marketOHLC = 2;
}


message FullFeed {
oneof FullFeedUnion {
MarketFullFeed marketFF = 1;
IndexFullFeed indexFF = 2;
}
}

message FirstLevelWithGreeks{
LTPC ltpc = 1;
Quote firstDepth = 2;
OptionGreeks optionGreeks = 3;
int64 vtt = 4; //volume traded today
double oi = 5; //open interest
double iv = 6; //implied volatility
}

message Feed {
oneof FeedUnion {
LTPC ltpc = 1;
FullFeed fullFeed = 2;
FirstLevelWithGreeks firstLevelWithGreeks = 3;
}
RequestMode requestMode = 4;
}

enum RequestMode {
ltpc = 0;
full_d5 = 1;
option_greeks = 2;
full_d30 = 3;
}

enum MarketStatus {
PRE_OPEN_START = 0;
PRE_OPEN_END = 1;
NORMAL_OPEN = 2;
NORMAL_CLOSE = 3;
CLOSING_START = 4;
CLOSING_END = 5;
}


message MarketInfo {
map<string, MarketStatus> segmentStatus = 1;
}

message FeedResponse{
Type type = 1;
map<string, Feed> feeds = 2;
int64 currentTs = 3;
MarketInfo marketInfo = 4;
}
Loading

0 comments on commit dd596cd

Please sign in to comment.