scala java gRPC双向流断开:客户端意外取消

2w3rbyxf  于 12个月前  发布在  Scala
关注(0)|答案(1)|浏览(348)

我正在开发一个运行时游戏服务器,它大量使用java-gRPC双向流。数据流是这样的:
Client <--grpc bi-stream--> Gateway Server <---- grpc bi-stream -----> Game Server
通过一个简单的场景例子说明问题:
网关服务器保持双向流如下(注意:代码是Scala语言,编译成JVM类,如果你来自Java,就把它看作是Python):

// Gateway Server gRPC bi-stream API
  override def stateSyncStreamDemo(
    responseObserver: StreamObserver[StateSyncFrameDemo]
  ): StreamObserver[StateSyncStreamRequestDemo] = {
    // get context from gRPC interceptor
      val `Context-UserId-Key` =
    Context.key[String]("user-id")
      val `Context-RoomId-Key` =
    Context.key[String]("room-id")

  new StreamObserver[StateSyncStreamRequestDemo] {
      override def onNext(
        request: StateSyncStreamRequestDemo
      ): Unit = {
        // send to Game Server by gRPC bi-stream
        // 1. create bi-stream to Game Server if roomId not create before
        val streamConnection = getOrCreateRoomConnection(`Context-RoomId-Key`)
        val gameServerRequestStream = streamConnection.withCallCredentials(
          new StreamShardingClient.ClientMetadataCall(
            roomId // setting roomId to Metadata
          )
        ). sendWithBiStream(someResponseStreamDefined)
        // 2. send message forward to Game Server with userId and client cmd
        gameServerRequestStream.onNext(Message(request.cmd, userId))
      }
   ...

这种向前的模式通常工作得很好。
当我断开客户端grpc(我用grpcurl测试)时发生了一个问题,Gaetway服务器<->游戏服务器的流也断开了。错误消息如下:

CANCELLED: client cancelled
io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    at io.grpc.Status.asRuntimeException(Status.java:530) ~[grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291) [grpc-stub-1.52.1.jar:1.52.1]
    at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) [grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) [grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) [grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96) [grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [grpc-core-1.52.1.jar:1.52.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]

我只是希望只断开客户端<->网关服务器。为什么影响游戏服务器连接?谢谢
==========更新========
我发现这和上下文管理有关,在拦截器中设置Context.fork()后,Gateway Server可以成功向Game Server发送消息,但从Game ServerGateway Server推送流消息时出现错误,错误如下:

StreamShardingClient.error occurred. CANCELLED: Failed to read message.
io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
    at io.grpc.Status.asRuntimeException(Status.java:539) ~[grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:487) [grpc-stub-1.52.1.jar:1.52.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [grpc-core-1.52.1.jar:1.52.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
    at io.grpc.Status.asRuntimeException(Status.java:530) ~[grpc-api-1.52.1.jar:1.52.1]
    at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366) ~[grpc-stub-1.52.1.jar:1.52.1]
    at scalasharding.sharding.StreamShardingClient$ResponseObserver.$anonfun$onNext$1(StreamShardingClient.scala:124) ~[classes/:?]
    at scalasharding.sharding.StreamShardingClient$ResponseObserver.$anonfun$onNext$1$adapted(StreamShardingClient.scala:117) ~[classes/:?]
    at scala.collection.immutable.Set$Set2.foreach(Set.scala:201) ~[scala-library-2.13.7.jar:?]
    at scalasharding.sharding.StreamShardingClient$ResponseObserver.onNext(StreamShardingClient.scala:117) ~[classes/:?]
    at scalasharding.sharding.StreamShardingClient$ResponseObserver.onNext(StreamShardingClient.scala:71) [classes/:?]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:474) ~[grpc-stub-1.52.1.jar:1.52.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener.onMessage(DelayedClientCall.java:473) ~[grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:675) ~[grpc-core-1.52.1.jar:1.52.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:660) ~[grpc-core-1.52.1.jar:1.52.1]
    ... 5 more
x6492ojm

x6492ojm1#

你有没有从库中检查examples?您有一个目录,其中包含cancellation的示例。一个用于client,另一个用于server。在README.md中展示了如何在本地构建和运行它们。
从那里我可以理解的是,StreamObserver是一个interface,有三个未实现的方法。一个是onNext,这是您在问题中提供的代码片段中显示的。另外两个是onError和onCompleted。
如果我没有错的话,在每个示例(ClientServer)中,您可以看到onError被覆盖,没有太多的逻辑,但看起来这就是处理错误的地方。此外,还有其他观察器,如ClientCallStreamObserverServerCallStreamObserver,它们在示例中使用。
希望这个能帮上忙

相关问题