Realtime Index Streaming¶
Index streaming is the fastest way to track broad-market movement and liquid symbols without polling snapshot APIs.
Use this when your strategy needs:
- index momentum signals
- live LTP and change tracking
- lightweight feed for reactive decisions
What You Receive¶
Each update can include:
- instrument or index symbol
- last traded value
- high and low values
- tick volume and cumulative volume
- open-interest field where applicable (
volume_oi)
WebSocket Index Stream Example¶
from nubra_python_sdk.ticker import websocketdata
from nubra_python_sdk.start_sdk import InitNubraSdk, NubraEnv
nubra = InitNubraSdk(NubraEnv.UAT, env_creds=True)
def pick(obj, *names):
for n in names:
v = getattr(obj, n, None)
if v is not None:
return v
return None
def clean(d):
return {k: v for k, v in d.items() if v is not None}
def as_rupees(v):
return round(v / 100, 2) if isinstance(v, (int, float)) else v
def on_index_data(msg):
ltp = pick(msg, "index_value", "ltp", "last_traded_price")
high = pick(msg, "high_index_value", "high")
low = pick(msg, "low_index_value", "low")
print(
"[Index]",
clean(
{
"symbol": pick(msg, "indexname", "index_name", "symbol", "display_name"),
"ltp": as_rupees(ltp),
"high": as_rupees(high),
"low": as_rupees(low),
"change_pct": pick(msg, "changepercent", "change_percent"),
"volume": pick(msg, "volume"),
"volume_oi": pick(msg, "volume_oi"),
}
),
)
def on_connect(msg):
print("[WebSocket] Connected:", msg)
def on_error(err):
print("[WebSocket] Error:", err)
socket = websocketdata.NubraDataSocket(
client=nubra,
on_index_data=on_index_data,
on_connect=on_connect,
on_error=on_error,
)
socket.connect()
socket.subscribe(["NIFTY", "BANKNIFTY", "RELIANCE"], data_type="index", exchange="NSE")
socket.keep_running()
Practical Notes¶
- use
exchange="NSE"or your required exchange in subscriptions - index values and instrument values can arrive in the same stream
- run the socket in a background thread if your app also has a UI or scheduler loop