协程(coroutine)的原理与实现

x33g5p2x  于2021-11-27 转载在 其他  
字(11.1k)|赞(0)|评价(0)|浏览(271)

协程概念

协程相当于用户态的线程,更轻量级。线程的调度是操作系统完成的,而协程的调度是由用户态控制。线程相比进程来说,上下文切换快,资源开销变小,但是作为操作系统的,但是多线程如果设计的不够好,可能有大量的锁同步、切换等待。除此之外,如果一个系统的瓶颈在 IO 上,一个线程可能不能完全发挥出它的作用。而协程一般在线程中运行,有用户态的调度器调度,不需要进行反复的系统调用,而且可以利用到线程的并发优势,协程的执行效率极高,在一些高并发系统应用比较广泛。

协程可以看作一种使用起来比较特殊的函数,函数的调用一般是有一个入口和返回,且调用顺序明确。但是协程在函数内部可以让出(yield),转而执行另一个函数,但是此时该协程并未真正结束,只是暂时让出 CPU 执行权,在适当的时候返回来可以接着恢复执行(resume),这种执行的转换不是函数调用,而是类似于 CPU 的中断。所以协程也成为轻量级线程。

默认的协程切换的顺序可能是没有保障的,比如执行 A, B, C 三个子程序,因为第一个子程序遇到 IO 转而切换到 C 进行处理,这时候,子程序的执行可能变成了 A, C, B,但也有一些解决办法。

协程并不是万能的,合适的场景使用合适的工具更重要。协程可以减少 Callback 的使用,但不能完全替代 Callback。如果基于事件驱动的编程就不能发挥协程的作用而用 Callback 更合适。

但是协程是协作式多任务的,而线程典型是抢占式多任务的。这意味着协程提供并发性而非并行性。单个线程中的协程并不是并行执行的,而是顺序执行的,每次只有一个协程在被调度执行。只能是一个协程结束或 yield 后,再执行另一个协程。而线程才是真正并发执行的。协程只是以一种特殊的方式去运行一个函数,不要在协程内使用线程级别的锁来做协程同步,如果一个协程持有了锁而让出了执行权,其他协程再尝试去加锁,这个整个线程将发生死锁,该线程的所有协程都无法执行了。

协程并不都是不需要锁,因为协程调度一般是 N-M 模型(N个线程,M个协程),支持协程在多线程中切换,也可以指定协程在固定的线程中执行,这样可以重复利用每一个线程。如果多个线程运行的协程访问共享资源还是有竞争的,需要手动加锁。

协程并不能完全替代线程,而多线程/进程+多协程才是最好的办法,即充分利用了 CPU 的多核性能,有充分发挥协程的高效率。

协程的创建,就是把一个函数包装成一个协程对象,再用协程的方式将这个函数跑起来,协程调度就使用协程调度器把这些协程一个个都消化掉。IO 调度其实就是在调度协程时,如果发现这个协程在等待 IO 就绪,先让出这个协程的执行权,等 IO 就绪后再恢复这个协程的运行。定时器,就是给调度协程预设一个协程对象,等定时时间到了就恢复协程对象。

现在已经实现协程的编程语言包括:

  • 重量级的有 C#、erlang、golang 等
  • 轻量级有 python、lua、javascript、ruby
  • 还有函数式的 scala、scheme等。

C/C++ 之前不直接支持协程语义(从 C++20 开始支持),但也有不少开源的协程库,如:

Protothreads:一个“蝇量级” C 语言协程库(利用 C 语言的 switch case 的奇淫技巧)。

libco:来自腾讯的开源协程库libco(利用汇编代码)Github

coroutine:云风的一个C语言同步协程库(利用 glibc 的 ucontext 组件)Github

C/C++ 语言其标准内并未未直接支持协程。但 Boost C++ Library 中的 Boost.Context 实现了在POSIX、Mac OS X 和 Windows 上支持 ARM、MIPS、PowerPC、SPAR 和 x86 的上下文切换。可以在 Boost.Context 之上建造协程。

