WebSocket Streaming
The PyArrow SDK provides real-time WebSocket streaming for live market data and order updates with automatic reconnection, thread-safe event handling, and multiple subscription modes.
Overview
ArrowStreams provides two WebSocket connections:
| Stream | Purpose | Events |
|---|---|---|
| Data Stream | Real-time market data | Ticks, quotes, market depth |
| Order Stream | Order & position updates | Order status, fills, positions |
Quick Start
from pyarrow import ArrowStreams, DataMode
# Initialize streams
streams = ArrowStreams(
appID="your_app_id",
token="your_access_token",
debug=True # Enable debug logging
)
# Define event handlers
def on_tick(tick):
print(f"Token: {tick.token} | LTP: ₹{tick.ltp} | Change: {tick.net_change}%")
def on_order_update(order):
print(f"Order Update: {order['id']} - {order['orderStatus']}")
def on_connect():
print("✓ Connected to market data stream")
def on_disconnect():
print("✗ Disconnected from stream")
# Attach handlers
streams.data_stream.on_ticks = on_tick
streams.data_stream.on_connect = on_connect
streams.data_stream.on_disconnect = on_disconnect
streams.order_stream.on_order_update = on_order_update
# Connect to streams
streams.connect_all()
# Subscribe to market data
tokens = [3045, 1594] # RELIANCE, INFY
streams.subscribe_market_data(DataMode.QUOTE, tokens)
# Keep connection alive
import time
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
streams.disconnect_all()
Data Modes
PyArrow supports three data subscription modes with varying levels of detail and bandwidth:
LTPC Mode (17 bytes)
Minimal data for price tracking.
from pyarrow import DataMode
# Subscribe to LTPC mode
streams.subscribe_market_data(DataMode.LTPC, [3045, 1594])
def on_tick(tick):
print(f"Token: {tick.token}")
print(f"LTP: ₹{tick.ltp}")
print(f"Close: ₹{tick.close}")
print(f"Net Change: {tick.net_change}%")
print(f"Change Flag: {tick.change_flag}") # 43(+), 45(-), 32(no change)
| Field | Description |
|---|---|
token |
Instrument token |
ltp |
Last traded price |
close |
Previous close |
net_change |
% change |
change_flag |
Direction indicator |
Quote Mode (93 bytes)
Detailed quotes with OHLCV and OI data.
# Subscribe to Quote mode
streams.subscribe_market_data(DataMode.QUOTE, [3045])
def on_tick(tick):
print(f"Token: {tick.token}")
print(f"LTP: ₹{tick.ltp} | Volume: {tick.volume}")
print(f"Open: {tick.open} | High: {tick.high} | Low: {tick.low} | Close: {tick.close}")
print(f"OI: {tick.oi} | OI High: {tick.oi_day_high} | OI Low: {tick.oi_day_low}")
print(f"Buy Qty: {tick.total_buy_quantity} | Sell Qty: {tick.total_sell_quantity}")
print(f"LTQ: {tick.ltq} | Avg Price: {tick.avg_price}")
print(f"LTT: {tick.ltt} | Time: {tick.time}")
| Field | Description |
|---|---|
| All LTPC fields | + |
open, high, low |
OHLC prices |
volume |
Traded volume |
oi, oi_day_high, oi_day_low |
Open Interest |
ltq |
Last traded quantity |
avg_price |
Average traded price |
total_buy_quantity |
Total buy quantity |
total_sell_quantity |
Total sell quantity |
ltt |
Last trade time |
time |
Timestamp |
Full Mode (241 bytes)
Complete market depth with 5 levels of bids/asks.
# Subscribe to Full mode
streams.subscribe_market_data(DataMode.FULL, [3045])
def on_tick(tick):
print(f"Token: {tick.token} | LTP: ₹{tick.ltp}")
print(f"Upper Limit: ₹{tick.upper_limit} | Lower Limit: ₹{tick.lower_limit}")
# Market Depth - Bids
print("\n--- BID DEPTH ---")
for i, bid in enumerate(tick.bids):
print(f"Level {i+1}: {bid['quantity']:>8} @ ₹{bid['price']:>10.2f} ({bid['orders']} orders)")
# Market Depth - Asks
print("\n--- ASK DEPTH ---")
for i, ask in enumerate(tick.asks):
print(f"Level {i+1}: {ask['quantity']:>8} @ ₹{ask['price']:>10.2f} ({ask['orders']} orders)")
| Field | Description |
|---|---|
| All Quote fields | + |
upper_limit |
Upper circuit limit |
lower_limit |
Lower circuit limit |
bids |
List of 5 bid levels |
asks |
List of 5 ask levels |
Connection Management
Connect Streams
# Connect to both streams
streams.connect_all()
# Or connect individually
streams.connect_data_stream()
streams.connect_order_stream()
Disconnect Streams
# Disconnect from all streams
streams.disconnect_all()
# Check connection status
status = streams.get_status()
print(f"Data Stream: {status['data_stream']}")
print(f"Order Stream: {status['order_stream']}")
Connection Status
# Get detailed status
status = streams.get_status()
# Returns:
# {
# 'data_stream': 'connected', # or 'disconnected'
# 'order_stream': 'connected' # or 'disconnected'
# }
Subscription Management
Subscribe to Market Data
# Subscribe tokens to a specific mode
tokens = [3045, 1594, 5533] # RELIANCE, INFY, SBIN
streams.subscribe_market_data(DataMode.QUOTE, tokens)
# Subscribe with different modes
streams.subscribe_market_data(DataMode.LTPC, [3045]) # Light data
streams.subscribe_market_data(DataMode.FULL, [1594]) # Full depth
Unsubscribe from Market Data
# Unsubscribe specific tokens from a mode
streams.unsubscribe_market_data(DataMode.QUOTE, [3045])
# Unsubscribe all
streams.unsubscribe_market_data(DataMode.LTPC, [3045, 1594, 5533])
Event Handlers
Data Stream Events
# Market tick received
def on_ticks(tick):
"""Handle incoming market data."""
print(f"Tick: {tick.token} @ {tick.ltp}")
# Connection established
def on_connect():
"""Called when connection is established."""
print("Connected!")
# Re-subscribe after reconnection
streams.subscribe_market_data(DataMode.QUOTE, [3045])
# Connection lost
def on_disconnect():
"""Called when connection is lost."""
print("Disconnected!")
# Error occurred
def on_error(error):
"""Handle connection errors."""
print(f"Error: {error}")
# Connection closed
def on_close(close_status_code, close_msg):
"""Called when connection is closed."""
print(f"Closed: {close_status_code} - {close_msg}")
# Reconnection attempt
def on_reconnect(attempt, delay):
"""Called during reconnection attempts."""
print(f"Reconnecting... Attempt {attempt}, waiting {delay}s")
# Max reconnection attempts reached
def on_no_reconnect():
"""Called when max reconnection attempts reached."""
print("Max reconnection attempts reached!")
# Attach handlers
streams.data_stream.on_ticks = on_ticks
streams.data_stream.on_connect = on_connect
streams.data_stream.on_disconnect = on_disconnect
streams.data_stream.on_error = on_error
streams.data_stream.on_close = on_close
streams.data_stream.on_reconnect = on_reconnect
streams.data_stream.on_no_reconnect = on_no_reconnect
Order Stream Events
# Order update received
def on_order_update(order):
"""Handle order status updates."""
print(f"Order: {order['id']}")
print(f"Status: {order['orderStatus']}")
print(f"Symbol: {order['symbol']}")
print(f"Qty: {order['quantity']} @ {order['price']}")
# Attach handler
streams.order_stream.on_order_update = on_order_update
MarketTick Object
The MarketTick object contains all received market data:
class MarketTick:
# Identity
token: int # Instrument token
mode: str # Data mode (ltpc/quote/full)
# Price data
ltp: float # Last traded price
open: float # Open price
high: float # High price
low: float # Low price
close: float # Close/Previous close
volume: int # Traded volume
# Change
net_change: float # Percentage change
change_flag: int # 43(+), 45(-), 32(no change)
# Quote data
ltq: int # Last traded quantity
avg_price: float # Average traded price
total_buy_quantity: int
total_sell_quantity: int
# Time
ltt: datetime # Last trade time
time: datetime # Timestamp
# Open Interest
oi: int # Open interest
oi_day_high: int # OI day high
oi_day_low: int # OI day low
# Limits (Full mode)
upper_limit: float # Upper circuit
lower_limit: float # Lower circuit
# Depth (Full mode)
bids: List[dict] # 5 bid levels
asks: List[dict] # 5 ask levels
Complete Example: Live Dashboard
from pyarrow import ArrowClient, ArrowStreams, DataMode
import time
from datetime import datetime
class LiveDashboard:
def __init__(self, app_id, token):
self.streams = ArrowStreams(appID=app_id, token=token, debug=False)
self.prices = {}
self.setup_handlers()
def setup_handlers(self):
self.streams.data_stream.on_ticks = self.on_tick
self.streams.data_stream.on_connect = self.on_connect
self.streams.data_stream.on_disconnect = self.on_disconnect
self.streams.data_stream.on_error = self.on_error
self.streams.order_stream.on_order_update = self.on_order
def on_tick(self, tick):
"""Process incoming market data."""
self.prices[tick.token] = {
'ltp': tick.ltp,
'change': tick.net_change,
'volume': tick.volume,
'time': datetime.now()
}
self.display()
def on_connect(self):
print("\n✓ Connected to Arrow Market Data Stream")
print("=" * 60)
def on_disconnect(self):
print("\n✗ Disconnected from stream")
def on_error(self, error):
print(f"\n⚠ Error: {error}")
def on_order(self, order):
print(f"\n📋 Order Update: {order['symbol']} - {order['orderStatus']}")
def display(self):
"""Display live prices."""
# Clear screen (optional)
# print("\033[2J\033[H") # Uncomment to clear terminal
print(f"\n{'Token':<10} {'LTP':>12} {'Change':>10} {'Volume':>15}")
print("-" * 50)
for token, data in self.prices.items():
change_str = f"{data['change']:+.2f}%"
color = "📈" if data['change'] > 0 else "📉" if data['change'] < 0 else "➡️"
print(f"{token:<10} ₹{data['ltp']:>10.2f} {change_str:>10} {data['volume']:>15,} {color}")
def start(self, tokens):
"""Start the dashboard."""
self.streams.connect_all()
time.sleep(1) # Wait for connection
self.streams.subscribe_market_data(DataMode.QUOTE, tokens)
print(f"Subscribed to {len(tokens)} tokens")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n\nShutting down...")
self.streams.disconnect_all()
# Usage
if __name__ == "__main__":
# Initialize client and authenticate
client = ArrowClient(app_id="your_app_id")
client.auto_login(
user_id="your_user_id",
password="your_password",
api_secret="your_api_secret",
totp_secret="your_totp_secret"
)
# Start dashboard
dashboard = LiveDashboard(
app_id="your_app_id",
token=client.get_token()
)
# Subscribe to tokens
watchlist = [3045, 1594, 5533, 2885, 3499] # RELIANCE, INFY, SBIN, ICICIBANK, HDFCBANK
dashboard.start(watchlist)
Example: Order Monitor
from pyarrow import ArrowClient, ArrowStreams
def order_monitor(app_id, token):
"""Monitor real-time order updates."""
streams = ArrowStreams(appID=app_id, token=token, debug=False)
def on_order_update(order):
status = order.get('orderStatus', 'UNKNOWN')
symbol = order.get('symbol', 'N/A')
order_id = order.get('id', 'N/A')
txn = "BUY" if order.get('transactionType') == 'B' else "SELL"
qty = order.get('quantity', 0)
price = order.get('price', 0)
# Status emoji
emoji = {
'PENDING': '⏳',
'OPEN': '📖',
'COMPLETE': '✅',
'CANCELLED': '❌',
'REJECTED': '🚫'
}.get(status, '❓')
print(f"\n{emoji} ORDER UPDATE")
print(f" ID: {order_id}")
print(f" {txn} {qty} x {symbol} @ ₹{price}")
print(f" Status: {status}")
if status == 'REJECTED':
print(f" Reason: {order.get('rejectReason', 'N/A')}")
if status == 'COMPLETE':
print(f" Avg Price: ₹{order.get('averagePrice', 'N/A')}")
streams.order_stream.on_order_update = on_order_update
print("Starting order monitor...")
streams.connect_order_stream()
try:
while True:
import time
time.sleep(1)
except KeyboardInterrupt:
streams.disconnect_all()
# Usage
order_monitor("your_app_id", "your_token")
Best Practices
Performance Tips
- Use
DataMode.LTPCfor large watchlists where you only need price - Use
DataMode.QUOTEfor normal trading with OHLCV - Use
DataMode.FULLonly when you need market depth - Unsubscribe from tokens you no longer need
Connection Handling
- Always implement
on_connectto re-subscribe after reconnection - Handle
on_errorandon_disconnectfor graceful degradation - Use
on_reconnectto track reconnection attempts - Set
debug=Trueduring development for detailed logging
Thread Safety
- Event handlers are called from WebSocket threads
- Use thread-safe data structures when sharing state
- Avoid blocking operations in event handlers