Skip to content

WebSocket Streaming

The PyArrow SDK provides real-time WebSocket streaming for live market data and order updates with automatic reconnection, thread-safe event handling, and multiple subscription modes.

Overview

ArrowStreams provides two WebSocket connections:

| Stream | Purpose | Events |
|---|---|---|
| Data Stream | Real-time market data | Ticks, quotes, market depth |
| Order Stream | Order & position updates | Order status, fills, positions |

Quick Start

import time

from pyarrow import ArrowStreams, DataMode

# Initialize streams
streams = ArrowStreams(
    appID="your_app_id",
    token="your_access_token",
    debug=True  # Enable debug logging
)

# Define event handlers
def on_tick(tick):
    """Print token, last traded price and percentage change for one tick."""
    print(f"Token: {tick.token} | LTP: ₹{tick.ltp} | Change: {tick.net_change}%")

def on_order_update(order):
    """Print the id and status carried by an order update."""
    print(f"Order Update: {order['id']} - {order['orderStatus']}")

def on_connect():
    """Report that the market data stream is connected."""
    print("✓ Connected to market data stream")

def on_disconnect():
    """Report that the market data stream was disconnected."""
    print("✗ Disconnected from stream")

# Attach handlers
streams.data_stream.on_ticks = on_tick
streams.data_stream.on_connect = on_connect
streams.data_stream.on_disconnect = on_disconnect
streams.order_stream.on_order_update = on_order_update

# Connect to streams
streams.connect_all()

# Subscribe to market data
tokens = [3045, 1594]  # RELIANCE, INFY
streams.subscribe_market_data(DataMode.QUOTE, tokens)

# Keep the script running; Ctrl+C disconnects both streams
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    streams.disconnect_all()

Data Modes

PyArrow supports three data subscription modes with varying levels of detail and bandwidth:

LTPC Mode (17 bytes)

Minimal data for price tracking.

from pyarrow import DataMode

# Subscribe to LTPC mode
streams.subscribe_market_data(DataMode.LTPC, [3045, 1594])

def on_tick(tick):
    """Print every field delivered with an LTPC-mode tick, one per line."""
    report = (
        f"Token: {tick.token}",
        f"LTP: ₹{tick.ltp}",
        f"Close: ₹{tick.close}",
        f"Net Change: {tick.net_change}%",
        f"Change Flag: {tick.change_flag}",  # 43(+), 45(-), 32(no change)
    )
    print(*report, sep="\n")

| Field | Description |
|---|---|
| token | Instrument token |
| ltp | Last traded price |
| close | Previous close |
| net_change | % change |
| change_flag | Direction indicator |

Quote Mode (93 bytes)

Detailed quotes with OHLCV and OI data.

# Subscribe to Quote mode
streams.subscribe_market_data(DataMode.QUOTE, [3045])

def on_tick(tick):
    """Print every field delivered with a Quote-mode tick, one group per line."""
    report = (
        f"Token: {tick.token}",
        f"LTP: ₹{tick.ltp} | Volume: {tick.volume}",
        f"Open: {tick.open} | High: {tick.high} | Low: {tick.low} | Close: {tick.close}",
        f"OI: {tick.oi} | OI High: {tick.oi_day_high} | OI Low: {tick.oi_day_low}",
        f"Buy Qty: {tick.total_buy_quantity} | Sell Qty: {tick.total_sell_quantity}",
        f"LTQ: {tick.ltq} | Avg Price: {tick.avg_price}",
        f"LTT: {tick.ltt} | Time: {tick.time}",
    )
    print(*report, sep="\n")

| Field | Description |
|---|---|
| All LTPC fields + | |
| open, high, low | OHLC prices |
| volume | Traded volume |
| oi, oi_day_high, oi_day_low | Open Interest |
| ltq | Last traded quantity |
| avg_price | Average traded price |
| total_buy_quantity | Total buy quantity |
| total_sell_quantity | Total sell quantity |
| ltt | Last trade time |
| time | Timestamp |

Full Mode (241 bytes)

Complete market depth with 5 levels of bids/asks.

# Subscribe to Full mode
streams.subscribe_market_data(DataMode.FULL, [3045])

def on_tick(tick):
    """Print price, circuit limits and both sides of the market depth."""
    print(f"Token: {tick.token} | LTP: ₹{tick.ltp}")
    print(f"Upper Limit: ₹{tick.upper_limit} | Lower Limit: ₹{tick.lower_limit}")

    # Market Depth - bids first, then asks; one printed line per level
    for heading, side in (("\n--- BID DEPTH ---", "bids"), ("\n--- ASK DEPTH ---", "asks")):
        print(heading)
        for rank, level in enumerate(getattr(tick, side), start=1):
            print(f"Level {rank}: {level['quantity']:>8} @ ₹{level['price']:>10.2f} ({level['orders']} orders)")