C 语言标准库中有非局部跳转函数 setjmp 和 longjmp 分别是保存和恢复:栈指针、程序计数器、被调用者保存的寄存器和 ABI 要求的任何其他内部状态。在 C99 标准中,跳转到已经用 return 或 longjmp 终止的函数是未定义的,但是大多数 longjmp 实现在跳转时不专门销毁调用栈中的局部变量,在被后续的函数调用等覆写之前跳转回来恢复时仍是原样,这允许在实现协程时谨慎的用到它们。

POSIX.1-2001/SUSv3 进一步提供了操纵上下文的强力设施:setcontext、getcontext、makecontext和swapcontext,可方便地用来实现协程,但是由于 makecontext 的参数定义不符合 C99 标准要求,这些函数在 POSIX.1-2004 中被废弃,并在 POSIX.1-2008 中被删除。POSIX.1-2001/SUSv3定义了 sigaltstack,可用来在不能获得 makecontext 的情况下稍微迂回的实现协程。极简实现不采用有关的标准 API 函数进行上下文交换,而是写一小块内联汇编只对换栈指针和程序计数器故而速度明显的要更快。

现在也有很多作者开源了写成库,包括:libtask、lthread、libCoroutine、libconcurrency、libcoro、libdill、libaco、libco等等。

ucontext 系列函数

Linux 下可以使用 ucontext 系列函数实现协程,其声明如下:

#include <ucontext.h>

int getcontext(ucontext_t *ucp);
int setcontext(const ucontext_t *ucp);
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);
int swapcontext(ucontext_t *oucp, const ucontext_t *ucp);

这些函数允许在用户级别的对一个进程中的多个控制线程之间进行上下文切换。

mcontext_t 类型与机器有关,是不透明的。 ucontext_t 类型是一个结构体,至少有以下字段:

typedef struct ucontext_t
{
	struct ucontext_t *uc_link;
	sigset_t uc_sigmask;
 	stack_t uc_stack;
 	mcontext_t uc_mcontext;
 	...
} ucontext_t;

sigset_t 和 stack_t 定义在 <signal.h> 中。

uc_link 指向当前上下文终止时将被恢复的上下文。

uc_sigmask 是这个上下文中被屏蔽的信号集。

uc_stack 是这个上下文使用的堆栈。

uc_mcontext 是保存的上下文的机器特定表示。

getcontext

int getcontext(ucontext_t *ucp);

函数 getcontext 将 ucp 指向的结构初始化为当前活动的上下文。将指向的结构体初始化为当前活动的上下文。

setcontext

int setcontext(const ucontext_t *ucp);

void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);

函数 setcontext 恢复 ucp 所指向的用户上下文。这个 ucp 应该是通过 getcontext 或者 makecontext 得到的。

如果上下文是通过调用 getcontext 获得的,程序执行继续执行。

如果上下文是通过调用 makecontext 获得的,那么程序的执行将通过 makecontext 的第二个参数指定的函数 func 来继续执行。当这个 func 函数返回,则恢复 makecontext 第一个参数指向的上下文 context_t 中指向的 uc_link。如果 uc_link 为 NULL,则线程退出。

一个简单的实例:

#include <stdio.h>
#include <unistd.h>
#include <ucontext.h>

int main()
{
    int a = 1;
    ucontext_t ctx_1;
    getcontext(&ctx_1); //将当前上下文保存到ctx_1中

    a++;
    printf("%d\n", a);

    setcontext(&ctx_1); //恢复ctx_1保存的上下文

    return 0;
}

这个函数的运行结果为:

1
2
3
...

makecontext

void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);

makecontext 函数修改了由 ucp 指向的上下文(该上下文是通过 getcontext 获得的)。调用者必须给这个上下文分配一个新的堆栈,然后给该上下文指定栈空间 ucp->stack,设置后继的上下文 ucp->uc_link。

