erlang 当在并发TCP请求中接收到大量数据包时,如何识别完整的响应

iovurdzv  于 2022-12-16  发布在  Erlang
关注(0)|答案(2)|浏览(107)

我有一个单一的TCP连接到服务器,但可能有多个请求在同一时间。大多数时候,响应将是如此之大,我会不断收到大量的数据块。这是可能的,我检查数据长度,以确定它是流的结束。但与多个请求,有时包“混合”与另一个请求,导致许多失败。
比如说,
对于正常请求:

    • 〉请求#1 1/3
    • 〉请求#1 2/3
    • 〉请求#1 3/3
    • 〉请求#1已完成

在真实的生活中:

    • 〉请求#1 1/3
    • 〉请求#1 2/3
    • 〉请求#2 1/3
    • 〉请求#2 2/3
    • 〉在某个点失败

现在,我唯一的想法是通过一个接一个地服务请求来使它串行化,但是它不能完全解决我的问题,因为有时候我会收到订阅的事件,而这些事件是不受控制的。
一般来说,我如何解决这个TCP问题?
我展示了下面的一些代码(以防有人知道erlang和elixir)

# Create TCP connection
{:ok, socket} =
      :gen_tcp.connect(host_charlist, port, [:binary, active: true, keepalive: true])

# Send request
def handle_call({:send, msg}, from, state) do
  :ok = :gen_tcp.send(state.socket, msg)
  new_state = %{state | from: from, msg: ""}

  {:noreply, new_state}
end

# When it receive packet
def handle_info({:tcp, _socket, msg}, state) do
  new_state = %{state | msg: state.msg <> msg}
  current_msg_size = byte_size(new_state.msg)
  defined_msg_size = Response.get_body_size(new_state.msg) # this msg size can read from the first packet's header

  cond do
    current_msg_size == defined_msg_size ->
      GenServer.reply(new_state.from, {:ok, new_state.msg})
      {:noreply, %{new_state | msg: ""}}

    current_msg_size > defined_msg_size ->
      GenServer.reply(new_state.from, {:error, "Message size exceed."})
      {:noreply, %{new_state | msg: ""}}

    true ->
      {:noreply, new_state}
  end
end
2hh7jdfx

2hh7jdfx1#

在TCP级别,在连接中,requestresponse不存在,它是一个管道,按顺序从一端向另一端传输字节。
为了处理单个连接上的交叉存取,您必须在堆栈的上一级处理它。
可能的解决方案包括:
1.序列化
1.成帧:您将响应成帧,并以某种方式保证帧在服务器中完全发送,而不会交错,然后您的接收器可以检查每个帧(可能跨越多个receive),并将其分配给相应的请求。
1.每个请求对应一个连接:让操作系统处理线路上的交叉存取,但每次都需要一个套接字和一次握手

kuuvgm7e

kuuvgm7e2#

您可以将tcp选项从active = true更改为active = falseactive = once

:gen_tcp.connect(host_charlist, port, [:binary, active: true, keepalive: true])

然后你可以自己控制消息的接收,而不是被传入的消息淹没。
1、接收上层的调用,处理你的客户端请求,设置active = false,等待服务器响应。
2、尽量不要只接收相关消息,在进程消息队列中忽略这些消息或保存在缓冲区中。
3、当收到服务器响应时,对其进行处理,然后设置active = once
4,如果超时,则设置active = once
https://www.erlang.org/doc/man/gen_tcp.html#controlling_process-2
控制进程(套接字,PID)-〉确定|{错误,原因}类型套接字=套接字()Pid = pid()原因=已关闭|非所有者|巴达尔格|inet:posix()为Socket分配一个新的控制进程Pid。控制进程是从Socket接收消息的进程。如果被当前控制进程之外的任何其他进程调用,则返回{error,not_owner}。如果Pid标识的进程不是现有的本地pid,则返回{error,badarg}。在某些情况下,在执行此函数期间关闭Socket时也可能返回{error,badarg}。
如果套接字设置为活动模式,此函数将调用方邮箱中的所有消息传输到新的控制进程。如果在传输过程中有任何其他进程正在与套接字交互,则传输可能无法正常工作,消息可能会保留在调用方的邮箱中。例如,在传输完成之前更改套接字活动模式可能会导致这种情况。

相关问题