| Field | Description |
|---|---|
| All Quote fields + | |
| upper_limit | Upper circuit limit |
| lower_limit | Lower circuit limit |
| bids | List of 5 bid levels |
| asks | List of 5 ask levels |

Connection Management

Connect Streams

# Connect to both streams (data stream and order stream) in one call
streams.connect_all()

# Or connect individually, e.g. when only one of the two streams is needed
streams.connect_data_stream()
streams.connect_order_stream()

Disconnect Streams

# Disconnect from all streams
streams.disconnect_all()

# Check connection status: one entry per stream, each 'connected' or 'disconnected'
status = streams.get_status()
print(f"Data Stream: {status['data_stream']}")
print(f"Order Stream: {status['order_stream']}")

Connection Status

# Get detailed status
status = streams.get_status()

# Returns:
# {
#     'data_stream': 'connected',    # or 'disconnected'
#     'order_stream': 'connected'    # or 'disconnected'
# }

Subscription Management

Subscribe to Market Data

# Subscribe tokens to a specific mode
tokens = [3045, 1594, 5533]  # RELIANCE, INFY, SBIN
streams.subscribe_market_data(DataMode.QUOTE, tokens)

# Subscribe with different modes
streams.subscribe_market_data(DataMode.LTPC, [3045])   # Light data
streams.subscribe_market_data(DataMode.FULL, [1594])   # Full depth

Unsubscribe from Market Data

# Unsubscribe specific tokens from a mode
streams.unsubscribe_market_data(DataMode.QUOTE, [3045])

# Unsubscribe all
streams.unsubscribe_market_data(DataMode.LTPC, [3045, 1594, 5533])

Event Handlers

Data Stream Events

# Market tick received
def on_ticks(tick):
    """Handle incoming market data.

    Called with one tick object; this example reads only its ``token``
    and ``ltp`` attributes.
    """
    print(f"Tick: {tick.token} @ {tick.ltp}")

# Connection established
def on_connect():
    """Called when connection is established.

    Takes no arguments. Subscriptions are issued here so that they are
    sent again each time the connection comes back.
    """
    print("Connected!")
    # Re-subscribe after reconnection
    streams.subscribe_market_data(DataMode.QUOTE, [3045])

# Connection lost
def on_disconnect():
    """Called when connection is lost. Takes no arguments."""
    print("Disconnected!")

# Error occurred
def on_error(error):
    """Handle connection errors.

    ``error`` is only interpolated into the printed message here; its
    concrete type is not shown in this example.
    """
    print(f"Error: {error}")

# Connection closed
def on_close(close_status_code, close_msg):
    """Called when connection is closed.

    Receives the close status code and close message, both of which are
    printed as-is.
    """
    print(f"Closed: {close_status_code} - {close_msg}")

# Reconnection attempt
def on_reconnect(attempt, delay):
    """Called during reconnection attempts.

    ``attempt`` is the attempt counter and ``delay`` the wait before the
    attempt (printed with an ``s`` suffix, so presumably seconds — confirm).
    """
    print(f"Reconnecting... Attempt {attempt}, waiting {delay}s")

# Max reconnection attempts reached
def on_no_reconnect():
    """Called when max reconnection attempts reached. Takes no arguments."""
    print("Max reconnection attempts reached!")

# Attach handlers (plain attribute assignment on the data stream)
streams.data_stream.on_ticks = on_ticks
streams.data_stream.on_connect = on_connect
streams.data_stream.on_disconnect = on_disconnect
streams.data_stream.on_error = on_error
streams.data_stream.on_close = on_close
streams.data_stream.on_reconnect = on_reconnect
streams.data_stream.on_no_reconnect = on_no_reconnect

Order Stream Events

# Order update received
def on_order_update(order):
    """Print id, status, symbol and quantity/price of an order update dict."""
    summary = (
        f"Order: {order['id']}",
        f"Status: {order['orderStatus']}",
        f"Symbol: {order['symbol']}",
        f"Qty: {order['quantity']} @ {order['price']}",
    )
    print(*summary, sep="\n")

# Attach handler
streams.order_stream.on_order_update = on_order_update

MarketTick Object

The MarketTick object contains all received market data:

