gRPC 实战之——客户端为 Stream,服务端为 Stream

x33g5p2x  于2022-06-06 转载在 其他  
字(4.0k)|赞(0)|评价(0)|浏览(322)

一 需求

根据 id 查 name

二 编写 proto 文件

syntax = "proto3";
package grpc.proto;
option java_package = "com.grpc.proto";
option java_outer_classname = "StudentData";
option java_multiple_files = true ;
// 定义接口
service StudentService {
    // 请求一个 Requset 对象,响应一个 Response 对象
    rpc queryStudentNameById(MyRequestId) returns(MyResponseName) {}
    // 请求一个 Requset 对象,响应一个 Stream 对象
    rpc queryStudentsByCourseName(MyRequestCourseName) returns(stream MyResponseStudentsStream) {}
    // 请求一个 Stream 对象,响应一个 Response 对象
    rpc queryStudentsByCourseName2(stream MyRequestCourseName) returns(MyResponseStudents) {}
    // 请求一个 Stream,响应一个 Stream 对象,本例测试这个接口
    rpc queryStudentNameById2(stream MyRequestId) returns(stream MyResponseName) {}
}

message MyRequestId
{
    int32 id = 1 ;
}

message MyResponseName
{
    string name = 1 ;
}

message MyStudent
{
    int32 id = 1 ;
    string name = 2;
    string courseName = 3 ;
}

message MyResponseStudents
{
    // 服务端的响应结果是集合类型,因此需要加上 repeated
    repeated MyStudent students = 1 ;
}

// 数据结构,定义请求的 Request 对象
message MyRequestCourseName
{
    string courseName = 1 ;
}
// 数据结构,定义响应的 Stream
message MyResponseStudentsStream
{
    int32 id = 1 ;
    string name = 2;
    string courseName = 3 ;
}

三 编写接口实现类

package grpc;

import grpc.proto.*;
import io.grpc.stub.StreamObserver;

public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
    @Override
    public StreamObserver<MyRequestId> queryStudentNameById2(StreamObserver<MyResponseName> responseObserver) {
        MyStreamObserver2 observer = new MyStreamObserver2();
        observer.setResponseObserver(responseObserver);
        return observer;
    }

    class MyStreamObserver2 implements StreamObserver<MyRequestId> {
        private StreamObserver<MyResponseName> responseObserver;
        private MyResponseName responseStudentName;

        public void setResponseObserver(StreamObserver<MyResponseName> responseObserver) {
            this.responseObserver = responseObserver;
        }

        @Override
        public void onNext(MyRequestId value) {
            System.out.println("接收到的请求参数是:" + value.getId());
            // 假设查到的结果是“zs”
            this.responseStudentName = MyResponseName.newBuilder().setName("zs").build();
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onCompleted() {
            responseObserver.onNext(responseStudentName);
            responseObserver.onCompleted();
        }
    }
}

四 编写服务端代码

package grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class MyGRPCServer {
    private Server server;

    // 启动服务
    private void start() throws IOException {
        int port = 8888;
        server = ServerBuilder.forPort(port)
                .addService(new StudentServiceImpl())
                .build()
                .start();
        Runtime.getRuntime().addShutdownHook(new Thread(() ->{
                System.err.println(Thread.currentThread().getName() + ",关闭JVM");
                // 当 JVM 关闭时,也同时关闭 MyGRPCServer服 务
                MyGRPCServer.this.stop();
            }
        ));
    }

    // 关闭服务
    private void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            // 等待服务结束
            server.awaitTermination();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        final MyGRPCServer server = new MyGRPCServer();
        server.start();
        server.blockUntilShutdown();
    }
}

五 编写客户端代码

package grpc;

import grpc.proto.MyRequestId;
import grpc.proto.MyResponseName;
import grpc.proto.StudentServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

public class MyGRPCClient {
    public static void main(String[] args) throws Exception {
        // 创建一个客户端
        ManagedChannel client = ManagedChannelBuilder.forAddress("127.0.0.1", 8888)
                .usePlaintext().build();

        // 在 grpc 中,如果是以 Stream 方式发出请求,则此请求是异步的。因此,不能再使用阻塞式 stub 对象。
        StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc
                .newStub(client);

        // 请求一个 Stream,响应一个 Stream
        StreamObserver<MyRequestId> requestIdObserver = stub.queryStudentNameById2(new StreamObserver<MyResponseName>() {
            @Override
            public void onNext(MyResponseName value) {
                System.out.println("接收到的响应:" + value.getName());
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("查询结束");
            }
        });

        requestIdObserver.onNext(MyRequestId.newBuilder().setId(1).build());
        requestIdObserver.onCompleted();
        Thread.sleep(3000);
        client.shutdown();
    }
}

六 测试

1 启动服务端

2 启动客户端

3 服务端打印

接收到的请求参数是:1

4 客户端打印

接收到的响应:zs

查询结束

相关文章