C++11新特性——启动线程

 
 

线程支持库

  C++ 11 中新增了支持线程(thread )、互斥(mutual exclusion)、条件变量(condition variables)和 std::future(期货)。
  本节是 C++ 11 并发的第一节,主要介绍多线程运行时的基本高级接口——std::async() 和 std::future 以及底层接口 std::thread 和 std::promise。
 

并行编程

  常见的并行编程有多重模型,比如共享内存,多线程,消息传递等。从实用性上讲,多线程模型往往具有较大的优势。多线程模型允许同一时间有多个处理器单元执行统一进程中的代码部分,而通过分离的栈空间和共享的数据区及堆空间,线程可以拥有独立的执行状态,还可以进行快速的数据共享。
  C++ 11 之前多线程编程都需要系统的支持,在不同的系统下创建线程需要不同的 API 如 pthread_create(),Createthread(),beginthread() 等,使用起来都比较复杂。而 C++11 提供了新头文件 <thread>、<mutex>、<atomic>、<future> 等用于支持多线程。
  在 C++ 11 中,标准的一个相当大的变化就是引入了多线程的支持。这使得 C++ 在进行线程编程的时候,不必依赖第三方库和标准。
 

高级接口

  std::async() 函数是更高层次上的多线程操作(相对于 std::thread),使我们不用关注线程创建内部细节,就能方便的获取异步执行状态和结果,还可以指定线程创建策略。新标准鼓励用 std::async() 替代线程的创建,让它作为异步操作的首选。
  通常来说,对初学者最友好的都是那些封装得最好、最高级的接口。而 std::async() 和 std::future 构成的多线程高级接口就是入门的最好开始,官方文档请见 std::futurestd::async()


  • std::async() 是一个辅助函数,提供了一个接口,可以给予它一个可调用对象(callable object)比如函数,成员函数、函数对象或者 lambda。它就会尝试自动在后台运行,称为一个独立的线程。如果底层平台不支持多线程,也不会有任何损失。
  • std::functure 则用来接收这个独立线程的返回结果,它可能是正确的返回值,也可能是一个异常。std::future 中增加许多对返回结果的操作和判断,而且只允许结果被获取一次。
     

调用高级接口

  如果需要计算两个返回值操作数的和,寻常做法如下所示。但是无论调用顺序是什么(调用次序无法确定),需要的时间是调用 func1() 和 func2() 的时间再加上求和的时间。

func1() + func2()

  但是,有了 std::async() 就可以做得更好。

int DoSomething(int i) { ... }
int func1(int i) { return DoSomething(i + 1); }
int func2(int j) { return DoSomething(j + 2); }
int main() {
    std::future<int> result1 = std::async(func1, 1); // 异步调用 func1
    int result2 = func2(2); // 因为后面马上要获取结果,func2() 调用一个新线程没有太大意义
    int result = result1.get() + result2; // get() 函数获取 std::future 的返回值
}

  在这里,std::async() 尝试将所获得的函数立刻异步启动与另一个线程中。需要注意的是,不论原函数是否具有返回值,也会返回一个 std::future 对象。

  • std::async() 可能会返回异常,需要有句柄来承载它。而对于返回值为空的函数,也会返回具化版本 std::future\
  • 返回的 std::future 对象还有一个非常重要的功能。它可以表示程序运行的一个节点,即当主程序调用 std::future 对象的 wait() 或者 get() 成员时必须保证线程已经完成运算,否则会一直等待下去,因为有很多线程使用的是延迟运行的策略。

  接下来是处理总和,就需要调用 get() 成员函数来获得 result1 的返回值。随着调用 get() 函数,以下三件事之一会发生:

  • std::async() 成功启动并运行结束,立刻返回结果。
  • std::async() 成功启动但是尚未结束,get() 会引起主程序阻塞等待结果。
  • std::async() 尚未启动,就会强制在主线程启动,但此时效果与同步调用无异。

  以上三种情况,特别是第三种,保证了在单线程环境或者已经无法再开辟新线程的时候可以有效运行。如果当前有一个线程处于可用状态,那么它启动新线程;否则,这一个调用会延迟到明确需要结果(调用 get())或者明确要求线程运行(调用 wait())。
  事实上,std::async() 启动只有两个可能:调用时立即建立新线程,否则直到明确表示需要结果才会运行。不会出现发现线程池有空缺了立即补上的情况。
  为了获得最佳效果,尽量在调用 std::async() 和调用 get() 之间的距离最大化。如果主程序后面完全没有调用 get() 或者 wait(),那么线程也可能直接永远不启动。
 