class MarketTick:
    """One market-data update for a single instrument.

    Which fields carry data depends on the subscription mode:
    LTPC delivers token, ltp, close, net_change and change_flag;
    Quote adds the OHLC, volume, open-interest, quantity and time fields;
    Full adds the circuit limits and the bid/ask depth.
    NOTE(review): what the fields of a richer mode hold on a lighter-mode
    tick is not shown here — confirm before reading them.
    """
    # NOTE: the annotations below need `datetime` and `List` in scope
    # (`from datetime import datetime`, `from typing import List`).

    # Identity
    token: int           # Instrument token
    mode: str            # Data mode (ltpc/quote/full)

    # Price data (ltp and close in every mode; the rest from Quote mode up)
    ltp: float           # Last traded price
    open: float          # Open price
    high: float          # High price
    low: float           # Low price
    close: float         # Close/Previous close
    volume: int          # Traded volume

    # Change (every mode)
    net_change: float    # Percentage change
    change_flag: int     # 43(+), 45(-), 32(no change) — the ASCII codes of '+', '-' and ' '

    # Quote data (Quote and Full modes)
    ltq: int             # Last traded quantity
    avg_price: float     # Average traded price
    total_buy_quantity: int   # Total buy quantity
    total_sell_quantity: int  # Total sell quantity

    # Time (Quote and Full modes)
    ltt: datetime        # Last trade time
    time: datetime       # Timestamp

    # Open Interest (Quote and Full modes)
    oi: int              # Open interest
    oi_day_high: int     # OI day high
    oi_day_low: int      # OI day low

    # Limits (Full mode)
    upper_limit: float   # Upper circuit
    lower_limit: float   # Lower circuit

    # Depth (Full mode); each level is a dict with 'quantity', 'price', 'orders'
    bids: List[dict]     # 5 bid levels
    asks: List[dict]     # 5 ask levels

Complete Example: Live Dashboard

from pyarrow import ArrowClient, ArrowStreams, DataMode
import time
from datetime import datetime

class LiveDashboard:
    """Console dashboard that prints a live price table for a watchlist.

    Every incoming tick updates an in-memory snapshot per token and
    re-prints the whole table. Order updates are printed as they arrive.
    """

    def __init__(self, app_id, token):
        """Create the streams and wire up the handlers.

        Args:
            app_id: Application id, passed to ArrowStreams as ``appID``.
            token: Access token, passed to ArrowStreams as ``token``.
        """
        self.streams = ArrowStreams(appID=app_id, token=token, debug=False)
        self.prices = {}  # token -> latest {'ltp', 'change', 'volume', 'time'}
        self.tokens = []  # watchlist to re-subscribe; filled in by start()
        self.setup_handlers()

    def setup_handlers(self):
        """Attach this object's methods as stream event handlers."""
        self.streams.data_stream.on_ticks = self.on_tick
        self.streams.data_stream.on_connect = self.on_connect
        self.streams.data_stream.on_disconnect = self.on_disconnect
        self.streams.data_stream.on_error = self.on_error
        self.streams.order_stream.on_order_update = self.on_order

    def on_tick(self, tick):
        """Process incoming market data."""
        self.prices[tick.token] = {
            'ltp': tick.ltp,
            'change': tick.net_change,
            'volume': tick.volume,
            'time': datetime.now()
        }
        self.display()

    def on_connect(self):
        """Announce the connection and restore the watchlist subscription."""
        print("\n✓ Connected to Arrow Market Data Stream")
        print("=" * 60)
        # Re-subscribe after a reconnection. The list is empty until start()
        # has subscribed once, so the first connect does not subscribe twice.
        if self.tokens:
            self.streams.subscribe_market_data(DataMode.QUOTE, self.tokens)

    def on_disconnect(self):
        """Announce that the stream was disconnected."""
        print("\n✗ Disconnected from stream")

    def on_error(self, error):
        """Print a stream error."""
        print(f"\n⚠ Error: {error}")

    def on_order(self, order):
        """Print the symbol and status of an order update dict."""
        print(f"\n📋 Order Update: {order['symbol']} - {order['orderStatus']}")

    def display(self):
        """Display live prices."""
        # Clear screen (optional)
        # print("\033[2J\033[H")  # Uncomment to clear terminal

        print(f"\n{'Token':<10} {'LTP':>12} {'Change':>10} {'Volume':>15}")
        print("-" * 50)

        for token, data in self.prices.items():
            change_str = f"{data['change']:+.2f}%"
            trend = "📈" if data['change'] > 0 else "📉" if data['change'] < 0 else "➡️"
            # Same widths and separators as the header so the columns line up.
            print(f"{token:<10} {data['ltp']:>12.2f} {change_str:>10} {data['volume']:>15,} {trend}")

    def start(self, tokens):
        """Start the dashboard.

        Connects both streams, subscribes ``tokens`` in QUOTE mode and then
        blocks until Ctrl+C, which disconnects the streams.
        """
        self.streams.connect_all()
        time.sleep(1)  # Wait for connection

        self.streams.subscribe_market_data(DataMode.QUOTE, tokens)
        # Remember the watchlist only now, so on_connect re-subscribes on
        # later (re)connections without duplicating this first subscription.
        self.tokens = list(tokens)
        print(f"Subscribed to {len(tokens)} tokens")

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n\nShutting down...")
            self.streams.disconnect_all()