当这个上下文后来被激活时(使用 setcontext 或 swapcontext),函数 func 被调用,并传递给 argc 之后的一系列整数参数。调用者必须在 argc 中指定参数的数量。这个函数返回时,继任的后继上下文被激活。如果后继上下文指针为 NULL,则线程退出。

一个简单的实例:

#include <stdio.h>
#include <unistd.h>
#include <ucontext.h>

void func(void *arg)
{
    puts("exec func.");
}

int main()
{
    char stack[1024];
    ucontext_t ctx_1;
    getcontext(&ctx_1); //将当前上下文保存到ctx_1中
    ctx_1.uc_stack.ss_sp = stack;
    ctx_1.uc_stack.ss_size = sizeof(stack);
    ctx_1.uc_stack.ss_flags = 0;
    ctx_1.uc_link = NULL;

    makecontext(&ctx_1, (void(*)(void))func, 0);
    
    setcontext(&ctx_1); //激活ctx_1,先执行func函数,func返回时,执行后继上下文,因为ctx_1.uc_link为NULL,直接为空
    printf("main exit.");
    return 0;
}

运行结果:

exec func.

swapcontext

int swapcontext(ucontext_t *oucp, const ucontext_t *ucp);

swapcontext 函数将当前上下文保存在 oucp 所指向的结构体中,然后激活 ucp 所指向的上下文。因为一个线程必须可以主动让出 CPU 给其他线程,swapcontext 函数就可以完成这个任务。

一个简单的实例:

#include <iostream>
#include <ucontext.h>

void func(void *arg)
{
    puts("exec func.");
}

int main()
{
    char stack[1024];
    ucontext_t ctx_1;
    ucontext_t ctx_2;
    getcontext(&ctx_1); //将当前上下文保存到ctx_1中
    ctx_1.uc_stack.ss_sp = stack;
    ctx_1.uc_stack.ss_size = sizeof(stack);
    ctx_1.uc_stack.ss_flags = 0;
    ctx_1.uc_link = &ctx_2;

    makecontext(&ctx_1, (void(*)(void))func, 0);
    //swapcontext将当前上下保存在ctx_2中,再激活ctx_1的上下文
    //而上面ctx_1使用了makecontext,在被激活时,是先执行func函数,当func函数返回时,执行ctx_1.uc_link指向的后继上下文
    //因为后继上下文ctx_2是在swapcontext中获取的,所以将会从swapcontext后面继续执行
    swapcontext(&ctx_2, &ctx_1);
    puts("main exit.");
    return 0;
}

运行结果:

exec func.
main exit.

如果将上面的 ctx_1.uc_link = &ctx_2; 改为 ctx_1.uc_link = NULL; 执行结果将变为:

exec func.

使用ucontext系列函数实现协程

此代码是从 sylar 框架中取出并简化的一个 C++ 封装的协程,使用了 ucontext 系列函数实现,仅仅是封装了协程对象,不包含协程调度模块以及 hook 模块,有兴趣的同学可以参考 sylar 源代码。

//yCoroutine.h

#ifndef __YCOROUTINE_H__
#define __YCOROUTINE_H__

#include <memory>
#include <functional>
#include <ucontext.h>

class yCoroutine : public std::enable_shared_from_this<yCoroutine>
{
public:
    typedef std::shared_ptr<yCoroutine> ptr;
    
    enum State
    {
        READY, //就绪状态
        RUNNING, //运行状态
        TERM,    //结束状态
    };
private:
    yCoroutine();

public:
    yCoroutine(std::function<void()> cb, size_t stacksize = 0, bool run_in_scheduler = false);
    ~yCoroutine();
    
