TPIE

2362a60
join.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_PIPELINING_JOIN_H
21 #define TPIE_PIPELINING_JOIN_H
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_helpers.h>
25 #include <tpie/pipelining/pipe_base.h>
26 
27 namespace tpie {
28 namespace pipelining {
29 
38 template <typename T>
39 class join {
40 public:
41  class source_base : public node {
42  public:
43  source_base(node_token token)
44  : node(token)
45  {
46  }
47 
48  source_base(source_base &&) = default;
49 
50  virtual void push(const T & v) = 0;
51 
52  protected:
53  ~source_base() {}
54  };
55 
56  template <typename dest_t>
57  class source_impl : public source_base {
58  public:
59  source_impl(dest_t dest, node_token token, source_base ** the_source)
60  : source_base(token)
61  , the_source(the_source)
62  , dest(std::move(dest))
63  {
64  this->set_name("Join source", PRIORITY_INSIGNIFICANT);
65  this->add_push_destination(this->dest);
66  }
67 
68  source_impl(source_impl &&) = default;
69 
70  virtual void prepare() override {
71  if (*the_source != NULL && *the_source != this) {
72  // If join.source() is used twice, the second construction of node()
73  // should fail since the node_token is already used.
74  // Thus, this exception should never be thrown.
75  throw exception("Attempted to set join source a second time");
76  }
77  *the_source = this;
78  };
79 
80  virtual void push(const T & v) override {
81  dest.push(v);
82  }
83 
84  private:
85  source_base ** the_source;
86  dest_t dest;
87  };
88 
89  pipe_begin<factory<source_impl, node_token, source_base **> > source() {
90  return factory<source_impl, node_token, source_base **>(source_token, &the_source);
91  }
92 
93  class sink_impl : public node {
94  public:
95  typedef T item_type;
96 
97  sink_impl(node_token source_token, source_base ** the_source)
98  : the_source(the_source)
99  {
100  set_name("Join sink", PRIORITY_INSIGNIFICANT);
101  add_push_destination(source_token);
102  }
103 
104  virtual void begin() override {
105  the_source_cache = *the_source;
106  }
107 
108  void push(const T & v) {
109  the_source_cache->push(v);
110  }
111 
112  private:
113  source_base * the_source_cache;
114  source_base ** the_source;
115  };
116 
117  pipe_end<termfactory<sink_impl, node_token, source_base **> > sink() {
118  return termfactory<sink_impl, node_token, source_base **>(source_token, &the_source);
119  }
120 
121  join() : the_source(NULL) {}
122 private:
123  source_base * the_source;
124  node_token source_token;
125 };
126 
127 } // namespace pipelining
128 } // namespace tpie
129 
130 #endif // TPIE_PIPELINING_JOIN_H
Base class of all nodes.
Definition: node.h:78
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
virtual void begin() override
Begin pipeline processing phase.
Definition: join.h:104
Joins multiple push streams into one.
Definition: join.h:39
virtual void prepare() override
Called before memory assignment but after depending phases have executed and ended.
Definition: join.h:70
node()
Default constructor, using a new node_token.