WebSocket Streaming
The PyArrow SDK provides real-time WebSocket streaming: the Order Stream (JSON updates), the Data Stream (token-based ticks), and the HFT Data Stream (symbol- or token-based binary protocol). All sockets share automatic reconnection, heartbeat/read timeouts, and thread-based event delivery.
Overview
ArrowStreams exposes three WebSocket connections:
| Stream | Endpoint | Purpose | Events |
|---|---|---|---|
| Order Stream | wss://order-updates.arrow.trade | Order and position updates | on_order_update (JSON) |
| Data Stream | wss://ds.arrow.trade | Standard market data (token-based, binary ticks) | on_ticks (MarketTick) |
| HFT Data Stream | wss://socket.arrow.trade | Low-latency market data (symbol or token IDs, binary protocol) | on_ltp_tick, on_full_tick, on_response |
Quick Start
from pyarrow_client import ArrowStreams, DataMode
# Initialize streams
streams = ArrowStreams(
    appID="your_app_id",
    token="your_access_token",
    debug=True,  # Enable debug logging
)
# Define event handlers
def on_order_update(order):
    print(f"Order Update: {order['id']} - {order['orderStatus']}")

def on_tick(tick):
    print(f"Token: {tick.token} | LTP: ₹{tick.ltp} | Change: {tick.net_change}%")

def on_connect():
    print("✓ Connected to market data stream")

def on_disconnect():
    print("✗ Disconnected from stream")
# Attach handlers
streams.order_stream.on_order_update = on_order_update
streams.data_stream.on_ticks = on_tick
streams.data_stream.on_connect = on_connect
streams.data_stream.on_disconnect = on_disconnect
# connect_all() connects order, data, and HFT streams
streams.connect_all()
# Subscribe to market data (Data Stream — integer tokens)
tokens = [3045, 1594] # RELIANCE, INFY
streams.subscribe_market_data(DataMode.QUOTE, tokens)
# Keep connection alive
import time
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    streams.disconnect_all()
Order Stream
The Order Stream (OrderStream) uses wss://order-updates.arrow.trade?appID=<appID>&token=<token>. Incoming messages are JSON text. The SDK parses each message and calls on_order_update when the object contains an id field (order identifier).
streams.order_stream.on_order_update = lambda o: print(o["id"], o.get("orderStatus"))
streams.connect_order_stream()
Updates typically include fields such as orderStatus, symbol, quantity, price, transactionType, averagePrice, and rejectReason. Use the REST orders documentation for the full payload shape.
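The dispatch rule described above (parse the JSON text, then call the handler only when the object has an id field) can be sketched as follows. This is a minimal standalone illustration, not the SDK's code: dispatch_order_message is a hypothetical helper name, and the heartbeat-style message is invented to show a frame without an id.

```python
import json
from typing import Callable, Optional


def dispatch_order_message(
    raw: str, on_order_update: Optional[Callable[[dict], None]]
) -> bool:
    """Parse one JSON text frame and forward it if it carries an "id" field.

    Returns True when the handler was called. Hypothetical helper, for illustration.
    """
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return False  # not valid JSON, ignore the frame
    if not isinstance(payload, dict) or "id" not in payload:
        return False  # JSON, but not an order update
    if on_order_update is None:
        return False  # no handler attached
    on_order_update(payload)
    return True


received = []
dispatch_order_message('{"id": "A1", "orderStatus": "OPEN"}', received.append)
dispatch_order_message('{"type": "heartbeat"}', received.append)  # no "id": skipped
print(received)
```

Because the handler only sees objects that passed the id check, order['id'] is safe to index directly, while other fields are better read with order.get(...).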
Data Stream
The Data Stream (DataStream) uses integer instrument tokens and a big-endian binary tick format (aligned with the JS client). Subscribe with DataMode and subscribe_market_data / unsubscribe_market_data.
LTP Mode (13 bytes)
Last traded price only.
streams.subscribe_market_data(DataMode.LTP, [3045, 1594])
def on_tick(tick):
    print(f"Token: {tick.token} | LTP: {tick.ltp} | mode: {tick.mode}")  # mode == 'ltp'
| Field | Description |
|---|---|
| token | Instrument token |
| ltp | Last traded price |
| mode | "ltp" |
LTPC Mode (17 bytes)
Minimal data for price tracking.
from pyarrow_client import DataMode
# Subscribe to LTPC mode
streams.subscribe_market_data(DataMode.LTPC, [3045, 1594])
def on_tick(tick):
    print(f"Token: {tick.token}")
    print(f"LTP: ₹{tick.ltp}")
    print(f"Close: ₹{tick.close}")
    print(f"Net Change: {tick.net_change}%")
    print(f"Change Flag: {tick.change_flag}")  # 43(+), 45(-), 32(no change)
| Field | Description |
|---|---|
| token | Instrument token |
| ltp | Last traded price |
| close | Previous close |
| net_change | % change |
| change_flag | Direction indicator |
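The change_flag values 43, 45, and 32 are the ASCII codes for "+", "-", and a space. A small lookup like the one below turns the flag into a readable direction; change_direction and the label strings are hypothetical names chosen for this sketch, not part of the SDK.

```python
# ASCII codes: ord("+") == 43, ord("-") == 45, ord(" ") == 32
DIRECTION_BY_FLAG = {ord("+"): "up", ord("-"): "down", ord(" "): "flat"}


def change_direction(change_flag: int) -> str:
    """Map a change_flag byte to a label; unknown values fall back to 'unknown'."""
    return DIRECTION_BY_FLAG.get(change_flag, "unknown")


for flag in (43, 45, 32):
    print(flag, repr(chr(flag)), change_direction(flag))
```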
Quote Mode (93 bytes)
Detailed quotes with OHLCV and OI data.
# Subscribe to Quote mode
streams.subscribe_market_data(DataMode.QUOTE, [3045])
def on_tick(tick):
    print(f"Token: {tick.token}")
    print(f"LTP: ₹{tick.ltp} | Volume: {tick.volume}")
    print(f"Open: {tick.open} | High: {tick.high} | Low: {tick.low} | Close: {tick.close}")
    print(f"OI: {tick.oi} | OI High: {tick.oi_day_high} | OI Low: {tick.oi_day_low}")
    print(f"Buy Qty: {tick.total_buy_quantity} | Sell Qty: {tick.total_sell_quantity}")
    print(f"LTQ: {tick.ltq} | Avg Price: {tick.avg_price}")
    print(f"LTT: {tick.ltt} | Time: {tick.time}")
| Field | Description |
|---|---|
| All LTPC fields | Included, plus the fields below |
| open, high, low | Open, high, and low prices (close is an LTPC field) |
| volume | Traded volume |
| oi, oi_day_high, oi_day_low | Open interest and its day high/low |
| ltq | Last traded quantity |
| avg_price | Average traded price |
| total_buy_quantity | Total buy quantity |
| total_sell_quantity | Total sell quantity |
| ltt | Last trade time |
| time | Timestamp |
Full Mode (241 bytes)
Complete market depth with 5 levels of bids/asks.
# Subscribe to Full mode
streams.subscribe_market_data(DataMode.FULL, [3045])
def on_tick(tick):
    print(f"Token: {tick.token} | LTP: ₹{tick.ltp}")
    print(f"Upper Limit: ₹{tick.upper_limit} | Lower Limit: ₹{tick.lower_limit}")
    # Market Depth - Bids
    print("\n--- BID DEPTH ---")
    for i, bid in enumerate(tick.bids):
        print(f"Level {i+1}: {bid['quantity']:>8} @ ₹{bid['price']:>10.2f} ({bid['orders']} orders)")
    # Market Depth - Asks
    print("\n--- ASK DEPTH ---")
    for i, ask in enumerate(tick.asks):
        print(f"Level {i+1}: {ask['quantity']:>8} @ ₹{ask['price']:>10.2f} ({ask['orders']} orders)")
| Field | Description |
|---|---|
| All Quote fields | Included, plus the fields below |
| upper_limit | Upper circuit limit |
| lower_limit | Lower circuit limit |
| bids | List of 5 bid levels |
| asks | List of 5 ask levels |
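The four modes above have distinct payload sizes (13, 17, 93, and 241 bytes), so a tick's mode can in principle be recovered from its length alone. The sketch below assumes that is how a payload is classified; the SDK's actual parser may use other cues, and mode_for_payload is a hypothetical helper.

```python
from typing import Optional

# Payload sizes taken from the mode headings above.
MODE_BY_LENGTH = {13: "ltp", 17: "ltpc", 93: "quote", 241: "full"}


def mode_for_payload(payload: bytes) -> Optional[str]:
    """Return the mode name for a tick payload, or None for an unexpected size."""
    return MODE_BY_LENGTH.get(len(payload))


print(mode_for_payload(bytes(17)))   # a 17-byte payload maps to "ltpc"
print(mode_for_payload(bytes(20)))   # unexpected size maps to None
```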
Subscribe to market data
# Subscribe tokens to a specific mode
tokens = [3045, 1594, 5533] # RELIANCE, INFY, SBIN
streams.subscribe_market_data(DataMode.QUOTE, tokens)
# Subscribe with different modes
streams.subscribe_market_data(DataMode.LTPC, [3045]) # Light data
streams.subscribe_market_data(DataMode.FULL, [1594]) # Full depth
Unsubscribe from market data
# Unsubscribe specific tokens from a mode
streams.unsubscribe_market_data(DataMode.QUOTE, [3045])
# Unsubscribe several tokens from a mode in one call
streams.unsubscribe_market_data(DataMode.LTPC, [3045, 1594, 5533])
MarketTick object
The Data Stream delivers MarketTick instances (dataclass). Numeric fields are populated depending on mode; unused fields may be 0 or empty lists.
from dataclasses import dataclass, field
from typing import List
@dataclass
class MarketTick:
    token: int
    ltp: float = 0.0
    mode: str = ""  # "ltp" | "ltpc" | "quote" | "full"
    open: float = 0.0
    high: float = 0.0
    low: float = 0.0
    close: float = 0.0
    volume: int = 0
    net_change: float = 0.0
    change_flag: int = 32  # 43 '+', 45 '-', 32 flat
    ltq: int = 0
    avg_price: float = 0.0
    total_buy_quantity: int = 0
    total_sell_quantity: int = 0
    ltt: int = 0
    time: int = 0
    oi: int = 0
    oi_day_high: int = 0
    oi_day_low: int = 0
    upper_limit: float = 0.0
    lower_limit: float = 0.0
    bids: List[dict] = field(default_factory=list)  # keys: price, quantity, orders
    asks: List[dict] = field(default_factory=list)  # keys: price, quantity, orders
HFT ticks are not MarketTick; use the dict fields described in HFT tick and response shapes.
HFT Data Stream
The HFT Data Stream (HFTDataStream) targets wss://socket.arrow.trade?appID=<appID>&token=<token>. It uses a little-endian binary protocol on the wire. Outbound commands are JSON (code: sub / unsub).
Protocol summary
| Topic | Detail |
|---|---|
| Multi-byte integers | Little-endian |
| Prices | Paise (1 rupee = 100 paise) |
| Packet framing | First 2 bytes: signed int16 packet size; byte index 2: packet type |
Packet types
| Type | Value | Size (bytes) | Handler callback |
|---|---|---|---|
| LTP | 1 (PKT_TYPE_LTP) | 40 | on_ltp_tick |
| Full depth | 2 (PKT_TYPE_FULL) | 196 | on_full_tick |
| Server response | 99 (PKT_TYPE_RESPONSE) | 540 | on_response |
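The framing and packet-type tables above can be combined into a small routing sketch: read the little-endian int16 size and the type byte, then look up the callback. This is an illustration only. The constants mirror the table, read_header and route are hypothetical names, the type byte is read as unsigned (an assumption), and the layout after the third byte is deliberately left unparsed because it is not specified here.

```python
import struct
from typing import Tuple

PKT_TYPE_LTP = 1
PKT_TYPE_FULL = 2
PKT_TYPE_RESPONSE = 99

# Expected size field and callback name per packet type (from the table above).
EXPECTED_SIZE = {PKT_TYPE_LTP: 40, PKT_TYPE_FULL: 196, PKT_TYPE_RESPONSE: 540}
CALLBACK_NAME = {
    PKT_TYPE_LTP: "on_ltp_tick",
    PKT_TYPE_FULL: "on_full_tick",
    PKT_TYPE_RESPONSE: "on_response",
}


def read_header(packet: bytes) -> Tuple[int, int]:
    """Return (size, pkt_type): signed int16 at offset 0, one byte at offset 2."""
    if len(packet) < 3:
        raise ValueError("packet shorter than the 3-byte header")
    size, pkt_type = struct.unpack_from("<hB", packet, 0)  # "<" = little-endian
    return size, pkt_type


def route(packet: bytes) -> str:
    """Return the name of the callback that should receive this packet."""
    size, pkt_type = read_header(packet)
    if pkt_type not in CALLBACK_NAME:
        raise ValueError(f"unknown packet type {pkt_type}")
    if size != EXPECTED_SIZE[pkt_type]:
        raise ValueError(f"type {pkt_type} declares size {size}")
    return CALLBACK_NAME[pkt_type]


# Synthetic packet: header for an LTP packet followed by zero padding.
demo = struct.pack("<hB", 40, PKT_TYPE_LTP) + bytes(37)
print(route(demo))
```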
Exchange segments (HFTDataStream constants)
| Constant | Value | Segment |
|---|---|---|
| EXCH_NSE_CM | 0 | NSE cash |
| EXCH_NSE_FO | 1 | NSE F&O |
| EXCH_BSE_CM | 2 | BSE cash |
| EXCH_BSE_FO | 3 | BSE F&O |
Symbol string formats (when subscribing with names instead of token IDs)
| Segment | Example format |
|---|---|
| NSE CM | NSE.<UNDERLYING>-<SERIES> — e.g. NSE.SBIN-EQ |
| NSE FO options | Underlying + expiry + C or P + strike — e.g. NYKAA30DEC25C232.5 |
| NSE FO futures | Underlying + expiry + F — e.g. BANKNIFTY30DEC25F |
| BSE CM | BSE.<UNDERLYING> — e.g. BSE.SBIN |
| BSE FO options | Asset type + expiry + C or P + strike — e.g. SENSEX01JAN26C74900 |
| BSE FO futures | <ASSET_TYPE><DD><MON><YY>F — e.g. SENSEX01JAN26F |
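The symbol formats above are regular enough to build programmatically. The helpers below are hypothetical (they are not SDK functions) and assume a two-digit day, a three-letter uppercase month, a two-digit year, and strikes printed without a trailing ".0", as in the table's examples.

```python
from datetime import date

_MONTHS = ("JAN", "FEB", "MAR", "APR", "MAY", "JUN",
           "JUL", "AUG", "SEP", "OCT", "NOV", "DEC")


def _expiry(d: date) -> str:
    """Format an expiry as DDMONYY, e.g. 30DEC25 (locale-independent)."""
    return f"{d.day:02d}{_MONTHS[d.month - 1]}{d.year % 100:02d}"


def _strike(strike: float) -> str:
    """Print whole strikes without a decimal part: 74900.0 -> '74900'."""
    return str(int(strike)) if float(strike).is_integer() else str(strike)


def nse_cm_symbol(underlying: str, series: str = "EQ") -> str:
    return f"NSE.{underlying}-{series}"


def bse_cm_symbol(underlying: str) -> str:
    return f"BSE.{underlying}"


def option_symbol(underlying: str, expiry: date, option_type: str, strike: float) -> str:
    if option_type not in ("C", "P"):
        raise ValueError("option_type must be 'C' or 'P'")
    return f"{underlying}{_expiry(expiry)}{option_type}{_strike(strike)}"


def future_symbol(underlying: str, expiry: date) -> str:
    return f"{underlying}{_expiry(expiry)}F"


print(nse_cm_symbol("SBIN"))
print(option_symbol("NYKAA", date(2025, 12, 30), "C", 232.5))
print(future_symbol("SENSEX", date(2026, 1, 1)))
```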
Limits
- Up to 100 requests per second per connection
- Up to 4096 symbols per subscription
- Max request size 16 KB
Product note: LTP-style streaming is available for NSE FO and NSE CM; full depth for other segments as supported by the service.
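To stay within the 4096-symbols-per-subscription limit above, a large watchlist can be split into batches before subscribing. The batched helper below is a hypothetical utility, not an SDK function; the commented loop shows how it might be paired with subscribe_hft_data.

```python
from typing import Iterator, List, Sequence

MAX_SYMBOLS_PER_SUBSCRIPTION = 4096  # from the Limits list above


def batched(items: Sequence, size: int = MAX_SYMBOLS_PER_SUBSCRIPTION) -> Iterator[List]:
    """Yield consecutive slices of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


tokens = list(range(10_000))  # synthetic token IDs for the demo
batch_sizes = [len(batch) for batch in batched(tokens)]
print(batch_sizes)

# Possible usage with the SDK (one request per batch):
# for batch in batched(tokens):
#     streams.subscribe_hft_data("ltpc", batch, latency=100)
```

Each batch is a separate request, so very long watchlists should also be paced against the 100 requests per second limit.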
Connect and subscribe
from pyarrow_client import ArrowStreams, HFTDataStream
streams = ArrowStreams(appID="your_app_id", token="your_token", debug=True)
streams.hft_data_stream.on_ltp_tick = lambda t: print("LTP:", t)
streams.hft_data_stream.on_full_tick = lambda t: print("Full:", t)
streams.hft_data_stream.on_response = lambda r: print("Response:", r)
streams.connect_hft_data_stream()
# Mode: 'ltpc' or 'l' (LTP), 'full' or 'f' (depth)
# latency: milliseconds between ticks (50–60000, default 1000)
streams.subscribe_hft_data(
    "ltpc",
    ["NSE.SBIN-EQ", "BSE.RELIANCE"],
    latency=100,
)
# Integer token IDs (default segment NSE_CM / 0 in the wire message)
streams.subscribe_hft_data("full", [5042, 4449], latency=200)
Explicit exchange segments (token IDs per segment):
streams.hft_data_stream.subscribe_by_segment(
    "full",
    {
        HFTDataStream.EXCH_NSE_FO: [5042, 4449, 91],
        HFTDataStream.EXCH_BSE_CM: [100, 200],
    },
    latency=100,
)
Unsubscribe
streams.unsubscribe_hft_data("ltpc", ["NSE.SBIN-EQ"])
streams.hft_data_stream.unsubscribe_by_segment(
    "full",
    {HFTDataStream.EXCH_NSE_FO: [5042]},
)
Mixed types (strings and ints in one list) are not supported for a single subscribe/unsubscribe call.
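Since a single call cannot mix symbol strings and token IDs, a watchlist that contains both needs to be split first. The sketch below uses a hypothetical split_by_type helper; the two resulting lists would then go into separate subscribe or unsubscribe calls.

```python
from typing import List, Sequence, Tuple, Union


def split_by_type(instruments: Sequence[Union[str, int]]) -> Tuple[List[str], List[int]]:
    """Separate symbol strings from integer token IDs, preserving order."""
    symbols = [i for i in instruments if isinstance(i, str)]
    # bool is a subclass of int in Python, so exclude it explicitly.
    tokens = [i for i in instruments if isinstance(i, int) and not isinstance(i, bool)]
    if len(symbols) + len(tokens) != len(instruments):
        raise TypeError("instruments must be str symbols or int token IDs")
    return symbols, tokens


symbols, tokens = split_by_type(["NSE.SBIN-EQ", 5042, "BSE.RELIANCE", 4449])
print(symbols, tokens)

# Possible usage with the SDK (two calls instead of one mixed call):
# if symbols: streams.subscribe_hft_data("ltpc", symbols, latency=100)
# if tokens:  streams.subscribe_hft_data("ltpc", tokens, latency=100)
```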
HFT tick and response shapes
Callbacks receive plain dicts (parsed binary).
LTP packet (on_ltp_tick) — fields include: size, pkt_type, exch_seg, token, ltp, vwap, volume, ltt, atv, btv (see SDK _parse_ltp_packet for offsets; prices in paise).
Full packet (on_full_tick) — includes OHLC, ltq, vwap, ltt, day range dpr_l / dpr_h, tbq, tsq, volume, five levels of bid_px, ask_px, bid_size, ask_size, bid_ord, ask_ord, oi, server ts, atv, btv.
Response packet (on_response) — subscribe/unsubscribe acknowledgement: error_code, error_msg, request_type / request_type_str, mode / mode_str, success_count, error_count. Typical error_code values include: SUCCESS, E_PARTIAL, E_ALL_INVALID, E_INVALID_JSON, E_MISSING_FIELD, E_INVALID_PARAM, E_PARSE_ERROR.
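HFT price fields arrive in paise, so they usually need converting before display. A minimal sketch using Decimal to avoid float rounding is shown below; paise_to_rupees is a hypothetical helper, the sample dict values are invented, and treating vwap as a paise value alongside ltp is an assumption.

```python
from decimal import Decimal


def paise_to_rupees(paise: int) -> Decimal:
    """Convert an integer paise amount to rupees (1 rupee = 100 paise)."""
    return Decimal(paise) / Decimal(100)


# Invented sample shaped like an on_ltp_tick dict (only a few keys shown).
sample_tick = {"token": 5042, "ltp": 250575, "vwap": 249980}

ltp_rupees = paise_to_rupees(sample_tick["ltp"])
vwap_rupees = paise_to_rupees(sample_tick["vwap"])
print(f"token={sample_tick['token']} ltp=₹{ltp_rupees} vwap=₹{vwap_rupees}")
```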
Resubscription
Like DataStream, HFTDataStream resubscribes from its local subscription map after on_open (modes ltpc and full).
Connection Management
Connect Streams
# Connect all three streams (order, data, HFT)
streams.connect_all()
# Or connect individually
streams.connect_order_stream()
streams.connect_data_stream()
streams.connect_hft_data_stream()
Disconnect Streams
# Disconnect from all streams
streams.disconnect_all()
# Check connection status
status = streams.get_status()
print(f"Order Stream: {status['order_stream']}")
print(f"Data Stream: {status['data_stream']}")
print(f"HFT Data Stream: {status['hft_data_stream']}")
Connection Status
status = streams.get_status()
# Returns (values are SocketStatus string values, e.g. 'connected', 'connecting', 'disconnected'):
# {
# 'order_stream': 'connected',
# 'data_stream': 'connected',
# 'hft_data_stream': 'connected',
# }
ConnectionConfig (shared)
ArrowStreams(appID=..., token=..., debug=...) builds a ConnectionConfig used by all streams. Notable fields (defaults match the JS client behaviour in the SDK): enable_reconnect, max_reconnect_attempts (e.g. 300), max_reconnect_delay (seconds), immediate_reconnect_attempts (first N reconnects with no delay), read_timeout, ping_interval. The socket layer sends periodic PONG text on a timer and closes the connection if incoming data stalls beyond read_timeout.
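One way to picture how immediate_reconnect_attempts and max_reconnect_delay interact is a delay schedule: zero delay for the first N attempts, then a growing delay capped at the maximum. The sketch below assumes exponential doubling and uses made-up default values; the SDK's real schedule and defaults are not stated here, and reconnect_delay is a hypothetical function.

```python
def reconnect_delay(
    attempt: int,
    immediate_attempts: int = 3,   # assumed value, for illustration
    base_delay: float = 1.0,       # assumed value, in seconds
    max_delay: float = 30.0,       # assumed value, in seconds
) -> float:
    """Return the wait in seconds before reconnect attempt number `attempt` (1-based)."""
    if attempt < 1:
        raise ValueError("attempt is 1-based")
    if attempt <= immediate_attempts:
        return 0.0  # first N reconnects happen with no delay
    # Doubling backoff (an assumption); cap the exponent to keep the arithmetic small.
    exponent = min(attempt - immediate_attempts - 1, 32)
    return min(max_delay, base_delay * (2 ** exponent))


print([reconnect_delay(n) for n in range(1, 10)])
```

A schedule like this is also what an on_reconnect(attempt, delay) handler would observe, which makes it easy to log or alert when the delay reaches its cap.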
Event Handlers
Order Stream Events
# Order update received
def on_order_update(order):
    """Handle order status updates."""
    print(f"Order: {order['id']}")
    print(f"Status: {order['orderStatus']}")
    print(f"Symbol: {order['symbol']}")
    print(f"Qty: {order['quantity']} @ {order['price']}")
# Attach handler
streams.order_stream.on_order_update = on_order_update
Data Stream Events
# Market tick received
def on_ticks(tick):
    """Handle incoming market data."""
    print(f"Tick: {tick.token} @ {tick.ltp}")

# Connection established
def on_connect():
    """Called when connection is established."""
    print("Connected!")
    # Re-subscribe after reconnection
    streams.subscribe_market_data(DataMode.QUOTE, [3045])

# Connection lost
def on_disconnect():
    """Called when connection is lost."""
    print("Disconnected!")

# Error occurred
def on_error(error):
    """Handle connection errors."""
    print(f"Error: {error}")

# Connection closed
def on_close(close_status_code, close_msg):
    """Called when connection is closed."""
    print(f"Closed: {close_status_code} - {close_msg}")

# Reconnection attempt
def on_reconnect(attempt, delay):
    """Called during reconnection attempts."""
    print(f"Reconnecting... Attempt {attempt}, waiting {delay}s")

# Max reconnection attempts reached
def on_no_reconnect():
    """Called when max reconnection attempts reached."""
    print("Max reconnection attempts reached!")

# Attach handlers
streams.data_stream.on_ticks = on_ticks
streams.data_stream.on_connect = on_connect
streams.data_stream.on_disconnect = on_disconnect
streams.data_stream.on_error = on_error
streams.data_stream.on_close = on_close
streams.data_stream.on_reconnect = on_reconnect
streams.data_stream.on_no_reconnect = on_no_reconnect
HFT Data Stream Events
def on_ltp_tick(tick):
    """Binary LTP packet decoded to dict (40-byte payload)."""
    print(tick["token"], tick["ltp"])

def on_full_tick(tick):
    """Binary full-depth packet (196 bytes)."""
    print(tick["token"], tick["bid_px"], tick["ask_px"])

def on_response(resp):
    """Server response to sub/unsub (540 bytes)."""
    print(resp["error_code"], resp["success_count"], resp["error_count"])

streams.hft_data_stream.on_ltp_tick = on_ltp_tick
streams.hft_data_stream.on_full_tick = on_full_tick
streams.hft_data_stream.on_response = on_response
# BaseSocket handlers also apply: on_connect, on_disconnect, on_error, on_close,
# on_reconnect, on_no_reconnect
Example: Order Monitor
import time

from pyarrow_client import ArrowStreams


def order_monitor(app_id, token):
    """Monitor real-time order updates."""
    streams = ArrowStreams(appID=app_id, token=token, debug=False)

    def on_order_update(order):
        status = order.get('orderStatus', 'UNKNOWN')
        symbol = order.get('symbol', 'N/A')
        order_id = order.get('id', 'N/A')
        txn = "BUY" if order.get('transactionType') == 'B' else "SELL"
        qty = order.get('quantity', 0)
        price = order.get('price', 0)
        # Status emoji
        emoji = {
            'PENDING': '⏳',
            'OPEN': '📖',
            'COMPLETE': '✅',
            'CANCELLED': '❌',
            'REJECTED': '🚫'
        }.get(status, '❓')
        print(f"\n{emoji} ORDER UPDATE")
        print(f" ID: {order_id}")
        print(f" {txn} {qty} x {symbol} @ ₹{price}")
        print(f" Status: {status}")
        if status == 'REJECTED':
            print(f" Reason: {order.get('rejectReason', 'N/A')}")
        if status == 'COMPLETE':
            print(f" Avg Price: ₹{order.get('averagePrice', 'N/A')}")

    streams.order_stream.on_order_update = on_order_update
    print("Starting order monitor...")
    streams.connect_order_stream()
    try:
        # Keep the main thread alive; updates arrive on the WebSocket thread
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        streams.disconnect_all()


# Usage
order_monitor("your_app_id", "your_token")
Complete Example: Live Dashboard
from pyarrow_client import ArrowClient, ArrowStreams, DataMode
import time
from datetime import datetime


class LiveDashboard:
    def __init__(self, app_id, token):
        self.streams = ArrowStreams(appID=app_id, token=token, debug=False)
        self.prices = {}
        self.setup_handlers()

    def setup_handlers(self):
        self.streams.order_stream.on_order_update = self.on_order
        self.streams.data_stream.on_ticks = self.on_tick
        self.streams.data_stream.on_connect = self.on_connect
        self.streams.data_stream.on_disconnect = self.on_disconnect
        self.streams.data_stream.on_error = self.on_error

    def on_tick(self, tick):
        """Process incoming market data."""
        self.prices[tick.token] = {
            'ltp': tick.ltp,
            'change': tick.net_change,
            'volume': tick.volume,
            'time': datetime.now()
        }
        self.display()

    def on_connect(self):
        print("\n✓ Connected to Arrow Market Data Stream")
        print("=" * 60)

    def on_disconnect(self):
        print("\n✗ Disconnected from stream")

    def on_error(self, error):
        print(f"\n⚠ Error: {error}")

    def on_order(self, order):
        print(f"\n📋 Order Update: {order['symbol']} - {order['orderStatus']}")

    def display(self):
        """Display live prices."""
        # Clear screen (optional)
        # print("\033[2J\033[H")  # Uncomment to clear terminal
        print(f"\n{'Token':<10} {'LTP':>12} {'Change':>10} {'Volume':>15}")
        print("-" * 50)
        for token, data in self.prices.items():
            change_str = f"{data['change']:+.2f}%"
            color = "📈" if data['change'] > 0 else "📉" if data['change'] < 0 else "➡️"
            print(f"{token:<10} ₹{data['ltp']:>10.2f} {change_str:>10} {data['volume']:>15,} {color}")

    def start(self, tokens):
        """Start the dashboard."""
        # connect_all() opens order, data, and HFT sockets; use connect_data_stream /
        # connect_order_stream only if you do not need HFT.
        self.streams.connect_all()
        time.sleep(1)  # Wait for connection
        self.streams.subscribe_market_data(DataMode.QUOTE, tokens)
        print(f"Subscribed to {len(tokens)} tokens")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n\nShutting down...")
            self.streams.disconnect_all()


# Usage
if __name__ == "__main__":
    # Initialize client and authenticate
    client = ArrowClient(app_id="your_app_id")
    client.auto_login(
        user_id="your_user_id",
        password="your_password",
        api_secret="your_api_secret",
        totp_secret="your_totp_secret"
    )
    # Start dashboard
    dashboard = LiveDashboard(
        app_id="your_app_id",
        token=client.get_token()
    )
    # Subscribe to tokens
    watchlist = [3045, 1594, 5533, 2885, 3499]  # RELIANCE, INFY, SBIN, ICICIBANK, HDFCBANK
    dashboard.start(watchlist)
Best Practices
Performance Tips
- Order Stream: use connect_order_stream() alone when you only need fills and status updates
- Data Stream: use DataMode.LTP or DataMode.LTPC for large watchlists when you only need price; use DataMode.QUOTE for OHLCV; use DataMode.FULL only when you need depth
- HFT Data Stream: tune latency (ms) to balance rate vs CPU; respect 100 req/s and 4096 symbols per subscription
- Unsubscribe from instruments you no longer need on each stream you use
Connection Handling
- Always implement on_connect to re-subscribe after reconnection
- Handle on_error and on_disconnect for graceful degradation
- Use on_reconnect to track reconnection attempts
- Set debug=True during development for detailed logging
Thread Safety
- Event handlers are called from WebSocket threads
- Use thread-safe data structures when sharing state
- Avoid blocking operations in event handlers
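A common way to follow the thread-safety points above is to have the handler enqueue each tick and return at once, while a separate worker thread does the slow processing. The sketch below is standalone and uses plain dicts in place of real ticks; the queue size, the drop-on-full policy, and the None sentinel are choices made for this example, not SDK behaviour.

```python
import queue
import threading

tick_queue: "queue.Queue" = queue.Queue(maxsize=10_000)
processed = []


def on_ticks(tick):
    """Handler: runs on the WebSocket thread, so only enqueue and return."""
    try:
        tick_queue.put_nowait(tick)
    except queue.Full:
        pass  # consumer is behind; drop the tick instead of blocking the socket


def worker():
    """Consumer: runs on its own thread and may do slow work safely."""
    while True:
        tick = tick_queue.get()
        if tick is None:  # sentinel: shut down
            break
        processed.append(tick)  # replace with storage, analytics, and so on


consumer = threading.Thread(target=worker, daemon=True)
consumer.start()

# Simulate five ticks arriving from the stream, then stop the worker.
for i in range(5):
    on_ticks({"token": 3045, "ltp": 100 + i})
tick_queue.put(None)
consumer.join(timeout=5)
print(len(processed))
```

With the SDK, the same on_ticks function would be assigned to streams.data_stream.on_ticks, and queue.Queue provides the thread-safe handoff between the two threads.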