博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
TBB之pipeline
阅读量:4058 次
发布时间:2019-05-25

本文共 4942 字,大约阅读时间需要 16 分钟。

Pipelining是一个通用并行模型,模仿一个传统的制造集成生成线,数据流向一系列的管道滤波,每个滤波以某种方式处理数据,给定一个输入流数据,一些滤波能并行操作,另外一些不能。例如,视频处理,对帧的处理不依赖其他帧,那么能同时处理多个帧,反之,对帧的操作需要首先处理先前的帧。

TBB类pipeline和filter实现pipeline模式。一个简单的文本处理例子会被使用证明pipeline和filter的用法。这个例子读取一个文本文件,平方这个文本里的每个数,写出改变的文本到一个新的文件里,流程如下:

这里写图片描述

这个滤波作用是提取文本中一行的数据:

class InputStream: public filter {public:    explicit InputStream(ifstream* file):m_file(file), filter(serial_in_order) {} // filter初始化serial_in_order(串行)    ~InputStream() {}    void* operator()(void*) {        ostringstream out;        vector
vec_str; string str; if(!getline(*m_file, str)) { return NULL; } _split(str, ' ', vec_str); // 把开辟内存,准备存放固定格式的数据 int length = vec_str.size(); char* result = new char[length*sizeof(int) + sizeof(int)]; vector
::iterator _it = vec_str.begin(); // 把数据转成整形,存成固定格式 for (int i = 0; _it != vec_str.end(); ++_it, i++) { int temp = atoi((*_it).c_str()); memcpy(result + sizeof(int) + i*sizeof(int), &temp, sizeof(int)); } memcpy(result, &length, sizeof(int)); return result; }private: ifstream* m_file;};

这个滤波为对获取的数据进行平方操作:

class Transform: public filter {public:    Transform():filter(parallel) {} // filter初始化parallel(并行)    ~Transform() {}    void* operator()(void* in_str) {        if (!in_str) {            return NULL;        }        // 获取数据长度,得到数据的首地址result        char* char_str = reinterpret_cast
(in_str); ostringstream out; int* length = reinterpret_cast
(char_str); char* result = new char[(*length)*sizeof(int) + sizeof(int)]; // 遍历获取数据,并对数据进行平方 for (int i = 0; i < *length; i++) { char* temp = (char_str + sizeof(int) + i*sizeof(int)); int temp_int = (*(reinterpret_cast
(temp)))*2; // 把数据存成固定格式,准备传出去 memcpy(result + sizeof(int) + i*sizeof(int), &temp_int, sizeof(int)); } memcpy(result, length, sizeof(int)); return result; }};

这个滤波的作用是输出数据到文件中:

class OutputStream: public filter {public:    explicit OutputStream(ofstream* file):m_file(file), filter(serial_in_order) {}    ~OutputStream() {}    void* operator()(void* in_str) {        if (!in_str) {            return NULL;        }        char delim = ' ';        char cvtline = '\n';        // 获取数据长度length        char* char_str = reinterpret_cast
(in_str); int* length = reinterpret_cast
(char_str); // 把数据存到文件中 for (int i = 0; i < *length; i++) { int* int_str = reinterpret_cast
(char_str + sizeof(int) + i*sizeof(int)); (*m_file) << *int_str; (*m_file) << " "; } (*m_file) << "\n"; if (char_str) { delete []char_str; char_str = NULL; } return NULL; }private: ofstream* m_file;};

运行管道:

int run_pipeline(int nthreads) {    string input = "/home/shaomingliang/beh/core/tbb/pipline/input.txt";    string output = "/home/shaomingliang/beh/core/tbb/pipline/output.txt";    ifstream in_stream(input.c_str());    ofstream out_stream(output.c_str());    if (!in_stream.is_open()) {        throw invalid_argument(("invalid input file name: " + input).c_str());    }    if (!out_stream.is_open()) {        throw invalid_argument(("invalid output file name: " + output).c_str());    }    // 建立管道,加入InputStream滤波    pipeline pipeline;      InputStream input_filter(&in_stream);    pipeline.add_filter(input_filter);    // 加入Transform滤波    Transform trans_filter;    pipeline.add_filter(trans_filter);    // 加入OutputStream滤波    OutputStream out_filter(&out_stream);    pipeline.add_filter(out_filter);    // 开始计时并运行pipeline    tick_count start = tick_count::now();    pipeline.run(nthreads * 4);    ostringstream out;    out << "thread" << nthreads <<" consume time: " << (tick_count::now() - start).seconds() << endl;    in_stream.close();    out_stream.close();    return 1;}

main函数:

int main(int argc, char** args) {    try {        // 获取可利用的线程数        int p = task_scheduler_init::default_num_threads();        tick_count global_start = tick_count::now();        for (int i = 1; i <= p; i++) {            // 初始化task scheduler并初始化线程数为i            task_scheduler_init init(i);            if (!run_pipeline(i)) {                return 0;            }        }        ostringstream out;        out << "global consume time: " << (tick_count::now() - global_start).seconds() << endl;    } catch(exception& e) {        cerr << "error ocurred. error text is:\"" << e.what() << "\"" << endl;    }    return 0;}

字符串分割函数:

void _split(const string& s, char delim,vector
& v) { int i = 0; size_t pos = s.find(delim); while (pos != string::npos) { v.push_back(s.substr(i, pos-i)); i = ++pos; pos = s.find(delim, pos); if (pos == string::npos) v.push_back(s.substr(i, s.length())); }}
你可能感兴趣的文章
Mac环境下svn的使用
查看>>
github简单使用教程
查看>>
如何高效利用GitHub
查看>>
环境分支-git版本管理
查看>>
uni-app 全局变量
查看>>
js判断空对象的几种方法
查看>>
java 不用递归写tree
查看>>
springboot2 集成Hibernate JPA 用 声明式事物
查看>>
fhs-framework jetcache 缓存维护之自动清除缓存
查看>>
SpringBoot 动态编译 JAVA class 解决 jar in jar 的依赖问题
查看>>
fhs-framework springboot mybatis 解决表关联查询问题的关键方案-翻译服务
查看>>
ZUUL2 使用场景
查看>>
Spring AOP + Redis + 注解实现redis 分布式锁
查看>>
elastic-job 和springboot 集成干货
查看>>
php开发微服务注册到eureka中(使用sidecar)
查看>>
mybatis mybatis plus mybatis jpa hibernate spring data jpa比较
查看>>
支付宝生活号服务号 用户信息获取 oauth2 登录对接 springboot java
查看>>
CodeForces #196(Div. 2) 337D Book of Evil (树形dp)
查看>>
uva 12260 - Free Goodies (dp,贪心 | 好题)
查看>>
uva-1427 Parade (单调队列优化dp)
查看>>