How can I actually grab the data out of a running stream/websocket within a function?
I am quite confused how to handle websockets/streams data within a function in Python.
I have a Python script that triggers a ws/stream which runs 24/7:
async def quote_callback(q):
print('quote', q)
return q
# Initiate Class Instance
stream = Stream(public_key,
secret_key,
base_url=base_url,
data_feed='sip')
# Subscribing to all relevant symbols
for symbol in traded_symbols:
stream.subscribe_quotes(quote_callback, symbol)
# run stream
stream.run()
This returns an infinite stream of quotes for all symbols in traded_symbols
:
quote Quote({ 'ask_exchange': 'N',
'ask_price': 79.0,
'ask_size': 5,
'bid_exchange': 'U',
'bid_price': 78.99,
'bid_size': 3,
'conditions': ['R'],
'symbol': 'MRK',
'tape': 'A',
'timestamp': 1641496334124695884})
quote Quote({ 'ask_exchange': 'H',
'ask_price': 334.64,
'ask_size': 1,
'bid_exchange': 'A',
'bid_price': 334.48,
'bid_size': 1,
'conditions': ['R'],
'symbol': 'FB',
'tape': 'C',
'timestamp': 1641496334125929728})
... etc.
Now I have a function within this script where I want to grab the (last) next value out of the stream for any given symbol like so:
def get_quote(symbol):
ask_price = # stream to catch the (last) occurring ask_price of symbol
return ask_price
get_quote("FB")
# should return --> 334.64
How to accomplish this?
It seems that all of the code after stream.run
isn't executed when triggered via main.py
while loop?
# trader.py
# Stream logic
quotes = {}
async def quote_callback(q):
quotes[q.symbol] = q
print(q)
return q
# Initiate Class Instance
stream = Stream(public_key,
secret_key,
base_url=base_url,
data_feed='sip') # <- replace to SIP if you have PRO subscription
# Subscribing to all symbols
for symbol in traded_symbols:
stream.subscribe_quotes(quote_callback, symbol)
# run stream
stream.run()
def run_rufus():
...
stream_price = quotes[symbol].ask_price
# main.py
from trader import run_rufus
while True:
run_rufus()
When I remove all of the stream part run_rufus()
is continuously executed. I guess there is some bug with my code structure.
Update
import multiprocessing
import os
from alpaca_trade_api.stream import Stream
# Grab required env variables
secret_key = os.environ['SECRET_KEY']
public_key = os.environ['PUB_KEY']
base_url = os.environ['APCA_API_BASE_URL']
traded_symbols = [
"ADBE", # Adobe
"ABNB", # Airbnb
"AMZN", # Amazon
"FB"
]
quotes = {}
async def quote_callback(q):
quotes[q.symbol] = q
print(q)
print(F'-----------------------------')
print(F'Quotes: {quotes}')
return q
# Initiate Class Instance
stream = Stream(public_key,
secret_key,
base_url=base_url,
data_feed="sip")
# Subscribing to all relevant symbols
for symbol in traded_symbols:
stream.subscribe_quotes(quote_callback, symbol)
# run stream
p = multiprocessing.Process(target=stream.run)
p.start()
def get_quote(symbol):
if symbol in quotes: # if the dictionary quotes has a key with the value of symbol
return quotes[symbol].ask_price
return -1 # a return value of -1 indicates that there was no quote with the given symbol received yet
price = get_quote("FB")
while price == -1:
price = get_quote("FB")
print(price)
returns
Quotes: {'ABNB': Quote({ 'ask_exchange': 'M',
'ask_price': 156.7,
'ask_size': 1,
'bid_exchange': 'Q',
'bid_price': 156.57,
'bid_size': 2,
'conditions': ['R'],
'symbol': 'ABNB',
'tape': 'C',
'timestamp': 1641840163168672614}), 'ADBE': Quote({ 'ask_exchange': 'P',
'ask_price': 517.0,
'ask_size': 2,
'bid_exchange': 'P',
'bid_price': 516.79,
'bid_size': 1,
'conditions': ['R'],
'symbol': 'ADBE',
'tape': 'C',
'timestamp': 1641840163466840647}), 'FB': Quote({ 'ask_exchange': 'Q',
'ask_price': 323.97,
'ask_size': 3,
'bid_exchange': 'V',
'bid_price': 323.88,
'bid_size': 3,
'conditions': ['R'],
'symbol': 'FB',
'tape': 'C',
'timestamp': 1641840160572326189}), 'AMZN': Quote({ 'ask_exchange': 'M',
'ask_price': 3178.2,
'ask_size': 2,
'bid_exchange': 'M',
'bid_price': 3175.81,
'bid_size': 2,
'conditions': ['R'],
'symbol': 'AMZN',
'tape': 'C',
'timestamp': 1641840162908516864})
}
Solution 1:
First, you can directly access the dictionary fields as members of the Quote
objects - i.e. you can just do q.ask_price
to get the price from the received Quote
s.
In your quote_callback
function you get every Quote
object as an argument. So, if you always only want to access the last received Quote
object which has symbol
= "HB"
, you could define a global dictionary named e.g. quotes
which will contain one key == symbol and value == last Quote
with that symbol per symbol type.
Now, in get_quote
you can use a simple dictionary lookup for the desired key in the quotes
dictionary. In the end, you need to repeatedly call get_quote
until a quote object with the desired symbol value is received. The whole code could look like this:
import multiprocessing
quotes = {}
async def quote_callback(q):
quotes[q.symbol] = q
return q
# Initiate Class Instance
stream = Stream(public_key,
secret_key,
base_url=base_url,
data_feed="sip")
# Subscribing to all relevant symbols
for symbol in traded_symbols:
stream.subscribe_quotes(quote_callback, symbol)
# run stream
p = multiprocessing.Process(target=stream.run)
p.start()
def get_quote(symbol):
if symbol in quotes.keys():
return quotes[symbol].ask_price
print(quotes)
return -1
price = get_quote("FB")
while price == -1:
price = get_quote("FB")
print(price)
If there are any errors, please add them to your question, then I can take a look.