最新的 C++ 迭代(称为 C++11,在去年通过了国际标准化组织 (ISO) 的审批)形式化了一组新库和一些保留字以处理并发。许多开发者以前都在 C++ 中使用过并发功能,但都是通过第三方的库,即,通常直接公开 OS API。
Herb Sutter 在 2004 年 12 月宣告“免费的性能午餐”结束,因为禁止 CPU 制造商通过物理能耗和增加碳排放量来生产更快的 CPU。由此进入了当前主流的多核时代,一种新的实现,而 C++(标准组件)为适应此类变化取得了重要的飞跃。
本文下面的内容将分成两节,另外还有一些小节。第一节,从并行执行开始,介绍允许应用程序并行运行独立或半独立活动的技术。第二节,从同步并发执行开始,探讨同步机制,这些活动通过同步方式处理数据,以避免出现争用情况。
本文基于即将推出的 Visual C++ 版本(现在称为 Visual C++ 11)中包括的功能。当前版本 (Visual C++ 2010) 中已提供其中部分功能。尽管本文不提供关于为并行算法建模的指南,也不提供关于所有可用选项的详尽文档,但却全面介绍了新的 C++11 并发功能,内容丰富详实。
并行执行
当您对数据建模和设计算法时,很自然地就会按照具有一定顺序的步骤指定这些建模和设计过程。只要性能位于可接受的范围内,这就是最值得推荐的方案,因为它通常更易于理解,而这符合维护代码的要求。
当性能成为令人担忧的问题时,为处理这种情况通常都会先尝试优化序列算法以减少使用的 CPU 循环。这种做法始终可行,直到无法再进行优化或难以优化。这时就需要将连续的一系列步骤拆分为同时进行的多项活动。
在第一节中,您将了解到以下内容:
- 异步任务: 一小部分原始算法,仅通过它们生成或使用的数据进行链接。
- 线程: 运行时环境管理的执行单元。它们与任务相关,因为任务在某种程度上在线程上运行。
- 线程内部: 线程绑定变量、线程传播的异常等等
异步任务
在本文随附的代码中,您将找到一个名为“顺序案列”的项目,如图 1 所示。
图 1 顺序案例代码
int a, b, c;
int calculateA()
{
return a+a*b;
}
int calculateB()
{
return a*(a+a*(a+1));
}
int calculateC()
{
return b*(b+1)-b;
}
int main(int argc, char *argv[])
{
getUserData(); // initializes a and b
c = calculateA() * (calculateB() + calculateC());
showResult();
}
主函数向用户请求一些数据,然后将该数据提交给三个函数: calculateA、calculateB 和 calculateC。稍后将组合这些结果,以便为用户生成一些输出信息。
随附材料中计算函数的编码方式在每个函数中引入了 1 到 3 秒的随机延迟。由于这些步骤是按顺序执行的,因此只要输入数据,就会产生一个在最糟糕情况下为 9 秒的总体执行时间。您可以通过按 F5 运行本示例来尝试此代码。
因此,我需要修改执行序列和查找并发执行步骤。由于这些函数都是独立的,因此我可以使用异步函数并行执行它们:
int main(int argc, char *argv[])
{
getUserData();
future<int> f1 = async(calculateB), f2 = async(calculateC);
c = (calculateA() + f1.get()) * f2.get();
showResult();
}
在这里我引入了两个概念: async 和 future,在 标头和 std 命名空间中都有定义。前者接收函数、lambda 或函数对象(即算符)并返回 future。您可以将 future 的概念理解为事件结果的占位符。什么结果?异步调用函数返回的结果。
在某些时候,我将需要这些并行运行函数的结果。对每个 future 调用 get 方法会阻止执行,直到值可用。
您可以通过运行随附示例中的 AsyncTasks 项目来测试修改后的代码,并将其与顺序案例进行比较。经过此修改后最糟情况下的延迟大约为 3 秒,与顺序版本的 9 秒相比有很大进步。
此轻型编程模型将开发者从创建线程的任务中解放出来。然而,您可以指定线程策略,但这里我不介绍此内容。
线程
前面一节介绍的异步任务模型在某些指定的应用场景中可能已经足够了,但如果您需要进一步处理和控制线程的执行,那么 C++11 还提供了线程类,该类在 标头中声明并位于 std 命名空间中。
尽管编程模型更为复杂,但线程可以提供更好的同步和协调方法,以允许它们执行其他线程并等待既定的时间长度,或直到其他线程完成后再继续。
在以下随附代码的“线程”项目中提供的示例中,我让 lambda 函数(为其赋予了整数参数)将其小于 100,000 的倍数显示到控制台:
auto multiple_finder = [](int n) {
for (int i = 0; i < 100000; i++)
if (i%n==0)
cout << i << " is a multiple of " << n << endl;
};
int main(int argc, char *argv[])
{
thread th(multiple_finder, 23456);
multiple_finder(34567);
th.join();
}
正如您将在后面的示例中看到的,我视情况将 lambda 传递给线程;一个函数或算符就已足够。
在主函数中,我使用不同的参数在两个线程中运行此函数。看一下生成的结果(因为运行时机不同,运行产生的结果也不同):
0 is a multiple of 23456
0 is a multiple of 34567
23456 is a multiple of 23456
34567 is a multiple of 34567
46912 is a multiple of 23456
69134 is a multiple of 34567
70368 is a multiple of 23456
93824 is a multiple of 23456
我可以使用线程实现前面一节中有关异步任务的示例。为此,我需要引入 promise 的概念。可以将 promise 理解为一个用于放置可用结果的接收器。将结果放置在其中后又从哪个位置提取该结果呢?每个 promise 都有一个关联的 future。
图 2中显示的、示例代码的 Promise 项目中提供的代码将三个线程(而非任务)与 promise 关联并让每个线程调用 calculate 函数。将这些细节与轻型 AsyncTasks 版本比较。
图 2 关联 Future 和 Promise
typedef int (*calculate)(void);
void func2promise(calculate f, promise<int> &p)
{
p.set_value(f());
}
int main(int argc, char *argv[])
{
getUserData();
promise<int> p1, p2;
future<int> f1 = p1.get_future(), f2 = p2.get_future();
thread t1(&func2promise, calculateB, std::ref(p1)),
t2(&func2promise, calculateC, std::ref(p2));
c = (calculateA() + f1.get()) * f2.get();
t1.join(); t2.join();
showResult();
}
线程绑定变量和异常
在 C++ 中,您可以定义全局变量,它的范围绑定到整个应用程序,包括线程。但相对于线程,现在有方法定义这些全局变量,以便每个线程保有自己的副本。此概念称为线程本地存储,其声明如下:
thread_local int subtotal = 0;
如果声明在函数范围内完成,则只有该函数能够看到变量,但每个线程将继续维护自己的静态副本。也就是说,每个线程的变量的值在函数调用之间将得到保持。
尽管 thread_local 在 Visual C++ 11 中不可用,但可以使用非标准的 Microsoft 扩展对它进行模拟:
#define thread_local __declspec(thread)
如果线程内引发异常将会发生什么?有时候可以在线程内的调用堆栈中捕获和处理异常。但如果线程不处理异常,则需要采用一种方法将异常传输到发起方线程。C++11 引入了此类机制。
在图 3 中,在随附代码的项目 ThreadInternals 中提供了一个 sum_until_element_with_threshold 函数,该函数遍历矢量直至找到特定元素,而在此过程中它会对所有元素求和。如果总和超过阈值,则引发异常。
图 3 线程本地存储和线程异常
thread_local unsigned sum_total = 0;
void sum_until_element_with_threshold(unsigned element,
unsigned threshold, exception_ptr& pExc)
{
try{
find_if_not(begin(v), end(v), [=](const unsigned i) -> bool {
bool ret = (i!=element);
sum_total+= i;
if (sum_total>threshold)
throw runtime_error("Sum exceeded threshold.");
return ret;
});
cout << "(Thread #" << this_thread::get_id() << ") " <<
"Sum of elements until " << element << " is found: " << sum_total << endl;
} catch (...) {
pExc = current_exception();
}
}
如果发生该情况,将通过 current_exception 将异常捕获到 exception_ptr 中。
主函数对 sum_until_element_with_threshold 触发线程,同时使用其他参数调用该相同的函数。当两个调用(一个在主线程中,另一个在从主线程触发的线程中)都完成后,将对其相应的 exception_ptrs 进行分析:
const unsigned THRESHOLD = 100000;
vector<unsigned> v;
int main(int argc, char *argv[])
{
exception_ptr pExc1, pExc2;
scramble_vector(1000);
thread th(sum_until_element_with_threshold, 0, THRESHOLD, ref(pExc1));
sum_until_element_with_threshold(100, THRESHOLD, ref(pExc2));
th.join();
dealWithExceptionIfAny(pExc1);
dealWithExceptionIfAny(pExc2);
}
如果其中任何 exception_ptrs 进行了初始化(即,表明出现某些异常),将使用 rethrow_exception 再次触发它们的异常:
void dealWithExceptionIfAny(exception_ptr pExc)
{
try
{
if (!(pExc==exception_ptr()))
rethrow_exception(pExc);
} catch (const exception& exc) {
cout << "(Main thread) Exception received from thread: " <<
exc.what() << endl;
}
}
当第二个线程中的总和超过其阈值时,我们将获得以下执行结果:
(Thread #10164) Sum of elements until 0 is found: 94574
(Main thread) Exception received from thread: Sum exceeded threshold.
同步并发执行
最好能够将所有应用程序拆分为 100% 独立的异步任务组。但实际上这几乎是不可能的,因为各方并发处理的数据都至少具有一定的依赖关系。本节介绍可避免发生争用情况的新 C++11 技术。
您将了解到以下信息:
- 原子类型: 与基元数据类型相似,但允许进行线程安全修改。
- 互斥和锁定: 允许我们定义线程安全临界区的元素。
- 条件变量: 一种在满足某条件之前停止执行线程的方法。