kcp 为什么我写的KCP测试只能接收窗口大小条消息,超过了无法接收,

8wtpewkr  于 2022-10-25  发布在  其他
关注(0)|答案(8)|浏览(298)

请教如下问题
代码里面有KCP传消息和纯UDP传消息的部分
问题一:为什么我写的KCP测试只能接收窗口大小条消息,超过了无法接收。(本代码是只能接收128条)
问题二:感觉KCP传递的效率明显低于UDP


# include <chrono>

# include <future>

# include <ikcp.h>

# if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)

# include <winsock2.h>

# else

# include <sys/socket.h>

# endif

static long long
GetNowTime()
{
    return std::chrono::duration_cast<std::chrono::milliseconds>(
               std::chrono::high_resolution_clock::now().time_since_epoch())
        .count();
}

static long long
iclock64()
{
    return GetNowTime();
}

static unsigned int
iclock()
{
    return (unsigned int)(iclock64() & 0xfffffffful);
}

struct IKCP
{
    ikcpcb* kcp{nullptr};
    size_t index{0};
    long current{0};
    long slap{0};
};

class SocketSender
{
public:
    virtual ~SocketSender() { clear(); }

    SocketSender(const std::string& net_addr, int port, bool bind_addr = false)
    {
        WSAStartup(MAKEWORD(2, 2), &wsd);
        m_send_socket = socket(AF_INET, SOCK_DGRAM, 0);  // UDP
        if (INVALID_SOCKET == m_send_socket) {
            WSACleanup();
        }
        memset(&socket_addr, 0, sizeof(SOCKADDR_IN));
        socket_addr.sin_family = AF_INET;
        if (net_addr == "*" || net_addr.empty()) {
            socket_addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);  // inet_addr(net_addr.c_str());
        } else {
            socket_addr.sin_addr.S_un.S_addr = inet_addr(net_addr.c_str());
        }
        //socket_addr.sin_addr.S_un.S_addr = inet_addr(net_addr.c_str());
        socket_addr.sin_port = htons(port);
        int imode{1};
        // 设置为非阻塞模式 1
        int ret = ioctlsocket(m_send_socket, FIONBIO, (u_long*)&imode);
        if (SOCKET_ERROR == ret) {
            closesocket(m_send_socket);
            WSACleanup();
        }

        if (bind_addr){
        ret = bind(m_send_socket, (sockaddr*)&socket_addr, sizeof(sockaddr));
            if (SOCKET_ERROR == ret) {
                closesocket(m_send_socket);
                WSACleanup();
            }
        }
    }

    // 清除数据
    void
    clear()
    {
        closesocket(m_send_socket);
        WSACleanup();
    }

    // 发送数据
    int
    send(const void* data, int size)
    {
        // send
        return sendto(m_send_socket, (char*)data, size, 0, (sockaddr*)&(socket_addr), skt_addr_len);
    }

    // 接收数据
    int
    recv(void* data, int maxsize)
    {
        return recvfrom(m_send_socket, (char*)data, maxsize, 0, (sockaddr*)&socket_addr, &skt_addr_len);
    }

protected:
    SOCKET m_send_socket;
    SOCKADDR_IN socket_addr;  // 服务器套接字地址
    int skt_addr_len = sizeof(SOCKADDR_IN);
    WSADATA wsd;
};

int
udp_output(const char* buf, int len, ikcpcb* kcp, void* user)
{
    SocketSender* send_net = (SocketSender*)user;
    int ret = send_net->send(buf, len);
    return ret;
}

# define PUREUDP 0

void
Publisher1()
{
    SocketSender* send_net;
    send_net = new SocketSender("127.0.0.1", 6677);

    IKCP kcp;
    kcp.kcp = ikcp_create(0x11223344, (void*)send_net);
    kcp.kcp->output = udp_output;
    ikcp_wndsize(kcp.kcp, 128, 128);
    ikcp_nodelay(kcp.kcp, 2, 10, 2, 1);
    kcp.kcp->rx_minrto = 10;
    kcp.kcp->fastresend = 1;
    kcp.index = 0;
    kcp.current = iclock();
    kcp.slap = kcp.current + 20;

    std::string str =
        "分配string为指向size + 1大小的heap空间,那个多出来的1字节是'\\0'的空间 原始文字是人类用来纪录特 ";
    int len = str.size()+1;
    char msgdata[2000];

# if PUREUDP

    // 纯UDP
    while (1) {
        ((IUINT32*)msgdata)[0] = kcp.index++;
        ((IUINT32*)msgdata)[1] = iclock();
        memcpy(msgdata + 8, str.data(), len * sizeof(char));
        //printf("Pure UDP Send message: idx = %u, current tm = %u, msg = %s\n", ((IUINT32*)msgdata)[0], ((IUINT32*)msgdata)[1],
        //        msgdata + 8);
        send_net->send(msgdata, str.size()+9);
    }

# else

    while (1) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));

        auto clk = iclock();
        kcp.current = clk;
        ikcp_update(kcp.kcp, clk);

        ((IUINT32*)msgdata)[0] = kcp.index++;
        ((IUINT32*)msgdata)[1] = kcp.current;
        memcpy(msgdata + 8, str.data(), len * sizeof(char));
        int ret = ikcp_send(kcp.kcp, (char*)msgdata, (len + 8) * sizeof(char));
       // printf("Send message: idx = %u, current tm = %u, msg = %s\n", ((IUINT32*)msgdata)[0], ((IUINT32*)msgdata)[1],
       //        msgdata + 8);
    }