Launch(发射)策略

  可以使用 std::launch::async 枚举强迫 std::async() 绝不拖延开辟新线程:

std::future<int> result1 = std::async(std::launch::async, func1, 1);

  如果异步调用在此处无法实现(单线程环境或者开辟线程失败),就会抛出 std::system_error 异常。而在声明这个 launch 策略后,后面不调用 get() 或者 wait(),程序也一定以执行完成再返回,即使主程序已经运行完毕,也会等待这个线程结束才会关闭应用程序;另外,如果这里没有声明 std::future 对象获取 std::async() 返回值,主程序会直接阻塞直到此线程运行完成,就相当于一个同步调用再加上切换线程的开销。
  也可以使用 std::lauch::deffered 来声明线程延缓执行。

std::future<int> result1 = std::async(std::launch::deffered, func1, 1);

  这样,程序绝不会在调用 get() 或者 wait() 之前运行,其实 std::lauch::deffered 与串行调用没有不同,在很多时候可以节约不必要的浪费。如下所示,就不需要两个值都计算一遍。

auto f1 = std::async(std::launch::deffered, task1);
auto f2 = std::async(std::launch::deffered, task2);
... 
auto val = RunF1() ? f1.get() : f2.get();

 

等待和轮询

  除了调用 get() 或者 wait() 可以马上获得结果,有时需要一种东西可以帮助鉴别线程任务是否完成,如果完成,则可以获取结果,如果没有完成,主线程就继续执行其他任务或者休眠让出 CPU。有两个函数可以给线程运行设定一个时间然后查询是否运行完成,但这都是阻塞主线程换来的。

  • wait_for() 并传入一个时间段,就会在阻塞主线程一段时间然后获取 std::future 返回状态
  • wait_until() 并并传入一个时间点,主线程会阻塞到改时间点并返回状态。如果设定的时间点在现在之前,就会立刻返回查询结果。
int main()
{
    std::future<int> future = std::async(std::launch::async, []() {
        std::this_thread::sleep_for(std::chrono::seconds(3));
        return 8;
    });

    std::cout << "waiting...\n";
    std::future_status status;
    do {
        status = future.wait_for(std::chrono::seconds(1));
        if (status == std::future_status::deferred) {
            std::cout << "deferred\n";
        }
        else if (status == std::future_status::timeout) {
            std::cout << "timeout\n";
        }
        else if (status == std::future_status::ready) {
            std::cout << "ready!\n";
        }
    } while (status != std::future_status::ready);

    std::cout << "result is " << future.get() << '\n';
}
// 输出为
//waiting...
//timeout
//timeout
//ready!
//result is 8

 

shared future

  在上面提到的,std::future 提供了得到未来结果的能力,但是,std::future 的结果只能处理一次。第二次调用 get() 会导致未定义行为。
  有的时候,需要多次处理未来的结果。所以,C++ 提供了 std::shared_future,从而可以多次调用 get(),多次得到同样的结果或者同样的异常。比如下面的粒子:

int GetNumber() {
    int i = 0;
    std::cin >> i;
    if (!std::cin)
        throw std::runtime_error("no number read");
    return i;
}
void DoSomething(char c, std::shared_future<int> f) {
    try {
        int num = f.get();
        for (int i = 0; i < num; ++i) {
        // 每 100 毫秒输出一个字符,输出 num 次
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::cout.put(c).flush();
        }
    }
    catch (const std::exception &e) {
        std::cerr << std::this_thread::get_id() << ": " << e.what() << std::endl;
    }
}

int main() {
    try {
        std::shared_future<int> f = std::async(GetNumber);
        // 由于我们构造 f 的时候并不知道开启下列线程的值是什么,所以可以构造成 std::shared_future 分发给不同的线程。
        auto f1 = std::async(std::launch::async, DoSomething, '.', f);
        auto f2 = std::async(std::launch::async, DoSomething, '+', f);
        auto f3 = std::async(std::launch::async, DoSomething, '*', f);

        f1.get();
        f2.get();
        f3.get();
    }
    catch (const std::exception &e) {
        std::cout << e.what() << std::endl;
    }
    std::cout << std::endl;
}

  在上面如果输入 5,可能输出:

+.+..+.+.+ *

  如果输入 x,可能输出,且 ID 和次序无法预料:


5016: no number read
16184: no number read
18812: no number read

 

