How can I actually grab the data out of a running stream/websocket within a function?

I am quite confused about how to handle websocket/stream data within a function in Python.

I have a Python script that triggers a ws/stream which runs 24/7:

async def quote_callback(q):
    print('quote', q)
    return q

# Initiate Class Instance
stream = Stream(public_key,
                secret_key,
                base_url=base_url,
                data_feed='sip')

# Subscribing to all relevant symbols
for symbol in traded_symbols:
    stream.subscribe_quotes(quote_callback, symbol)

# run stream
stream.run()

This returns an infinite stream of quotes for all symbols in traded_symbols:

quote Quote({   'ask_exchange': 'N',
    'ask_price': 79.0,
    'ask_size': 5,
    'bid_exchange': 'U',
    'bid_price': 78.99,
    'bid_size': 3,
    'conditions': ['R'],
    'symbol': 'MRK',
    'tape': 'A',
    'timestamp': 1641496334124695884})
quote Quote({   'ask_exchange': 'H',
    'ask_price': 334.64,
    'ask_size': 1,
    'bid_exchange': 'A',
    'bid_price': 334.48,
    'bid_size': 1,
    'conditions': ['R'],
    'symbol': 'FB',
    'tape': 'C',
    'timestamp': 1641496334125929728})

... etc.

Now I have a function in this script that should grab the most recent (or next incoming) value from the stream for any given symbol, like so:

def get_quote(symbol):
    ask_price = ...  # placeholder: catch the (last) occurring ask_price of symbol from the stream
    return ask_price

get_quote("FB")
# should return --> 334.64

How can I accomplish this?

It seems that none of the code after stream.run() is executed when it is triggered via the while loop in main.py?

# trader.py

# Stream logic
quotes = {}

async def quote_callback(q):
    quotes[q.symbol] = q
    print(q)
    return q

# Initiate Class Instance
stream = Stream(public_key,
                secret_key,
                base_url=base_url,
                data_feed='sip')  # SIP feed (for accounts with a PRO subscription)

# Subscribing to all symbols
for symbol in traded_symbols:
    stream.subscribe_quotes(quote_callback, symbol)

# run stream
stream.run()

def run_rufus():
    ...
    stream_price = quotes[symbol].ask_price

# main.py
from trader import run_rufus

while True:
    run_rufus()

When I remove the whole stream part, run_rufus() is executed continuously, so I guess there is a problem with my code structure.

Update

import multiprocessing
import os
from alpaca_trade_api.stream import Stream

# Grab required env variables
secret_key = os.environ['SECRET_KEY']
public_key = os.environ['PUB_KEY']
base_url = os.environ['APCA_API_BASE_URL']

traded_symbols = [
    "ADBE", # Adobe
    "ABNB", # Airbnb
    "AMZN", # Amazon
    "FB"
]

quotes = {}

async def quote_callback(q):
    quotes[q.symbol] = q
    print(q)
    print('-----------------------------')
    print(f'Quotes: {quotes}')
    return q

# Initiate Class Instance
stream = Stream(public_key,
                secret_key,
                base_url=base_url,
                data_feed="sip")

# Subscribing to all relevant symbols
for symbol in traded_symbols:
    stream.subscribe_quotes(quote_callback, symbol)

# run stream
p = multiprocessing.Process(target=stream.run)
p.start()

def get_quote(symbol):
    if symbol in quotes: # if the dictionary quotes has a key with the value of symbol
        return quotes[symbol].ask_price
    return -1 # a return value of -1 indicates that there was no quote with the given symbol received yet

price = get_quote("FB")
while price == -1:
    price = get_quote("FB")
print(price)

This prints:

Quotes: {'ABNB': Quote({   'ask_exchange': 'M',
    'ask_price': 156.7,
    'ask_size': 1,
    'bid_exchange': 'Q',
    'bid_price': 156.57,
    'bid_size': 2,
    'conditions': ['R'],
    'symbol': 'ABNB',
    'tape': 'C',
    'timestamp': 1641840163168672614}), 'ADBE': Quote({   'ask_exchange': 'P',
    'ask_price': 517.0,
    'ask_size': 2,
    'bid_exchange': 'P',
    'bid_price': 516.79,
    'bid_size': 1,
    'conditions': ['R'],
    'symbol': 'ADBE',
    'tape': 'C',
    'timestamp': 1641840163466840647}), 'FB': Quote({   'ask_exchange': 'Q',
    'ask_price': 323.97,
    'ask_size': 3,
    'bid_exchange': 'V',
    'bid_price': 323.88,
    'bid_size': 3,
    'conditions': ['R'],
    'symbol': 'FB',
    'tape': 'C',
    'timestamp': 1641840160572326189}), 'AMZN': Quote({   'ask_exchange': 'M',
    'ask_price': 3178.2,
    'ask_size': 2,
    'bid_exchange': 'M',
    'bid_price': 3175.81,
    'bid_size': 2,
    'conditions': ['R'],
    'symbol': 'AMZN',
    'tape': 'C',
    'timestamp': 1641840162908516864})
}

Solution 1:

First, you can directly access the dictionary fields as members of the Quote objects - i.e. you can just do q.ask_price to get the price from the received Quotes.
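
To illustrate what attribute-style access on a dictionary-backed object looks like, here is a small sketch. FakeQuote is a hypothetical stand-in written for this example, not the library's Quote class, and how the library implements this internally is not shown here. The field values are taken from the FB quote in the question.

```python
class FakeQuote:
    """Hypothetical stand-in for a quote object; not the library's Quote class."""

    def __init__(self, raw):
        self._raw = raw  # the underlying dictionary of fields

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails; fall back to the dict.
        try:
            return self._raw[name]
        except KeyError:
            raise AttributeError(name) from None


q = FakeQuote({"symbol": "FB", "ask_price": 334.64, "bid_price": 334.48})
print(q.symbol, q.ask_price)
```

With an object like this, q.ask_price reads the 'ask_price' entry, which is the access pattern used in the rest of this answer.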

In your quote_callback function you receive every Quote object as an argument. So, if you only ever want the last received Quote for a given symbol, e.g. symbol == "FB", you could define a global dictionary, named e.g. quotes, that maps each symbol to the last Quote received for that symbol.

Now, in get_quote you can use a simple dictionary lookup for the desired key in the quotes dictionary. In the end, you need to repeatedly call get_quote until a quote object with the desired symbol value is received. The whole code could look like this:

import multiprocessing

from alpaca_trade_api.stream import Stream

quotes = {}

async def quote_callback(q):
    quotes[q.symbol] = q
    return q

# Initiate Class Instance
stream = Stream(public_key,
                secret_key,
                base_url=base_url,
                data_feed="sip")

# Subscribing to all relevant symbols
for symbol in traded_symbols:
    stream.subscribe_quotes(quote_callback, symbol)

# run stream
p = multiprocessing.Process(target=stream.run)
p.start()

def get_quote(symbol):
    if symbol in quotes:  # a quote for this symbol has already been received
        return quotes[symbol].ask_price
    print(quotes)
    return -1  # no quote with this symbol received yet

price = get_quote("FB")
while price == -1:
    price = get_quote("FB")
print(price)
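
One thing to watch with the code above: a multiprocessing.Process runs in a separate process with its own copy of module-level variables, so entries the callback writes to quotes in the child process are not visible to get_quote in the parent. Below is a minimal sketch of the same idea with the producer running in a background thread, which does share quotes with the caller. fake_stream_run is a hypothetical stand-in for stream.run, the callback is a plain function here rather than async, and wait_for_quote with its timeout is illustrative only. Whether stream.run can be started from a non-main thread as-is is an assumption you would need to verify; it may need an asyncio event loop created in that thread first.

```python
import threading
import time
from types import SimpleNamespace

quotes = {}  # latest quote per symbol, shared between threads


def quote_callback(q):
    # Keep only the most recent quote for each symbol.
    quotes[q.symbol] = q


def fake_stream_run():
    # Hypothetical stand-in for stream.run(): delivers a few quotes, then stops.
    for symbol, ask in [("MRK", 79.0), ("FB", 334.64)]:
        time.sleep(0.05)
        quote_callback(SimpleNamespace(symbol=symbol, ask_price=ask))


def get_quote(symbol):
    # Return the latest ask price, or None if no quote has arrived yet.
    q = quotes.get(symbol)
    return q.ask_price if q is not None else None


def wait_for_quote(symbol, timeout=2.0, poll_interval=0.01):
    # Poll with a short sleep instead of a busy loop; give up after `timeout` seconds.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        price = get_quote(symbol)
        if price is not None:
            return price
        time.sleep(poll_interval)
    return None


# daemon=True so the background thread does not keep the program alive on exit.
threading.Thread(target=fake_stream_run, daemon=True).start()
price = wait_for_quote("FB")
print(price)
```

Returning None instead of -1 and sleeping between polls keeps the waiting loop from using a full CPU core, and the timeout prevents it from waiting forever if no quote for the symbol ever arrives.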

If you run into any errors, please add them to your question and I can take a look.