# endif

    delete send_net;
    ikcp_release(kcp.kcp);
}

void
Subscriber1()
{
    SocketSender* receive_net;
    receive_net = new SocketSender("*", 6677, true);
    IKCP kcp;

    kcp.kcp = ikcp_create(0x11223344, (void*)receive_net);
    kcp.kcp->output = udp_output;
    ikcp_wndsize(kcp.kcp, 128, 128);
    ikcp_nodelay(kcp.kcp, 2, 10, 2, 1);
    kcp.kcp->rx_minrto = 10;
    kcp.kcp->fastresend = 1;
    kcp.current = iclock();
    kcp.slap = kcp.current + 20;

    const int max_kcp_buff_size = 2000;
    const int max_rev_size = 1400;
    char data[max_kcp_buff_size]{0};
    std::vector<char> data_vec;

# if PUREUDP

    // 纯UDP
    while (1) {
        receive_net->recv(data, 2000);
        printf("Pure UDP Receive message: idx = %u, current tm = %u, msg = %s\n", ((IUINT32*)data)[0], ((IUINT32*)data)[1],
               data + 8);
    }

# else

    while (1) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));

        auto clk = iclock();
        kcp.current = clk;
        ikcp_update(kcp.kcp, clk);

        memset(data, 0, sizeof(char) * max_kcp_buff_size);
        // idx > 128后无法接收成功
        while (1) {
            int hr = receive_net->recv(data, max_rev_size);
            if (hr < 0)
                break;
            hr = ikcp_input(kcp.kcp, data, hr);
        }
        data_vec.clear();
        memset(data, 0, sizeof(char) * max_kcp_buff_size);
        while (1) {
            int hr = ikcp_recv(kcp.kcp, data, max_rev_size);
            if (hr < 0)
                break;
            data_vec.insert(data_vec.end(), data, data + hr);
        }
        printf("KCP Receive message: idx = %u, current tm = %u, msg = %s\n", ((IUINT32*)data)[0], ((IUINT32*)data)[1],
               data + 8);
    }

# endif

    delete receive_net;
    ikcp_release(kcp.kcp);
}

int
main()
{
    auto thread2 = std::async(std::launch::async, Subscriber1 );
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  
    auto thread1 = std::async(std::launch::async, Publisher1 );
    thread1.wait();
    thread2.wait();
    return 0;
}
exdqitrt

exdqitrt1#

作者你好。窗口不可能无限设大。拆包拼包是说把一个大文件(比如几兆上百M的拆分成不大于窗口份数)然后依次发送和接收?接收完毕后(接收128次后),似乎"窗口满了",如何“刷新”/“清空”窗口(清理发送端还是接收端)?原作者您的example里面也是128的窗口,在同一个循环体内部发送和接收,确是可以无上限ikcp_recv的。

q7solyqu

q7solyqu2#

直接把文件拆分成 1KB 一个包。

krcsximq

krcsximq3#

作者你好。窗口不可能无限设大。拆包拼包是说把一个大文件(比如几兆上百M的拆分成不大于窗口份数)然后依次发送和接收?接收完毕后(接收128次后),似乎"窗口满了",如何“刷新”/“清空”窗口(清理发送端还是接收端)?原作者您的example里面也是128的窗口,在同一个循环体内部发送和接收,确是可以无上限ikcp_recv的。

是的,为什么呢,假如在外面设置了这个拆包分包逻辑,为什么KCP里还有拆包分包呢?KCP里面不会把接收到的数据清除掉留给下一个序号的数据嘛,很疑惑

9avjhtql

9avjhtql4#

感觉作者理解错你想问的了,你的意思是接收了128次后后面的数据就接收不了了,但是作者写的测试的例子可以一直接收,而不是一个包超过窗口大小就接不了

hs1rzwqc

hs1rzwqc5#

时间太久了,有点不记得但是的想法。也是我提问没有描述清楚,应是你说的,假设有1万条消息,接受128条消息后,后续消息没有被接受了。

xwmevbvl

xwmevbvl6#

时间太久了,有点不记得但是的想法。也是我提问没有描述清楚,应是你说的,假设有1万条消息,接受128条消息后,后续消息没有被接受了。

这个要在客户端接收服务端的ACK包然后推进kcp里,才能把snd_buf的窗口往后移,就是把收到ACK包的数据从缓存中清除,就能发送新数据

0qx6xfy6

0qx6xfy67#

时间太久了,有点不记得但是的想法。也是我提问没有描述清楚,应是你说的,假设有1万条消息,接受128条消息后,后续消息没有被接受了。
您没有在客户端进行udp的recvfrom然后input进kcp处理ACK包

e5nszbig

e5nszbig8#

对的,没错,是这样,你可以自己把接收窗口设置大一点,或者你自己实现一个拆包拼包的逻辑。

相关问题