    void reset(std::function<void()> cb);
    //当前协程恢复到执行状态
    void resume();
    //当前协程让出执行权
    void yield();
    //获取协程ID
    uint64_t getId() const { return m_id; }
    //获取协程的状态
    State getState() const { return m_state; }

public:
    //设置当前正在运行的协程,即设置线程局部变量t_coroutine的值
    static void SetThis(yCoroutine* co);
    //返回当前线程正在执行的协程,如果未创建,返回该线程创建的第一个协程
    static yCoroutine::ptr GetThis();
    //协程入口函数
    static void MainFunc();
    //获取当前协程ID
    static uint64_t GetCoroutineId();

private:
    uint64_t m_id = 0;          //协程id
    uint32_t m_stacksize = 0;   //协程栈空间大小
    State m_state = READY;      //协程状态
    ucontext_t m_ctx;           //协程上下文
    void* m_stack = nullptr;    //协程栈地址
    std::function<void()> m_cb; //协程入口函数
    bool m_runInScheduler;      //本携程是否参与调度器调度
};

#endif /*__YCOROUTINE_H__*/
//yCoroutine.cpp

#include "yCoroutine.h"
#include <iostream>
#include <atomic>
#include <cassert>
#include <thread>

static std::atomic<uint64_t> s_coroutine_id {0};
static std::atomic<uint64_t> s_coroutine_count{0};
static uint32_t g_co_stack_size = 128 * 1024;

static thread_local yCoroutine* t_coroutine = nullptr;              //线程当前运行的协程
static thread_local yCoroutine::ptr t_thread_coroutine = nullptr;   //线程的主协程

yCoroutine::yCoroutine()
{
    SetThis(this);
    m_state = RUNNING;
    getcontext(&m_ctx);
    ++s_coroutine_count;
    m_id = s_coroutine_id++;
}

yCoroutine::~yCoroutine()
{
    std::cout << "yCoroutine::~yCoroutine id = " << m_id << std::endl;
    --s_coroutine_count;
    if (m_stack)
    {
        assert(m_state == TERM);
        free(m_stack);
    }
    else
    {
        //没有栈说明就是当前线程的主协程
        assert(!m_cb);
        assert(m_state == RUNNING);

        yCoroutine* cur_co = t_coroutine; 
        if (cur_co == this)
            SetThis(nullptr);
    }
}

void yCoroutine::SetThis(yCoroutine* co)
{
    if (!co) return;
    t_coroutine = co;
}

yCoroutine::ptr yCoroutine::GetThis()
{
    if (t_coroutine)
        return t_coroutine->shared_from_this();
    //这里会调用私有构造函数,然后调用SetThis,将t_coroutine设置为this
    yCoroutine::ptr main_co(new yCoroutine);
    assert(t_coroutine == main_co.get());
    t_thread_coroutine = main_co;
    return t_coroutine->shared_from_this();
}

//带参构造函数用于创建其他协程,需要分配栈空间
yCoroutine::yCoroutine(std::function<void()> cb, size_t stacksize, bool run_in_scheduler)
    : m_id(s_coroutine_id++), m_cb(cb), m_runInScheduler(run_in_scheduler)
{
    ++s_coroutine_count;
    m_stacksize = stacksize ? stacksize : g_co_stack_size;
    m_stack = malloc(m_stacksize);
    getcontext(&m_ctx);
    m_ctx.uc_link = nullptr;
    m_ctx.uc_stack.ss_sp = m_stack;
    m_ctx.uc_stack.ss_size = m_stacksize;

    makecontext(&m_ctx, &yCoroutine::MainFunc, 0);
    std::cout << "yCoroutine::yCoroutine() id=" << m_id << std::endl;
}

void yCoroutine::reset(std::function<void()> cb)
{
    assert(m_stack);
    assert(m_state == TERM);
    m_cb = cb;
    getcontext(&m_ctx);
    m_ctx.uc_link = nullptr;
    m_ctx.uc_stack.ss_sp = m_stack;
    m_ctx.uc_stack.ss_size = m_stacksize;
    makecontext(&m_ctx, &yCoroutine::MainFunc, 0);
    m_state = READY;
}