# Usage
def main():
    """Log in, build the dashboard and stream the sample watchlist."""
    # Initialize client and authenticate
    client = ArrowClient(app_id="your_app_id")
    client.auto_login(
        user_id="your_user_id",
        password="your_password",
        api_secret="your_api_secret",
        totp_secret="your_totp_secret"
    )

    # Start dashboard
    dashboard = LiveDashboard(app_id="your_app_id", token=client.get_token())

    # Subscribe to tokens
    watchlist = [3045, 1594, 5533, 2885, 3499]  # RELIANCE, INFY, SBIN, ICICIBANK, HDFCBANK
    dashboard.start(watchlist)


if __name__ == "__main__":
    main()

Example: Order Monitor

from pyarrow import ArrowClient, ArrowStreams

def order_monitor(app_id, token):
    """Monitor real-time order updates and print each one to the console.

    Connects only the order stream, then blocks until interrupted with
    Ctrl+C, at which point the streams are disconnected.

    Args:
        app_id: Application id, passed to ArrowStreams as ``appID``.
        token: Access token, passed to ArrowStreams as ``token``.
    """
    import time  # imported once here rather than on every loop iteration

    streams = ArrowStreams(appID=app_id, token=token, debug=False)

    # Status emoji
    status_emoji = {
        'PENDING': '⏳',
        'OPEN': '📖',
        'COMPLETE': '✅',
        'CANCELLED': '❌',
        'REJECTED': '🚫'
    }
    # NOTE(review): 'S' as the sell code is assumed by analogy with 'B' —
    # confirm against the order payload schema.
    side_labels = {'B': 'BUY', 'S': 'SELL'}

    def on_order_update(order):
        """Print a readable summary of one order update dict."""
        status = order.get('orderStatus', 'UNKNOWN')
        symbol = order.get('symbol', 'N/A')
        order_id = order.get('id', 'N/A')
        qty = order.get('quantity', 0)
        price = order.get('price', 0)

        # Label only recognised sides: a missing field shows as N/A and an
        # unknown code is shown verbatim instead of being reported as SELL.
        side_code = order.get('transactionType')
        txn = side_labels.get(side_code, 'N/A' if side_code is None else str(side_code))

        emoji = status_emoji.get(status, '❓')

        print(f"\n{emoji} ORDER UPDATE")
        print(f"   ID: {order_id}")
        print(f"   {txn} {qty} x {symbol} @ ₹{price}")
        print(f"   Status: {status}")

        if status == 'REJECTED':
            print(f"   Reason: {order.get('rejectReason', 'N/A')}")

        if status == 'COMPLETE':
            print(f"   Avg Price: ₹{order.get('averagePrice', 'N/A')}")

    streams.order_stream.on_order_update = on_order_update

    print("Starting order monitor...")
    streams.connect_order_stream()

    # Keep the function blocking so updates keep arriving; Ctrl+C ends it.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        streams.disconnect_all()

# Usage
order_monitor("your_app_id", "your_token")

Best Practices

Performance Tips

  • Use DataMode.LTPC for large watchlists where you only need price
  • Use DataMode.QUOTE for normal trading with OHLCV
  • Use DataMode.FULL only when you need market depth
  • Unsubscribe from tokens you no longer need

Connection Handling

  • Always implement on_connect to re-subscribe after reconnection
  • Handle on_error and on_disconnect for graceful degradation
  • Use on_reconnect to track reconnection attempts
  • Set debug=True during development for detailed logging

Thread Safety

  • Event handlers are called from WebSocket threads
  • Use thread-safe data structures when sharing state
  • Avoid blocking operations in event handlers