20 #ifndef __TPIE_PIPELINING_HELPERS_H__
21 #define __TPIE_PIPELINING_HELPERS_H__
24 #include <tpie/pipelining/node.h>
25 #include <tpie/pipelining/factory_helpers.h>
30 namespace pipelining {
34 template <
typename dest_t>
37 typedef typename dest_t::item_type item_type;
39 inline ostream_logger_t(
const dest_t & dest, std::ostream & log) : dest(dest), log(log), begun(
false), ended(
false) {
41 set_name(
"Log", PRIORITY_INSIGNIFICANT);
47 virtual void end()
override {
51 inline void push(
const item_type & item) {
53 log <<
"WARNING: push() called before begin(). Calling begin on rest of pipeline." << std::endl;
57 log <<
"WARNING: push() called after end()." << std::endl;
60 log <<
"pushing " << item << std::endl;
70 template <
typename dest_t>
73 typedef typename dest_t::item_type item_type;
75 inline identity_t(
const dest_t & dest) : dest(dest) {
77 set_name(
"Identity", PRIORITY_INSIGNIFICANT);
80 inline void push(
const item_type & item) {
87 template <
typename source_t>
90 typedef typename source_t::item_type item_type;
94 set_name(
"Identity", PRIORITY_INSIGNIFICANT);
97 inline item_type pull() {
101 inline bool can_pull() {
102 return source.can_pull();
109 template <
typename T>
115 boost::shared_ptr<T> buffer;
116 inline void push(
const T & el) {
124 template <
typename pushfact_t>
128 template <
typename source_t>
132 typedef typename source_t::item_type item_type;
133 typedef typename pushfact_t::template constructed<dummydest_t<item_type> >::type pusher_t;
139 inline puller_t(
const source_t & source,
const pushfact_t & pushfact)
141 , pusher(pushfact.construct(dummydest))
147 inline item_type pull() {
148 pusher.push(source.pull());
149 return dummydest.pull();
152 inline bool can_pull() {
153 return source.can_pull();
159 template <
typename pullfact_t>
163 template <
typename dest_t>
166 typedef typename dest_t::item_type item_type;
167 typedef typename pullfact_t::template constructed<dummydest_t<item_type> >::type puller_t;
173 inline pusher_t(
const dest_t & dest,
const pullfact_t & pullfact)
175 , puller(pullfact.construct(dummydest))
181 inline void push(
const item_type & item) {
182 dummydest.push(item);
183 dest.push(puller.pull());
189 template <
typename T>
194 inline void push(
const T &) {
198 template <
typename fact2_t>
201 typedef typename fact2_t::constructed_type dest2_t;
203 template <
typename dest_t>
206 typedef typename dest_t::item_type item_type;
208 inline type(
const dest_t & dest,
const fact2_t & fact2) : dest(dest), dest2(fact2.construct()) {
211 set_name(
"Fork", PRIORITY_INSIGNIFICANT);
214 inline void push(
const item_type & item) {
225 template <
typename T>
229 void push(
const T &) {}
232 template <
typename IT>
237 typedef typename IT::value_type item_type;
242 set_name(
"Input iterator", PRIORITY_INSIGNIFICANT);
254 template <
typename IT>
257 template <
typename dest_t>
263 type(dest_t dest, IT from, IT to)
268 set_name(
"Input iterator", PRIORITY_INSIGNIFICANT);
272 virtual void go()
override {
281 template <
typename Iterator,
typename Item =
void>
284 template <
typename Iterator>
288 typedef typename Iterator::value_type item_type;
292 set_name(
"Output iterator", PRIORITY_INSIGNIFICANT);
295 void push(
const item_type & item) {
301 template <
typename Iterator,
typename Item>
305 typedef Item item_type;
309 set_name(
"Output iterator", PRIORITY_INSIGNIFICANT);
312 void push(
const item_type & item) {
318 template <
typename IT>
321 template <
typename dest_t>
326 type(dest_t dest, IT to)
330 set_name(
"Output iterator", PRIORITY_INSIGNIFICANT);
334 virtual void go()
override {
335 while (dest.can_pull()) {
345 inline pipe_middle<factory_1<bits::ostream_logger_t, std::ostream &> >
347 return factory_1<bits::ostream_logger_t, std::ostream &>(std::cout);
350 inline pipe_middle<factory_0<bits::identity_t> > identity() {
351 return pipe_middle<factory_0<bits::identity_t> >();
354 inline pullpipe_middle<factory_1<bits::push_to_pull<factory_0<bits::identity_t> >::puller_t, factory_0<bits::identity_t> > > pull_identity() {
355 return factory_1<bits::push_to_pull<factory_0<bits::identity_t> >::puller_t, factory_0<bits::identity_t> >(factory_0<bits::identity_t>());
359 pipe_middle<factory_1<
360 bits::pull_to_push<factory_0<bits::pull_identity_t> >::pusher_t,
361 factory_0<bits::pull_identity_t>
365 bits::pull_to_push<factory_0<bits::pull_identity_t> >::pusher_t,
366 factory_0<bits::pull_identity_t>
367 >(factory_0<bits::pull_identity_t>());
370 template <
typename T>
371 inline pipe_end<termfactory_0<bits::bitbucket_t<T> > >
373 return termfactory_0<bits::bitbucket_t<T> >();
376 template <
typename fact_t>
377 inline pipe_middle<tempfactory_1<bits::fork_t<fact_t>,
const fact_t &> >
378 fork(
const pipe_end<fact_t> & to) {
379 return tempfactory_1<bits::fork_t<fact_t>,
const fact_t &>(to.factory);
382 template <
typename T>
383 inline pipe_end<termfactory_0<bits::null_sink_t<T> > >
384 null_sink() {
return termfactory_0<bits::null_sink_t<T> >();}
386 template <
template <
typename dest_t>
class Fact>
387 pipe_begin<factory_0<Fact> > make_pipe_begin_0() {
388 return pipe_begin<factory_0<Fact> >(factory_0<Fact>());
391 template <
template <
typename dest_t>
class Fact>
392 pipe_middle<factory_0<Fact> > make_pipe_middle_0() {
393 return pipe_middle<factory_0<Fact> >(factory_0<Fact>());
396 template <
typename Fact>
397 pipe_end<termfactory_0<Fact> > make_pipe_end_0() {
398 return pipe_end<termfactory_0<Fact> >(termfactory_0<Fact>());
401 template <
template <
typename dest_t>
class Fact,
typename T1>
402 pipe_begin<factory_1<Fact, T1> > make_pipe_begin_1(T1 e1) {
403 return pipe_begin<factory_1<Fact, T1> >(factory_1<Fact, T1>(e1));
406 template <
template <
typename dest_t>
class Fact,
typename T1>
407 pipe_middle<factory_1<Fact, T1> > make_pipe_middle_1(T1 e1) {
408 return pipe_middle<factory_1<Fact, T1> >(factory_1<Fact, T1>(e1));
411 template <
typename Fact,
typename T1>
412 pipe_end<termfactory_1<Fact, T1> > make_pipe_end_1(T1 e1) {
413 return pipe_end<termfactory_1<Fact, T1> >(termfactory_1<Fact, T1>(e1));
416 template <
template <
typename dest_t>
class Fact,
typename T1,
typename T2>
417 pipe_begin<factory_2<Fact, T1, T2> > make_pipe_begin_2(T1 e1, T2 e2) {
418 return pipe_begin<factory_2<Fact, T1, T2> >(factory_2<Fact, T1, T2>(e1, e2));
421 template <
template <
typename dest_t>
class Fact,
typename T1,
typename T2>
422 pipe_middle<factory_2<Fact, T1, T2> > make_pipe_middle_2(T1 e1, T2 e2) {
423 return pipe_middle<factory_2<Fact, T1, T2> >(factory_2<Fact, T1, T2>(e1, e2));
426 template <
typename Fact,
typename T1,
typename T2>
427 pipe_end<termfactory_2<Fact, T1, T2> > make_pipe_end_2(T1 e1, T2 e2) {
428 return pipe_end<termfactory_2<Fact, T1, T2> >(termfactory_2<Fact, T1, T2>(e1, e2));
431 template <
typename IT>
432 pullpipe_begin<termfactory_2<bits::pull_input_iterator_t<IT>, IT, IT> > pull_input_iterator(IT begin, IT end) {
433 return termfactory_2<bits::pull_input_iterator_t<IT>, IT, IT>(begin, end);
436 template <
typename IT>
437 pipe_begin<tempfactory_2<bits::push_input_iterator_t<IT>, IT, IT> > push_input_iterator(IT begin, IT end) {
438 return tempfactory_2<bits::push_input_iterator_t<IT>, IT, IT>(begin, end);
441 template <
typename IT>
442 pipe_end<termfactory_1<bits::push_output_iterator_t<IT>, IT> > push_output_iterator(IT to) {
443 return termfactory_1<bits::push_output_iterator_t<IT>, IT>(to);
446 template <
typename Item,
typename IT>
447 pipe_end<termfactory_1<bits::push_output_iterator_t<IT, Item>, IT> > typed_push_output_iterator(IT to) {
448 return termfactory_1<bits::push_output_iterator_t<IT, Item>, IT>(to);
451 template <
typename IT>
452 pullpipe_end<tempfactory_1<bits::pull_output_iterator_t<IT>, IT> > pull_output_iterator(IT to) {
453 return tempfactory_1<bits::pull_output_iterator_t<IT>, IT>(to);
virtual void begin()
Begin pipeline processing phase.
Memory management subsystem.
void add_pull_source(const node_token &dest)
Called by implementers to declare a pull source.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
virtual void end() override
End pipeline processing phase.
virtual void begin() override
Begin pipeline processing phase.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
virtual void end()
End pipeline processing phase.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.