使用capnp的golang实现发出设置异步客户端回调

pkln4tw6  于 5个月前  发布在  Go
关注(0)|答案(1)|浏览(71)

使用golang implementation of capnp,我想构建一个服务器,它可以接受来自客户端的回调函数,以便服务器稍后调用。

example_schema.capnp

using Go = import "/go.capnp";

$Go.package("rpc_example");
$Go.import("rpc_example");

interface ClientCallbackInterface {
    callbackMethod @0 () -> ();
}

interface ServerInterface {
    registerCallback @0 (callback :ClientCallbackInterface);
    invokeCallback @1 ();
}

字符串

example_server.go

package main

import (
    "capnproto.org/go/capnp/v3"
    "capnproto.org/go/capnp/v3/rpc"
    "context"
    "example-capnp/rpc_example"
    "fmt"
    "net"
)

type ServerImpl struct {
    callback rpc_example.ClientCallbackInterface
}

func (s *ServerImpl) RegisterCallback(ctx context.Context, call rpc_example.ServerInterface_registerCallback) error {
    fmt.Println("Server: Registering a callback.")

    params := call.Args()
    cb := params.Callback()
    s.callback = cb

    fmt.Println("Server: Callback registered.")

    // This invocation works
    _, _ = s.callback.CallbackMethod(ctx, nil)
    return nil
}

func (s *ServerImpl) InvokeCallback(_ context.Context, _ rpc_example.ServerInterface_invokeCallback) error {
    fmt.Println("Server: Invoking the callback.")

    // This invocation does not work.
    _, _ = s.callback.CallbackMethod(context.Background(), nil)

    fmt.Println("Server: Callback invoked.")
    return nil
}

func main() {
    ctx := context.Background()

    listener, err := net.Listen("tcp", "127.0.0.1:2000")
    if err != nil {
        fmt.Printf("%s", err.Error())
    }

    server := ServerImpl{}
    client := rpc_example.ServerInterface_ServerToClient(&server)

    rwc, err := listener.Accept()
    if err != nil {
        fmt.Printf("%s", err.Error())
    }

    conn := rpc.NewConn(rpc.NewStreamTransport(rwc), &rpc.Options{
        BootstrapClient: capnp.Client(client),
    })

    // Block until the connection terminates.
    select {
    case <-conn.Done():
        client.Release()
    case <-ctx.Done():
        _ = conn.Close()
    }
}

example_client.go

package main

import (
    "capnproto.org/go/capnp/v3/rpc"
    "context"
    "example-capnp/rpc_example"
    "fmt"
    "net"
    "time"
)

type Callback struct{}

func (c Callback) CallbackMethod(_ context.Context, call rpc_example.ClientCallbackInterface_callbackMethod) error {
    fmt.Println("Client: CallbackMethod has been invoked.")
    return nil
}

func main() {
    ctx := context.Background()

    rwc, err := net.Dial("tcp", "127.0.0.1:2000")
    if err != nil {
        panic(err)
    }

    conn := rpc.NewConn(rpc.NewStreamTransport(rwc), nil)
    defer conn.Close()

    d := rpc_example.ServerInterface(conn.Bootstrap(ctx))

    // Register callback
    fmt.Println("Client: Registering a callback.")
    callback := Callback{}
    d.RegisterCallback(ctx, func(params rpc_example.ServerInterface_registerCallback_Params) error {
        cb := rpc_example.ClientCallbackInterface_ServerToClient(callback)
        return params.SetCallback(cb)
    })

    time.Sleep(time.Second * 1)

    fmt.Println("Client: Invoking callback.")
    _, release := d.InvokeCallback(ctx, nil)
    defer release()
    fmt.Println("Client: Invoked callback.")

    time.Sleep(time.Second * 1)

}

问题

我面临的问题是,当RegisterCallback方法的作用域结束时,ServerImpl.callback将无效。因此,客户端的回调函数可以在RegisterCallback方法中调用,但不能在InvokeCallback方法中调用。
capnp的cpp repo中的This conversation表明这应该是可能的。
我是不是漏掉了什么明显的东西?

disbfnqx

disbfnqx1#

我通过在传递给RegisterCallback方法的回调函数上调用AddRef()方法实现了这一点。即,通过将ServerImpl.RegisterCallback方法中的s.callback = cb行更改为s.callback = cb.AddRef()
根据文档,AddRef()创建了一个指向相同功能的新客户端。我不确定从params.Callback()AddRef()获得的客户端有什么不同,其中一个仍然有效。

相关问题