C++并发编程01-管理线程

用std::thread简单地创建并启动线程

简单来说,使用C++线程库启动一个线程,只需要构造一个std::thread对象的实例。

// 采用普通函数
void DoSomeWork();

// 启动一个线程执行DoSomeWork()函数
std::thread thread(DoSomeWork);
// 采用仿函数
class Task {
public:
    void operator()() const
    {
        // Do some work...
    }
};

// 启动一个线程执行task()函数
Task task;
std::thread thread1(task);

// 构造一个Task的临时对象并启动线程执行该临时对象的仿函数
// NB: 注意此处如果使用std::thread thread2(Task()),
//     部分编译器可能会报错,因为C++编译器会将其解析为函数
//     声明,而不是类型对象的定义(C++'s most vexing parse)。
std::thread thread2(Task{});
// 采用lumbda表达式
std::thread thread(
// 启动一个线程执行lumbda表达式内的内容
[]() {
    // Do some work...
});

join——等待线程汇入

即等待线程结束。最大粒度的线程间同步,当前线程在此时等待另一线程结束汇入后再往下执行。

void DoSomeWork();

// ... 假设当前为主线程,thread线程为该进程内除主线程外唯一的子线程
    std::thread thread(DoSomeWork);
// ...
    if (thread.joinable()) { // 判断当前thread实例持有线程
        thread.join(); // 主线程等待thread线程结束汇入
    }
// ...

如果不等待线程汇入,就必须要保证当前线程结束后,其他线程不会访问该线程中已经释放的资源(当前线程栈上的对象、当前线程执行的线程函数生命周期结束时栈展开会依次析构的对象等等)。这个问题不是多线程才有的问题,单线程代码中访问一个已经销毁的对象也会导致crash,只是多线程增加了这种问题发生的几率。

如果std::thread对象的实例持有线程,在析构销毁前既没有join又没有detach,程序就会异常终止(std::thread的析构中会调用std::terminate())。因此,即使有异常存在,也要保证线程能够正常汇入或分离【参考后文中的“thread_guard——使用RAII来保证线程正常工作”】。

detach——分离线程(后台执行)

线程detach后,线程的归属和控制将由C++运行库接管和处理。

void DoSomeWork();

// ... 假设当前为主线程,thread线程为该进程内除主线程外唯一的子线程
    std::thread thread(DoSomeWork);
// ...
    if (thread.joinable()) { // 判断当前thread实例持有线程
        thread.detach(); // 线程分离,当前thread实例不在持有该线程,
                         // 此时thread就可以正常析构了。
    }
// ...

需要注意的是,线程detach后如果主线程结束(即main函数return后,也即进程结束了)或任意线程调用exit终止了整个进程,该线程也会立即结束。

与std::thread中的线程传递参数

往新启动的线程传递参数(即线程函数的参数)只需要将这些参数作为 std::thread构造函数的附加参数即可,类似于std::bind传递参数的过程。

void DoSomeWork(int a, double b);

// 启动一个线程执行DoSomeWork(int a, double b)函数,并传递参数a=1,b=0.1
std::thread thread(DoSomeWork, 1, 0.1);

需要注意的是,这些参数会拷贝至新线程的内存空间中,即使函数中的参数是引用的形式,拷贝操作也会执行!这里的引用只能是形如const [数据类型] &的常量引用,比如const double&,非常量引用会导致编译出错。

double value = 0.1;
void DoSomeWork(int a, const double& b);

// 这里的常量1和变量value的值都会被拷贝一份至新线程的内存空间中,新线程在运行DoSomeWork函数时,b引用的是新线程内存空间中的值!
std::thread thread(DoSomeWork, 1, value);

我们可以用如下代码验证,打印出来的地址将是不同的两个:

#include <thread>
#include <iostream>

// 虽然传的是引用(常量引用),但是是新线程内存空间中的,而不是主线程中的那个value
void DoSomeWork(int a, const double& b)
{
    std::cout << &b << std::endl;
}

int main()
{
    double value = 0.1;
    std::cout << &value << std::endl; // 在主线程的value
    // 传给DoSomeWork的value将是拷贝一份的新线程内存空间中的值
    std::thread thread(DoSomeWork, 1, value);
    thread.join();
}

传参过程可能遇到的三种问题:

