我正在使用Binance WebSocket API获取信息。我们已经有了主代码库,但决定在独立的应用程序中实现websockets。它看起来像这样:
import asyncio
from threading import Thread
import uvicorn
from fastapi import FastAPI
from binance import AsyncClient, BinanceSocketManager
app = FastAPI()
@app.get("/")
async def root():
return "Websocket app"
async def main_socket_stream():
client = await AsyncClient.create()
binance_manager = BinanceSocketManager(client=client)
multiplex_socket = binance_manager.futures_multiplex_socket(symbols)
async with multiplex_socket as active_socket:
while True:
result = await active_socket.recv()
print(result)
# There are lots of further logic - proceed data, save something to db, etc.
def side_thread():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.run(main_socket_stream())
if __name__ == "__main__":
thread = Thread(target=side_thread, args=(), daemon=True)
thread.start()
uvicorn.run(app, port=5105)
我删除了一些不必要的部分,但主要的想法是明确的。所以,我有WebSocket在后台-它的工作。但我想从主应用程序控制它们的能力(基于Django).
我想到了这样的事情:
async with multiplex_socket as active_socket:
while True:
if await EventListenerClass.new_event_fired():
break # or do something special
result = await active_socket.recv()
print(result)
我想用Redis的Pub-Sub做一个“EventList”类,在channel中监听新消息。将有几种类型的消息,例如:“stop_WebSocket”、“reload_with_updated_symbols_list”等。
我需要一个解决方案:
- 每隔几秒钟就能工作
- 这将给予我控制websockets的机会
我对Redis的看法是正确的吗?
1条答案
按热度按时间kuarbcqp1#
一般的工作流程应该包括Redis或类似的Sub-Pub系统,用于协调消息传递。如果是同一个应用程序,你可以使用互斥锁从共享空间读取数据,这本质上与Redis这样的Sub-Pub执行相同的功能。
我熟悉Swagger中的Redis解决方案,Swagger是一个Python REST API模式/框架。Redis成为一种检查状态的方法。请注意,如果您需要低延迟高带宽请求,则可能需要解决性能问题,尽管通常小型系统或VM中的Redis能够在更新后的毫秒内响应。
你也可以设计自己的实时子发布系统,使用网络阻塞来解决并发问题,然而,redis是一个广泛采用的解决方案,存在于各种平台上。
Qt使用Websockets来处理Cef客户端集成,使用从C++到JavaScript的绑定。这是从主机应用程序向CEF窗口发送消息的另一种方式。