本文共 4942 字,大约阅读时间需要 16 分钟。
Pipelining是一个通用并行模型,模仿一个传统的制造集成生成线,数据流向一系列的管道滤波,每个滤波以某种方式处理数据,给定一个输入流数据,一些滤波能并行操作,另外一些不能。例如,视频处理,对帧的处理不依赖其他帧,那么能同时处理多个帧,反之,对帧的操作需要首先处理先前的帧。
TBB类pipeline和filter实现pipeline模式。一个简单的文本处理例子会被使用证明pipeline和filter的用法。这个例子读取一个文本文件,平方这个文本里的每个数,写出改变的文本到一个新的文件里,流程如下:
这个滤波作用是提取文本中一行的数据:
class InputStream: public filter {public: explicit InputStream(ifstream* file):m_file(file), filter(serial_in_order) {} // filter初始化serial_in_order(串行) ~InputStream() {} void* operator()(void*) { ostringstream out; vectorvec_str; string str; if(!getline(*m_file, str)) { return NULL; } _split(str, ' ', vec_str); // 把开辟内存,准备存放固定格式的数据 int length = vec_str.size(); char* result = new char[length*sizeof(int) + sizeof(int)]; vector ::iterator _it = vec_str.begin(); // 把数据转成整形,存成固定格式 for (int i = 0; _it != vec_str.end(); ++_it, i++) { int temp = atoi((*_it).c_str()); memcpy(result + sizeof(int) + i*sizeof(int), &temp, sizeof(int)); } memcpy(result, &length, sizeof(int)); return result; }private: ifstream* m_file;};
这个滤波为对获取的数据进行平方操作:
class Transform: public filter {public: Transform():filter(parallel) {} // filter初始化parallel(并行) ~Transform() {} void* operator()(void* in_str) { if (!in_str) { return NULL; } // 获取数据长度,得到数据的首地址result char* char_str = reinterpret_cast(in_str); ostringstream out; int* length = reinterpret_cast (char_str); char* result = new char[(*length)*sizeof(int) + sizeof(int)]; // 遍历获取数据,并对数据进行平方 for (int i = 0; i < *length; i++) { char* temp = (char_str + sizeof(int) + i*sizeof(int)); int temp_int = (*(reinterpret_cast (temp)))*2; // 把数据存成固定格式,准备传出去 memcpy(result + sizeof(int) + i*sizeof(int), &temp_int, sizeof(int)); } memcpy(result, length, sizeof(int)); return result; }};
这个滤波的作用是输出数据到文件中:
class OutputStream: public filter {public: explicit OutputStream(ofstream* file):m_file(file), filter(serial_in_order) {} ~OutputStream() {} void* operator()(void* in_str) { if (!in_str) { return NULL; } char delim = ' '; char cvtline = '\n'; // 获取数据长度length char* char_str = reinterpret_cast(in_str); int* length = reinterpret_cast (char_str); // 把数据存到文件中 for (int i = 0; i < *length; i++) { int* int_str = reinterpret_cast (char_str + sizeof(int) + i*sizeof(int)); (*m_file) << *int_str; (*m_file) << " "; } (*m_file) << "\n"; if (char_str) { delete []char_str; char_str = NULL; } return NULL; }private: ofstream* m_file;};
运行管道:
int run_pipeline(int nthreads) { string input = "/home/shaomingliang/beh/core/tbb/pipline/input.txt"; string output = "/home/shaomingliang/beh/core/tbb/pipline/output.txt"; ifstream in_stream(input.c_str()); ofstream out_stream(output.c_str()); if (!in_stream.is_open()) { throw invalid_argument(("invalid input file name: " + input).c_str()); } if (!out_stream.is_open()) { throw invalid_argument(("invalid output file name: " + output).c_str()); } // 建立管道,加入InputStream滤波 pipeline pipeline; InputStream input_filter(&in_stream); pipeline.add_filter(input_filter); // 加入Transform滤波 Transform trans_filter; pipeline.add_filter(trans_filter); // 加入OutputStream滤波 OutputStream out_filter(&out_stream); pipeline.add_filter(out_filter); // 开始计时并运行pipeline tick_count start = tick_count::now(); pipeline.run(nthreads * 4); ostringstream out; out << "thread" << nthreads <<" consume time: " << (tick_count::now() - start).seconds() << endl; in_stream.close(); out_stream.close(); return 1;}
main函数:
int main(int argc, char** args) { try { // 获取可利用的线程数 int p = task_scheduler_init::default_num_threads(); tick_count global_start = tick_count::now(); for (int i = 1; i <= p; i++) { // 初始化task scheduler并初始化线程数为i task_scheduler_init init(i); if (!run_pipeline(i)) { return 0; } } ostringstream out; out << "global consume time: " << (tick_count::now() - global_start).seconds() << endl; } catch(exception& e) { cerr << "error ocurred. error text is:\"" << e.what() << "\"" << endl; } return 0;}
字符串分割函数:
void _split(const string& s, char delim,vector& v) { int i = 0; size_t pos = s.find(delim); while (pos != string::npos) { v.push_back(s.substr(i, pos-i)); i = ++pos; pos = s.find(delim, pos); if (pos == string::npos) v.push_back(s.substr(i, s.length())); }}