(1) 传递的参数是指向动态变量的指针

发生问题的地方在于:标准没有给出隐式转换的操作std::thread构造函数的拷贝操作的顺序的定义,各个编译器的实现可能有差异,有可能std::thread的构造函数拷贝的是转换前的变量,那么这个转换前的变量有可能在拷贝时失效了,如果这个变量是个指针,将会出现问题。

void DoSomeWork(std::string s);

void CreateDoSomeWorkThread()
{
    char buffer[32] = "...";
    std::thread thread(DoSomeWork, buffer); // here!
    thread.detach();
}

int main()
{
    CreateDoSomeWorkThread();
    //...
}

在代码中的“here!”处,我们传递的是buffer数组的指针,有的编译器实现可能在std::thread的构造函数拷贝转换前的变量buffer数组的指针而不是隐式转换后的std::string,当要进行隐式转换的时候,CreateDoSomeWorkThread()函数已经结束,buffer数组的生命周期已经结束,此时再进行隐式转换就会出现问题。

解决这个问题的方法就是直接显示地转换,这样转换始终会在std::thread构造函数的拷贝操作之前,我们手动确定了顺序。上面的代码可以改成如下安全的形式:

void DoSomeWork(std::string s);

void CreateDoSomeWorkThread()
{
    char buffer[32] = "...";
    // buffer → std::string(buffer):使用std::string进行显式转换,避免野指针
    std::thread thread(DoSomeWork, std::string(buffer));
    thread.detach();
}

int main()
{
    CreateDoSomeWorkThread();
    //...
}

(2) 传递的参数是非常量引用

本节的前面提到过“传递的参数会拷贝至新线程的内存空间中,即使函数中的参数是引用的形式,但是不能传非常量引用,否则会报编译错误”:

double value = 0.1;
void DoSomeWork(int a, double& b);
std::thread thread(DoSomeWork, 1, value); // 编译错误!

这是因为DoSomeWork第二个参数期待传入一个引用,但std::thread的构造函数并不知晓,构造函数会无视函数参数类型,盲目地拷贝已提供的变量,但内部代码会将拷贝的参数以右值的方式进行传递,这是为了支持那些仅支持移动不支持拷贝的类型,而后会尝试以右值为实参调用DoSomeWork,但因为函数期望的是一个非常量引用作为参数而非右值,此时就报了编译错误。这个问题和在使用std::bind时不能直接传递非常量引用是同样的问题。因此也有同样的解法,即使用std::ref或std::cref将参数转换成引用的形式,同时需要注意此时参数将不会拷贝至新线程的内存空间中,传递的是引用而非拷贝的副本!!

void DoSomeWork(int a, double& b)
{
    std::cout << &b << std::endl;
}

int main()
{
    double value = 0.1;
    std::cout << &value << std::endl; // 在主线程的value
    // 使用了std::ref传递引用,传给DoSomeWork的value将是主线程value的引用
    std::thread thread(DoSomeWork, 1, std::ref(value));
    thread.join();
}

我们将看到,打印出来的地址是相同的,都是主线程中的value的地址。

此时我们要注意引用对象(在这里是value)的生命周期问题,下面的这段代码就是有问题的:

void DoSomeWork(int a, double& b)
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << b << std::endl;
}

int main()
{
    std::thread thread;
    {
        double value = 0.1;
        thread = std::thread(DoSomeWork, 1, std::ref(value));
    }
    thread.join();
}

value的生命周期已经结束了,但是运行DoSomeWork的线程还在使用它。但是如果我们运行这段代码,会发现程序没有崩溃,这是因为std::ref或std::cref返回的对象是std::reference_wrapper,而std::reference_wrapper本质存储的是指针!也就是说,我们在运行DoSomeWork的线程里用了一个野指针在操作,只是还没有发生踩内存而已。

(3) 传递的参数仅支持移动不支持拷贝

此时,当传递的实例是临时变量时,则自动进行移动操作,但当原对象是一个命名变量,转移的时候就需要使用std::move()进行显式移动。

void DoSomeWork(std::unique_ptr<int> a);

std::unique_ptr<int> p = std::make_unique<int>(1);

std::thread t1(DoSomeWork, std::move(p)); // 命名变量,使用std::move显式移动
std::thread t2(DoSomeWork, std::make_unique<int>(2)); // 临时变量,隐式移动

