pipeline.h

00001 /*
00002     Copyright 2005-2010 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_pipeline_H 
00022 #define __TBB_pipeline_H 
00023 
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include "tbb_allocator.h"
00027 #include <cstddef>
00028 
00029 namespace tbb {
00030 
00031 class pipeline;
00032 class filter;
00033 
namespace internal {

// The argument for PIPELINE_VERSION should be an integer between 2 and 9.
// Both the argument and the whole expansion are parenthesized so the macro
// evaluates as a single value whatever operators surround it
// (e.g. __TBB_PIPELINE_VERSION(5) + 1) and whatever expression is passed in.
#define __TBB_PIPELINE_VERSION(x) ((unsigned char)((x)-2)<<1)

//! Unsigned counter type used for pipeline tokens.
typedef unsigned long Token;
//! Signed counterpart of Token.
typedef long tokendiff_t;

// Implementation classes; only their names are needed in this header.
class stage_task;
class input_buffer;
class pipeline_root_task;
class pipeline_cleaner;

} // namespace internal
00048 
// Forward declarations for the interface5 API, needed by the friend
// declaration inside class pipeline further down.
namespace interface5 {

    namespace internal {
        class pipeline_proxy;
    }

    template<typename T, typename U> class filter_t;

} // namespace interface5
00056 
00058 
00060 
00061 class filter: internal::no_copy {
00062 private:
00064     static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
00065     
00067     static const unsigned char filter_is_serial = 0x1; 
00068 
00070 
00072     static const unsigned char filter_is_out_of_order = 0x1<<4;  
00073 
00075     static const unsigned char filter_is_bound = 0x1<<5;  
00076 
00078     static const unsigned char exact_exception_propagation =
00079 #if TBB_USE_CAPTURED_EXCEPTION
00080             0x0;
00081 #else
00082             0x1<<7;
00083 #endif /* TBB_USE_CAPTURED_EXCEPTION */
00084 
00085     static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
00086     static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
00087 public:
00088     enum mode {
00090         parallel = current_version | filter_is_out_of_order, 
00092         serial_in_order = current_version | filter_is_serial,
00094         serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00096         serial = serial_in_order
00097     };
00098 protected:
00099     filter( bool is_serial_ ) : 
00100         next_filter_in_pipeline(not_in_pipeline()),
00101         my_input_buffer(NULL),
00102         my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
00103         prev_filter_in_pipeline(not_in_pipeline()),
00104         my_pipeline(NULL),
00105         next_segment(NULL)
00106     {}
00107     
00108     filter( mode filter_mode ) :
00109         next_filter_in_pipeline(not_in_pipeline()),
00110         my_input_buffer(NULL),
00111         my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
00112         prev_filter_in_pipeline(not_in_pipeline()),
00113         my_pipeline(NULL),
00114         next_segment(NULL)
00115     {}
00116 
00117 public:
00119     bool is_serial() const {
00120         return bool( my_filter_mode & filter_is_serial );
00121     }  
00122     
00124     bool is_ordered() const {
00125         return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00126     }
00127 
00129     bool is_bound() const {
00130         return ( my_filter_mode & filter_is_bound )==filter_is_bound;
00131     }
00132 
00134 
00135     virtual void* operator()( void* item ) = 0;
00136 
00138 
00139     virtual __TBB_EXPORTED_METHOD ~filter();
00140 
00141 #if __TBB_TASK_GROUP_CONTEXT
00143 
00145     virtual void finalize( void* /*item*/ ) {};
00146 #endif
00147 
00148 private:
00150     filter* next_filter_in_pipeline;
00151 
00153 
00154     internal::input_buffer* my_input_buffer;
00155 
00156     friend class internal::stage_task;
00157     friend class internal::pipeline_root_task;
00158     friend class pipeline;
00159     friend class thread_bound_filter;
00160 
00162     const unsigned char my_filter_mode;
00163 
00165     filter* prev_filter_in_pipeline;
00166 
00168     pipeline* my_pipeline;
00169 
00171 
00172     filter* next_segment;
00173 };
00174 
00176 
00177 class thread_bound_filter: public filter {
00178 public:
00179     enum result_type {
00180         // item was processed
00181         success,
00182         // item is currently not available
00183         item_not_available,
00184         // there are no more items to process
00185         end_of_stream
00186     };
00187 protected:
00188     thread_bound_filter(mode filter_mode): 
00189          filter(static_cast<mode>(filter_mode | filter::filter_is_bound | filter::exact_exception_propagation))
00190     {}
00191 public:
00193 
00198     result_type __TBB_EXPORTED_METHOD try_process_item(); 
00199 
00201 
00205     result_type __TBB_EXPORTED_METHOD process_item();
00206 
00207 private:
00209     result_type internal_process_item(bool is_blocking);
00210 };
00211 
00213 
00214 class pipeline {
00215 public:
00217     __TBB_EXPORTED_METHOD pipeline();
00218 
00221     virtual __TBB_EXPORTED_METHOD ~pipeline();
00222 
00224     void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00225 
00227     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00228 
00229 #if __TBB_TASK_GROUP_CONTEXT
00231     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00232 #endif
00233 
00235     void __TBB_EXPORTED_METHOD clear();
00236 
00237 private:
00238     friend class internal::stage_task;
00239     friend class internal::pipeline_root_task;
00240     friend class filter;
00241     friend class thread_bound_filter;
00242     friend class internal::pipeline_cleaner;
00243     friend class tbb::interface5::internal::pipeline_proxy;
00244 
00246     filter* filter_list;
00247 
00249     filter* filter_end;
00250 
00252     task* end_counter;
00253 
00255     atomic<internal::Token> input_tokens;
00256 
00258     atomic<internal::Token> token_counter;
00259 
00261     bool end_of_input;
00262 
00264     bool has_thread_bound_filters;
00265 
00267     void remove_filter( filter& filter_ );
00268 
00270     void __TBB_EXPORTED_METHOD inject_token( task& self );
00271 
00272 #if __TBB_TASK_GROUP_CONTEXT
00274     void clear_filters();
00275 #endif
00276 };
00277 
00278 //------------------------------------------------------------------------
00279 // Support for lambda-friendly parallel_pipeline interface
00280 //------------------------------------------------------------------------
00281 
00282 namespace interface5 {
00283 
// Forward declaration so that flow_control can befriend every concrete_filter.
namespace internal {
    template<typename T, typename U, typename Body>
    class concrete_filter;
} // namespace internal
00287 
00289 class flow_control {
00290     bool is_pipeline_stopped;
00291     flow_control() { is_pipeline_stopped = false; }
00292     template<typename T, typename U, typename Body> friend class internal::concrete_filter;
00293 public:
00294     void stop() { is_pipeline_stopped = true; }
00295 };
00296 
00298 namespace internal {
00299 
00300 template<typename T, typename U, typename Body>
00301 class concrete_filter: public tbb::filter {
00302     const Body& my_body;
00303 
00304     typedef typename tbb::tbb_allocator<U> u_allocator;
00305     typedef typename tbb::tbb_allocator<T> t_allocator;
00306 
00307     /*override*/ void* operator()(void* input) {
00308         T* temp_input = (T*)input;
00309         // Call user's operator()() here
00310         U* output_u = u_allocator().allocate(1);
00311         void* output = (void*) new (output_u) U(my_body(*temp_input)); 
00312         t_allocator().destroy(temp_input);
00313         t_allocator().deallocate(temp_input,1);
00314         return output;
00315     }
00316 
00317 public:
00318     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00319 };
00320 
00321 template<typename U, typename Body>
00322 class concrete_filter<void,U,Body>: public filter {
00323     const Body& my_body;
00324 
00325     typedef typename tbb::tbb_allocator<U> u_allocator;
00326 
00327     /*override*/void* operator()(void*) {
00328         flow_control control;
00329         U* output_u = u_allocator().allocate(1);
00330         (void) new (output_u) U(my_body(control));
00331         if(control.is_pipeline_stopped) {
00332             u_allocator().destroy(output_u);
00333             u_allocator().deallocate(output_u,1);
00334             output_u = NULL;
00335         }
00336         return (void*)output_u;
00337     }
00338 public:
00339     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00340 };
00341 
00342 template<typename T, typename Body>
00343 class concrete_filter<T,void,Body>: public filter {
00344     const Body& my_body;
00345    
00346     typedef typename tbb::tbb_allocator<T> t_allocator;
00347 
00348     /*override*/ void* operator()(void* input) {
00349         T* temp_input = (T*)input;
00350         my_body(*temp_input);
00351         t_allocator().destroy(temp_input);
00352         t_allocator().deallocate(temp_input,1);
00353         return NULL;
00354     }
00355 public:
00356     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00357 };
00358 
00359 template<typename Body>
00360 class concrete_filter<void,void,Body>: public filter {
00361     const Body& my_body;
00362     
00364     /*override*/ void* operator()(void*) {
00365         flow_control control;
00366         my_body(control);
00367         void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1; 
00368         return output;
00369     }
00370 public:
00371     concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00372 };
00373 
00375 
00376 class pipeline_proxy {
00377     tbb::pipeline my_pipe;
00378 public:
00379     pipeline_proxy( const filter_t<void,void>& filter_chain );
00380     ~pipeline_proxy() {
00381         while( filter* f = my_pipe.filter_list ) 
00382             delete f; // filter destructor removes it from the pipeline
00383     }
00384     tbb::pipeline* operator->() { return &my_pipe; }
00385 };
00386 
00388 
00389 class filter_node: tbb::internal::no_copy {
00391     tbb::atomic<intptr_t> ref_count;
00392 protected:
00393     filter_node() {
00394         ref_count = 0;
00395 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00396         ++(__TBB_TEST_FILTER_NODE_COUNT);
00397 #endif
00398     }
00399 public:
00401     virtual void add_to( pipeline& ) = 0;
00403     void add_ref() {++ref_count;}
00405     void remove_ref() {
00406         __TBB_ASSERT(ref_count>0,"ref_count underflow");
00407         if( --ref_count==0 ) 
00408             delete this;
00409     }
00410     virtual ~filter_node() {
00411 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00412         --(__TBB_TEST_FILTER_NODE_COUNT);
00413 #endif
00414     }
00415 };
00416 
00418 template<typename T, typename U, typename Body>
00419 class filter_node_leaf: public filter_node  {
00420     const tbb::filter::mode mode;
00421     const Body body;
00422     /*override*/void add_to( pipeline& p ) {
00423         concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
00424         p.add_filter( *f );
00425     }
00426 public:
00427     filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
00428 };
00429 
00431 class filter_node_join: public filter_node {
00432     friend class filter_node; // to suppress GCC 3.2 warnings
00433     filter_node& left;
00434     filter_node& right;
00435     /*override*/~filter_node_join() {
00436        left.remove_ref();
00437        right.remove_ref();
00438     }
00439     /*override*/void add_to( pipeline& p ) {
00440         left.add_to(p);
00441         right.add_to(p);
00442     }
00443 public:
00444     filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
00445        left.add_ref();
00446        right.add_ref();
00447     }
00448 };
00449 
00450 } // namespace internal
00452 
00454 template<typename T, typename U, typename Body>
00455 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
00456     return new internal::filter_node_leaf<T,U,Body>(mode, body);
00457 }
00458 
00459 template<typename T, typename V, typename U>
00460 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
00461     __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
00462     __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
00463     return new internal::filter_node_join(*left.root,*right.root);
00464 }
00465 
00467 template<typename T, typename U>
00468 class filter_t {
00469     typedef internal::filter_node filter_node;
00470     filter_node* root;
00471     filter_t( filter_node* root_ ) : root(root_) {
00472         root->add_ref();
00473     }
00474     friend class internal::pipeline_proxy;
00475     template<typename T_, typename U_, typename Body>
00476     friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
00477     template<typename T_, typename V_, typename U_>
00478     friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
00479 public:
00480     filter_t() : root(NULL) {}
00481     filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
00482         if( root ) root->add_ref();
00483     }
00484     template<typename Body>
00485     filter_t( tbb::filter::mode mode, const Body& body ) :
00486         root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
00487         root->add_ref();
00488     }
00489 
00490     void operator=( const filter_t<T,U>& rhs ) {
00491         // Order of operations below carefully chosen so that reference counts remain correct
00492         // in unlikely event that remove_ref throws exception.
00493         filter_node* old = root;
00494         root = rhs.root; 
00495         if( root ) root->add_ref();
00496         if( old ) old->remove_ref();
00497     }
00498     ~filter_t() {
00499         if( root ) root->remove_ref();
00500     }
00501     void clear() {
00502         // Like operator= with filter_t() on right side.
00503         if( root ) {
00504             filter_node* old = root;
00505             root = NULL;
00506             old->remove_ref();
00507         }
00508     }
00509 };
00510 
00511 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
00512     __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t"  );
00513     filter_chain.root->add_to(my_pipe);
00514 }
00515 
00516 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
00517 #if __TBB_TASK_GROUP_CONTEXT
00518     , tbb::task_group_context& context
00519 #endif
00520     ) {
00521     internal::pipeline_proxy pipe(filter_chain);
00522     // tbb::pipeline::run() is called via the proxy
00523     pipe->run(max_number_of_live_tokens
00524 #if __TBB_TASK_GROUP_CONTEXT
00525               , context
00526 #endif
00527     );
00528 }
00529 
00530 #if __TBB_TASK_GROUP_CONTEXT
00531 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
00532     tbb::task_group_context context;
00533     parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
00534 }
00535 #endif // __TBB_TASK_GROUP_CONTEXT
00536 
00537 } // interface5
00538 
00539 using interface5::flow_control;
00540 using interface5::filter_t;
00541 using interface5::make_filter;
00542 using interface5::parallel_pipeline;
00543 
00544 } // tbb
00545 
00546 #endif /* __TBB_pipeline_H */

Copyright © 2005-2010 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.