Realtime OHLCV Streaming

OHLCV streaming delivers candles (open, high, low, close, volume) per interval. It suits candle-driven systems that do not need to process every market tick.

Use this for:

  • indicator pipelines
  • timeframe-based entries/exits
  • workloads that need less bandwidth than a tick-level feed

Subscription Pattern

An OHLCV subscription takes:

  • a list of symbols (for example NIFTY)
  • data_type="ohlcv"
  • an interval string (for example 1m or 5m)
  • an exchange value (for example NSE)
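
The interval strings above can be converted to a duration before subscribing, which helps catch typos early. This is a minimal sketch, assuming intervals follow a `<number><unit>` shape. Only the `m` form (1m, 5m) appears in this guide, so the `s`, `h`, and `d` units are assumptions, and `interval_to_seconds` is a hypothetical helper that is not part of the SDK:

```python
import re

# Hypothetical unit table: only "m" is confirmed by the examples in this guide.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
_INTERVAL_RE = re.compile(r"([1-9]\d*)([smhd])")


def interval_to_seconds(interval: str) -> int:
    """Convert an interval string such as "5m" into a number of seconds."""
    match = _INTERVAL_RE.fullmatch(interval.strip())
    if match is None:
        raise ValueError(f"unrecognised interval: {interval!r}")
    count, unit = match.groups()
    return int(count) * _UNIT_SECONDS[unit]
```

Checking the string locally does not guarantee that the server supports that interval; it only rejects malformed input before the subscribe call.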

WebSocket OHLCV Example

from nubra_python_sdk.ticker import websocketdata
from nubra_python_sdk.start_sdk import InitNubraSdk, NubraEnv

nubra = InitNubraSdk(NubraEnv.UAT, env_creds=True)

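def pick(obj, *names):
    # Return the first attribute in `names` that exists and is not None,
    # so the handler tolerates messages that use different field names.
    for name in names:
        value = getattr(obj, name, None)
        if value is not None:
            return value
    return None

def clean(d):
    # Drop keys whose value is None to keep the printed output compact.
    return {k: v for k, v in d.items() if v is not None}

def as_rupees(v):
    # Divide numeric prices by 100 (this assumes they arrive in paise);
    # non-numeric values such as None pass through unchanged.
    return round(v / 100, 2) if isinstance(v, (int, float)) else v

# Callback invoked for each OHLCV message; prints a compact candle summary.
def on_ohlcv_data(msg):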
    print(
        "[OHLCV]",
        clean(
            {
                "symbol": pick(msg, "indexname", "index_name", "symbol", "display_name"),
                "interval": pick(msg, "interval"),
                "bucket_ts": pick(msg, "bucket_timestamp"),
                "open": as_rupees(pick(msg, "open")),
                "high": as_rupees(pick(msg, "high")),
                "low": as_rupees(pick(msg, "low")),
                "close": as_rupees(pick(msg, "close")),
                "bucket_volume": pick(msg, "bucket_volume"),
            }
        ),
    )

socket = websocketdata.NubraDataSocket(
    client=nubra,
    on_ohlcv_data=on_ohlcv_data,
)

# Connect, then subscribe to 5-minute candles for both indices on NSE.
socket.connect()
socket.subscribe(["NIFTY", "BANKNIFTY"], data_type="ohlcv", interval="5m", exchange="NSE")
socket.keep_running()
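
For the indicator pipelines mentioned earlier, a candle callback typically feeds a rolling calculation. The sketch below computes a simple moving average over closes. It assumes the stream may send more than one update for the same bucket, so a repeated `bucket_timestamp` overwrites the latest close instead of adding a new one. That behaviour is an assumption, and `RollingSMA` is a hypothetical helper that is not part of the SDK:

```python
from collections import deque


class RollingSMA:
    """Simple moving average over the most recent `window` candle closes."""

    def __init__(self, window: int):
        if window <= 0:
            raise ValueError("window must be positive")
        self.window = window
        self._closes = deque(maxlen=window)  # oldest close is evicted automatically
        self._last_bucket = None

    def update(self, bucket_ts, close: float):
        """Record a close; return the SMA, or None until the window is full."""
        if self._closes and bucket_ts == self._last_bucket:
            # Assumed case: a further update for the candle already recorded.
            self._closes[-1] = close
        else:
            self._closes.append(close)
            self._last_bucket = bucket_ts
        if len(self._closes) < self.window:
            return None
        return sum(self._closes) / self.window
```

Inside `on_ohlcv_data`, this could be called as `sma.update(pick(msg, "bucket_timestamp"), as_rupees(pick(msg, "close")))` once both values are confirmed to be present.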

Practical Notes

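  • choose the interval from your strategy's latency needs: a signal computed on candle close cannot react faster than one interval
  • minute-level intervals reduce processing overhead compared with tick-level feeds
  • keep timezone and session handling consistent throughout your candle pipeline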
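
The last two notes can be made concrete with a small sketch. It assumes the regular NSE equity session of 09:15 to 15:30 IST and that `bucket_timestamp` is a Unix epoch value in seconds. The timestamp unit is not stated in this guide, so verify it against real messages. All helper names are hypothetical:

```python
import math
from datetime import datetime, time, timedelta, timezone

# IST is a fixed UTC+05:30 offset with no daylight saving time.
IST = timezone(timedelta(hours=5, minutes=30), name="IST")

# Assumed regular NSE equity session: 09:15 to 15:30 IST (375 minutes).
SESSION_OPEN = time(9, 15)
SESSION_CLOSE = time(15, 30)
SESSION_MINUTES = 375


def to_ist(epoch_seconds: float) -> datetime:
    """Convert an epoch timestamp (assumed to be in seconds) to an aware IST datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=IST)


def in_session(bucket_start: datetime) -> bool:
    """True if a timezone-aware bucket start falls inside the assumed session."""
    local = bucket_start.astimezone(IST).time()
    return SESSION_OPEN <= local < SESSION_CLOSE


def candles_per_session(interval_minutes: int) -> int:
    """Number of candles per symbol in one session, counting a partial last bucket."""
    return math.ceil(SESSION_MINUTES / interval_minutes)
```

Under these assumptions a 1m subscription produces 375 candles per symbol per session and a 5m subscription produces 75. Converting every timestamp through one helper such as `to_ist` keeps bucket boundaries consistent across the pipeline.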