底层接口

  除了高级接口 async() 和 (shared)future,C++ 标准库还提供了一个启动及处理线程的底层接口。
 

std::thread

  构造一个 std::thread 对象可以传入任何 callable object(function、member function、function object、lambda),还可以夹带任何可能的实参。这是相对 std::async() 更底层的接口,相比高级接口:

  • std::thread 没有发射策略,C++ 永远尝试新建一个线程,无法做到则抛出一个异常。
  • 不能获得线程的返回值,唯一可以获得的是一个独一无二的线程 ID。
  • 发生异常,但是未被捕获的话,该线程会调用 std::terminate()。如果要将异常抛出线程外,必须使用 std::exception_ptr。
  • 如果在 thread 程序结束运行前发生了 thread object 析构或者 move assignment,该线程会马上调用 std::terminate() 终止程序。避免这种情况有两种做法,在析构前调用 join(),阻塞直到线程运行结束,或者调用 detach() 将线程转入后台不受任何限制。
  • 如果 main() 函数结束了,所有运行于后台的线程都会强行终止。

  下面是一段示例代码:

void DoSomething(int num, char c) {
    for(int i = 0; i < num; ++i) {
        std::this_thread::sleep_for(chrono::milliseconds(100));
        std::cout.put(c).flush();
    }
}
int main() {
    std::thread t1(DoSomething, 5, ',');

    for(int i = 0; i < 5; ++i) {
        thread t(DoSomething, 100, 'a' + i);
        t.detach();
    }

    cin.get();
    t1.join();
}

  上面的代码中,第一个线程输出 5 个逗号,然后构建 5 个线程,马上 detach() 运行与后台。等待控制台输入一个字符,就马上退出程序,预计中后台程序会马上停止输出。不计算输入的控制字符,得到的字符串为:

ba,cde,baced,bcaedb,eadcbae,

  而如果将 t.detach() 注释掉,由于 thread object 还没有结束就被析构掉了,线程会主动调用 std::terminate() 结束整个应用程序。注意,线程 detach() 是危险的举动,尽可能自己保存 thread object 达到控制的目的。
 

std::promise

  如果要在线程之间传递参数和处理异常,就必须要用到 std::promise。其实 std::async() 就是使用的 std::promise 来返回 std::future 对象。std::promise 和 std::future 对象一样都可以暂时持有一个对象。但是 std::future 是用 get() 来取回对象,std::promise 是用 set_…() 来设置对象。如下所示:

void DoSomething(std::promise<std::string> &p) {
    try {
        char c;
        std::cin >> c;
        if (c == 'x')
            throw std::runtime_error(std::string("char ") + c);
        std::string s = std::string("char ") + c;
        p.set_value(std::move(s));
    }
    catch (...) {
        p.set_exception(std::current_exception());
    }
}

int main() {
    try {
        std::promise<std::string> p;
        std::thread t(DoSomething, p);
        t.detach();

        std::future<std::string> f(p.get_future());

        std::cout << "result " << f.get() << std::endl;
    }
    catch (const std::exception& e) {
        std::cerr << "exception " << e.what() << std::endl;
    }
}

  在这里,必须对线程内传递引用或者指针,否则是无法取出结果的。然后在线程内调用 set_value() 或者 set_exception() 在 std::promise 中存储对象。一旦存有了值或者异常,状态就会转变为 ready,必须将 std::promise 转化为 std::future 才能获取存储的值。而 std::future 变量的 get() 仍是线程的节点,如果线程没有运行完,主线程会阻塞直到状态为 ready。另外需要注意,一旦状态为 ready,线程就会退出。如果线程中有需要释放的变量,比如堆变量,就应该调用 set_value_at_thread_exit() 或者 set_exception_at_thread_exit() 来让线程释放资源。
 

补充:

  其实还有一个高级接口, std::packaged_task。std::async() 是一旦开启马上运行于后台,但有的时候就是希望某些线程过一会儿再建立(建立线程池),那么就可以使用 std::packaged_task。
  原本可以这么写:

double compute(int x, int y);
std::future<double> f = std::async(compute, 1, 2);
...
double res = f.get();

  现在可以这么写:

double compute(int x, int y);
std::packaged_task<double(int, int)> task(compute);
std::future<double> f = task.get_future();
...
task(7, 5);
...
double res = f.get();

 
转载请带上本文永久固定链接:http://www.gleam.graphics/multithread.html

About the Author

发表评论

电子邮件地址不会被公开。

Bitnami