最后,线程函数是可以有返回值的,那么线程结束时也就有返回值,该返回值可以通过std::future获取,这将在第四篇总结文章中详细记录。详见:https://tis.ac.cn/blog/kongdeyou/cpp_concurrency_04

在std::thread间转移线程所有权

C++标准库中有很多资源占有(resource-owning)类型,如std::ifstream、std::unique_ptr、std::thread都是可移动但不可复制的(即NonCopyable)。

std::thread只可移动不可复制,我们只可以转移std::thread的所有权,这会导致旧的std::thread实例内的线程被转移到新的std::thread实例内。

void DoSomeWork();
std::thread t1(DoSomeWork);     // 创建一个新线程,t1持有该线程所有权
std::thread t2 = std::move(t1); // 移动t1给t2,t2持有该线程所有权,t1不再持有线程
t1 = std::thread(DoSomeWork);   // 再创建一个新线程(临时的std::thread变量),t1持有该线程所有权(临时变量的赋值操作符,走隐式移动)
std::thread t3;                 // 此时t3不持有任何线程的所有权
t3=std::move(t2);               // 移动t2给t3,t3持有该线程所有权,t2不再持有线程
t1=std::move(t3);               // 赋值操作将使程序崩溃,因为t1已经持有了线程【参考后文中的“joining_thread——完善地支持移动的情景”】
std::thread CreateThread1()
{
    void DoSomeWork();
    return std::thread(DoSomeWork);
}

std::thread CreateThread2()
{
    void DoSomeWork();
    std::thread t(DoSomeWork);
    return t;
}

// 编译器默认启动返回值优化,函数返回时如果返回的是值类型时会走移动构造。另外std::thread是NonCopyable的。
std::thread t1 = CreateThread1(); // t1持有CreateThread1中返回的线程,CreateThread1中的临时对象转移给了t1
std::thread t2 = CreateThread2(); // t2持有CreateThread2中返回的线程,CreateThread2中的t转移给了t2

扩展:NonCopyable基类可以实现如下:

class NonCopyable {
protected:
    NonCopyable() = default;
    ~NonCopyable() = default;
private:
    NonCopyable(const NonCopyable&) = delete;
    NonCopyable& operator=(const NonCopyable&) = delete;
};

某个类如果要实现为NonCopyable的,继承该类即可。

确定线程的数量

获取CPU核心数量:
unsigned int cores = std::thread::hardware_concurrency();
获取的该值仅是参考值,一般多核机器返回逻辑核心的数量。当无法获取时,返回0。

量化分析并合理设置工作线程数:
线程在执行的过程中,执行计算时需要占用CPU资源,等待时不会占用CPU资源,我们可以通过量化分析(比如带时间戳打日志进行统计、使用Visual Studio的性能分析工具分析等等),可以计算出工作线程运行过程中这两部分时间的比例(执行计算耗时 : 阻塞等待耗时)。
我们为了让CPU完全跑满,那么针对N核机器,通过单线程执行所有任务分析出执行计算的平均耗时为x、平均阻塞等待耗时为y,则工作线程数设置为N(x+y)/x时,能让CPU的利用率最大化。

线程标识

线程标识的数据类型:
std::thread::id

获取线程标识:
(1) 通过std::thread对象的成员函数get_id()获取,如果std::thread实例没有执行线程,返回std::thread::id的默认构造值;
(2) 通过当前线程调用std::this_thread::get_id()获取。

std::thread::id实例可以自由拷贝和对比(已重载了“==”运算符),因为只是一个用于标识某个线程的id值。同时标准库提供了std::hash\<std::thread::id>容器,可以作为键值使用。

thread_guard——使用RAII来保证线程正常工作

class thread_guard {
public:
    explicit thread_guard(std::thread& thread) : t(thread) {}
    virtual ~thread_guard()
    {
        if (t.joinable()) {
            t.join();
        }
    }

    thread_guard(thread_guard const&) = delete;
    thread_guard& operator=(thread_guard const&) = delete;

private:
    std::thread& t;
};

我们可以这样使用:

void DoSomeWork()

int main()
{
    std::thread thread(DoSomeWork);
    thread_guard guard(thread);
    system("pause");
}

