redis 如何在侧线程中监听事件?

vnjpjtjt  于 8个月前  发布在  Redis
关注(0)|答案(1)|浏览(93)

我正在使用Binance WebSocket API获取信息。我们已经有了主代码库,但决定在独立的应用程序中实现websockets。它看起来像这样:

import asyncio
from threading import Thread

import uvicorn
from fastapi import FastAPI

from binance import AsyncClient, BinanceSocketManager

app = FastAPI()

@app.get("/")
async def root():
    return "Websocket app"

async def main_socket_stream():
    client = await AsyncClient.create()
    binance_manager = BinanceSocketManager(client=client)
    multiplex_socket = binance_manager.futures_multiplex_socket(symbols)

    async with multiplex_socket as active_socket:
        while True:
            result = await active_socket.recv()
            print(result)
            # There are lots of further logic - proceed data, save something to db, etc.

def side_thread():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    asyncio.run(main_socket_stream())

if __name__ == "__main__":
    thread = Thread(target=side_thread, args=(), daemon=True)
    thread.start()
    uvicorn.run(app, port=5105)

我删除了一些不必要的部分,但主要的想法是明确的。所以,我有WebSocket在后台-它的工作。但我想从主应用程序控制它们的能力(基于Django).
我想到了这样的事情:

async with multiplex_socket as active_socket:
    while True:
        if await EventListenerClass.new_event_fired():
            break  # or do something special
        result = await active_socket.recv()
        print(result)

我想用Redis的Pub-Sub做一个“EventList”类,在channel中监听新消息。将有几种类型的消息,例如:“stop_WebSocket”、“reload_with_updated_symbols_list”等。
我需要一个解决方案:

  • 每隔几秒钟就能工作
  • 这将给予我控制websockets的机会

我对Redis的看法是正确的吗?

kuarbcqp

kuarbcqp1#

一般的工作流程应该包括Redis或类似的Sub-Pub系统,用于协调消息传递。如果是同一个应用程序,你可以使用互斥锁从共享空间读取数据,这本质上与Redis这样的Sub-Pub执行相同的功能。
我熟悉Swagger中的Redis解决方案,Swagger是一个Python REST API模式/框架。Redis成为一种检查状态的方法。请注意,如果您需要低延迟高带宽请求,则可能需要解决性能问题,尽管通常小型系统或VM中的Redis能够在更新后的毫秒内响应。
你也可以设计自己的实时子发布系统,使用网络阻塞来解决并发问题,然而,redis是一个广泛采用的解决方案,存在于各种平台上。
Qt使用Websockets来处理Cef客户端集成,使用从C++到JavaScript的绑定。这是从主机应用程序向CEF窗口发送消息的另一种方式。

相关问题