void yCoroutine::yield()
{
    //协程运行完成后会自动yield一次,然后回到主协程
    assert(m_state == RUNNING || m_state == TERM);
    SetThis(t_thread_coroutine.get());
    if (m_state != TERM)
        m_state = READY;
    //如果协程不参与调度器,与当前线程的主协程进行swap
    if(!m_runInScheduler)
        swapcontext(&m_ctx, &(t_thread_coroutine->m_ctx));
    // //如果协程参与调度器调度,应该和调度器的主协程进行swap
    // else
    // swapcontext(&m_ctx, &(Scheduler::GetMainCoroutine()->m_ctx));
}

void yCoroutine::resume()
{
    assert(m_state != TERM && m_state != RUNNING);
    SetThis(this);  //设置为线程当前运行的协程
    m_state = RUNNING;
    //如果协程不参与调度器,与当前线程的主协程进行swap
    if(!m_runInScheduler)
        swapcontext(&(t_thread_coroutine->m_ctx), &m_ctx);
    // //如果协程参与调度器调度,应该和调度器的主协程进行swap
    // else
    // swapcontext(&(Scheduler::GetMainCoroutine()->m_ctx), &m_ctx));

}

void yCoroutine::MainFunc()
{
    yCoroutine::ptr cur_co = GetThis(); //t_coroutine引用计数+1
    assert(cur_co);
    cur_co->m_cb(); //执行回调函数
    cur_co->m_cb = nullptr;
    cur_co->m_state = TERM;
    auto raw_ptr = cur_co.get();
    cur_co.reset();    //手动让t_coroutine引用计数-1
    raw_ptr->yield();
}

uint64_t yCoroutine::GetCoroutineId()
{
    if (t_coroutine)
        return t_coroutine->getId();
    return 0;
}
// main test
void test_coroutine_func()
{
    std::cout << "test_coroutine_func begin." << std::endl;
    std::cout << "before test_coroutine_func yield." << std::endl;
    std::cout << "cur croutine id :" << yCoroutine::GetCoroutineId() << std::endl;
    yCoroutine::GetThis()->yield();
    std::cout << "after test_coroutine_func yield." << std::endl;
    //当前coroutine结束自动返回主协程
    std::cout << "test_coroutine_func end." << std::endl;
}

void test_thread_func()
{
    std::cout << "thread_func begin." << std::endl;
    //初始化当前线程主协程
    yCoroutine::GetThis();
    //在线程中创建一个新协程
    yCoroutine::ptr co(new yCoroutine(test_coroutine_func, 0, false));
    std::cout << "cur croutine id :" << yCoroutine::GetCoroutineId() << std::endl;
    std::cout << "before test_thread_func resume1." << std::endl;
    co->resume();
    std::cout << "after test_thread_func resume1." << std::endl;

    std::cout << "before test_thread_func resume2." << std::endl;
    co->resume();
    std::cout << "after test_thread_func resume2." << std::endl;

    std::cout << "thread_func end." << std::endl;
}

int main()
{
    std::thread th1(test_thread_func);
    th1.join();
    return 0;
}

运行结果:

thread_func begin.
yCoroutine::yCoroutine() id=1
cur croutine id :0
before test_thread_func resume1.
test_coroutine_func begin.
before test_coroutine_func yield.
cur croutine id :1
after test_thread_func resume1.
before test_thread_func resume2.
after test_coroutine_func yield.
test_coroutine_func end.
after test_thread_func resume2.
thread_func end.
yCoroutine::~yCoroutine id = 1
yCoroutine::~yCoroutine id = 0

参考:
https://github.com/sylar-yin/sylar
https://zh.wikipedia.org/wiki/%E5%8D%8F%E7%A8%8B
https://www.zhihu.com/question/20511233/answer/83307265
https://blog.csdn.net/qq910894904/article/details/41911175

相关文章