quarkus sse redis订阅

k10s72fa  于 2021-06-07  发布在  Redis
关注(0)|答案(2)|浏览(450)

我喜欢用quarkus中redis.subscribe的响应做一个sse。
我有一个来自quarkus快速启动的简单sse示例

@GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.TEXT_PLAIN)
  @Path("{name}/streaming")
  public Multi<String> greeting(@org.jboss.resteasy.annotations.jaxrs.PathParam String name) {
    return Multi.createFrom().publisher(vertx.periodicStream(2000).toMulti())
        .map(l -> String.format("Hello %s! (%s)%n", name, new Date()));
  }

这个效果不错,每2秒钟我就收到一个问候。。。。在我的web浏览器中
现在我尝试订阅redis,所以我应该会收到redis的消息。
redis示例:

(cmd window 1)
SUBSCRIBE message-channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "message-channel"
3) (integer) 1

(cmd window 2)
PUBLISH  message-channel HelloWorld
(integer) 1

(cmd window 1)
1) "message"
2) "message-channel"
3) "HelloWorld"

现在我用quarkus sse尝试这个:

@Inject
  ReactiveRedisClient reactiveRedisClient;

 @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.TEXT_PLAIN)
  @Path("sse/redissse")
  public Multi<String> redissse() {
    List<String> subscriberList = new ArrayList();
    subscriberList.add("message-channel");

    return reactiveRedisClient.subscribe(subscriberList)
        .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
        .onItem().castTo(String.class);
  }

我收到的是一个例外:

WARNING [io.ver.red.cli.imp.RedisConnectionImpl] (vert.x-eventloop-thread-0) No handler waiting for message: [subscribe, message-channel, 1]

有人能支持我吗?有一个简单的例子吗?我对此一无所知,我无法接收带有“订阅”发布的redis消息。
任何建议。。。

5vf7fwbs

5vf7fwbs1#

现在我要做的是:

@Inject
  @RedisClientName("second")
  RedisClient redisClient2;

void onStart(@Observes StartupEvent ev) throws IOException {
  this.redisClient2.subscribe(List.of("message-channel"));
}

  @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.TEXT_PLAIN)
  @Path("/redis/subscribe")
  public Publisher<String> subscribechannel(){
     return eventBus.<String>consumer("io.vertx.redis.message-channel").toPublisherBuilder()
        .map(Message::body)
        .buildRs();
  }

现在它可以工作了,但是如果我从多个浏览器中执行sse,它们就会共享事件。因此,每个用户中只有一个在其他用户(浏览器)之后收到一个事件。

ldfqzlk8

ldfqzlk82#

我没有使用redis pub sub,但我确实使用了redis streams,我要做的是这样的:
`

return Multi.createBy().repeating()
    .supplier(() -> this.reactiveRedisClient.subscribe(subscriberList)
                        .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
                        .onItem().castTo(String.class))
        .indefinitely()
        .onItem().disjoint();

我想既然pub-sub是非阻塞的,它只运行一次,就不会等到另一条消息到达。你必须实现你自己的目标while(true)` 以React的方式循环。

相关问题