20 #ifndef TPIE_PIPELINING_JOIN_H
21 #define TPIE_PIPELINING_JOIN_H
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_helpers.h>
27 namespace pipelining {
47 virtual void push(
const T & v) = 0;
53 template <
typename dest_t>
58 , the_source(the_source)
61 this->
set_name(
"Join source", PRIORITY_INSIGNIFICANT);
66 if (the_source != NULL && the_source !=
this) {
70 throw exception(
"Attempted to set join source a second time");
75 virtual void push(
const T & v)
override {
80 source_base * & the_source;
84 pipe_begin<factory_2<source_impl, node_token, source_base * &> > source() {
85 return factory_2<source_impl, node_token, source_base * &>(source_token, the_source);
93 : the_source(the_source)
95 set_name(
"Join sink", PRIORITY_INSIGNIFICANT);
100 the_source_cache = the_source;
103 void push(
const T & v) {
104 the_source_cache->push(v);
108 source_base * the_source_cache;
109 source_base * & the_source;
112 pipe_end<termfactory_2<sink_impl, node_token, source_base * &> > sink() {
113 return termfactory_2<sink_impl, node_token, source_base * &>(source_token, the_source);
116 join() : the_source(NULL) {}
118 source_base * the_source;
119 node_token source_token;
125 #endif // TPIE_PIPELINING_JOIN_H
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
virtual void begin() override
Begin pipeline processing phase.
Joins multiple push streams into one.
virtual void prepare() override
Called before memory assignment but after depending phases have executed and ended.
node()
Default constructor, using a new node_token.