thread_guard的线程所有权还是在thread实例,没有传递到thread_guard的实例guard里,thread_guard只是存储了std::thread的引用,这在使用的时候有可能会带来其他的麻烦:比如thread实例被移动走了,开发者期望移动到其他地方保留下来后台运行这个线程,但是thread_guard析构的时候join了,当前线程会强制等待该线程执行结束,这导致了开发不期望的结果。

scoped_thread——移动线程所有权到scoped_thread中来

class scoped_thread {
public:
    explicit scoped_thread(std::thread& thread)
        : t(std::move(thread)) // 移动线程所有权到scoped_thread内的t
    {
        if (!t.joinable()) {
            throw std::logic_error("empty thread");
        }
    }
    virtual ~scoped_thread()
    {
        t.join();
    }

    scoped_thread(scoped_thread const&) = delete;
    scoped_thread& operator=(scoped_thread const&) = delete;

private:
    std::thread t;
};

scoped_thread能不完全解决上述的问题,我们把线程所有权转移到scoped_thread中,这样thread中将不持有任何线程。但是由于没有重载operator==实现移动操作,我们还不能直接移动scoped_thread来实现开发者期望的移动到其他地方保留下来后台运行这个线程。这将在joining_thread中完全解决。

joining_thread——完善地支持移动的情景

我们在“在std::thread间转移线程所有权”这一节中提到了重复赋值导致程序崩溃的问题,我们在前一节实现的scoped_thread中并没有将该问题解决,这一节中将解决这个问题。

事实上这也是C++17标准给出的一个建议,但是C++委员会成员没有达成一致,因此joining_thread类没有加入到C++17中,但C++20仍然对此进行了探讨,并实现了一个std::jthread(然鹅仍然还是没有进到标准中::T_T::)。

class joining_thread {
public:
    joining_thread() noexcept = default;

    template<typename Func, typename ...Args>
    explicit joining_thread(Func&& func, Args&& ...args) :
        t(std::forward<Func>(func), std::forward<Args>(args)...) {}

    explicit joining_thread(std::thread thread) noexcept :
        t(std::move(thread)) {}

    joining_thread(joining_thread&& other) noexcept :
        t(std::move(other.t)) {}

    joining_thread& operator=(joining_thread&& other) noexcept
    {
        // 当其他joining_thread实例想移动进来,而当前实例内的仍持有线程时
        if (joinable()) {
            join(); // 会join当前线程
        }
        t = std::move(other.t); // 再移动进来
        return *this;
    }

    joining_thread& operator=(std::thread other) noexcept
    {
        // 当其他joining_thread实例想移动进来,而当前实例内的仍持有线程时
        if (joinable()) {
            join(); // 会join当前线程
        }
        t = std::move(other); // 再移动进来
        return *this;
    }

    ~joining_thread() noexcept
    {
        if (joinable()) {
            join(); // 析构时保证仍持有线程时一定会走join
        }
    }

    void swap(joining_thread& other) noexcept
    {
        t.swap(other.t);
    }

    std::thread::id get_id() const noexcept
    {
        return t.get_id();
    }

    bool joinable() const noexcept
    {
        return t.joinable();
    }

    void join()
    {
        t.join();
    }

    void detach()
    {
        t.detach();
    }

    std::thread& as_thread() noexcept
    {
        return t;
    }

    const std::thread& as_thread() const noexcept
    {
        return t;
    }

private:
    std::thread t;
};

最后,我们来复盘看看thread_guard、scoped_thread、joining_thread的区别:

thread_guard只是简单地将一个std::thread的实例包装进来做RAII管理,线程的所有权还在std::thread实例中;

scoped_thread会将std::thread的实例中的线程移动到scoped_thread的肚子里面来管理,但是scoped_thread不能移动出去,只能在自己生命周期结束时走析构的时候join线程;

joining_thread实现了完善的线程包装器,用它的构造函数能够创建线程,并且在自己的生命周期结束时,如果线程没有join或detach过,自动地join线程保证正常使用线程,同时实现了移动构造和移动赋值,支持移动操作,当其他joining_thread实例想移动进来,而当前实例内的仍持有线程时,会join当前线程,再移动进来。


本文为原创内容,遵循CC BY-ND 4.0协议,署名-禁止演绎。
转载请注明出处:https://tis.ac.cn/blog/kongdeyou/cpp_concurrency_01/

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注