TPIE

2362a60
subpipeline.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2016, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 #ifndef __TPIE_PIPELINING_SUBPIPELINE_H__
20 #define __TPIE_PIPELINING_SUBPIPELINE_H__
21 
22 #include <tpie/pipelining/pipeline.h>
23 #include <tpie/pipelining/runtime.h>
24 
25 namespace tpie {
26 namespace pipelining {
27 namespace bits {
28 
30 public:
31  void begin(stream_size_type items, progress_indicator_base & pi,
32  memory_size_type filesAvailable, memory_size_type mem,
33  const char * file, const char * function);
34  void begin(stream_size_type items, progress_indicator_base & pi,
35  memory_size_type mem,
36  const char * file, const char * function) {
37  begin(items, pi, get_file_manager().available(), mem, file, function);
38  }
39  void end();
41 protected:
42  node * frontNode;
43 private:
44  gocontext_ptr gc;
45  std::unique_ptr<runtime> rt;
46 };
47 
48 template <typename item_type>
51 
52  virtual void push(const item_type &) = 0;
53 };
54 
55 template <typename item_type, typename fact_t>
56 struct subpipeline_impl: public subpipeline_virt<item_type> {
57  typename fact_t::constructed_type front;
58 
59  subpipeline_impl(fact_t fact): front(fact.construct()) {
60  this->m_nodeMap = front.get_node_map();
61  this->frontNode = &front;
62  }
63 
64  subpipeline_impl(const subpipeline_impl &) = delete;
65  subpipeline_impl & operator=(const subpipeline_impl &) = delete;
67  subpipeline_impl & operator=(subpipeline_impl &&) = delete;
68 
69  void push(const item_type & item) override {
70  front.push(item);
71  };
72 
73 };
74 } //namespace bits
75 
81 template <typename item_type>
82 struct subpipeline {
83  subpipeline() {}
84  subpipeline(subpipeline &&) = default;
85  subpipeline(const subpipeline &) = default;
86  subpipeline & operator=(subpipeline &&) = default;
87  subpipeline & operator=(const subpipeline &) = default;
88  subpipeline(const std::shared_ptr<bits::subpipeline_virt<item_type>> & p): p(p) {}
89 
90  template <typename T>
91  subpipeline(T from) {
92  *this = std::move(from);
93  }
94 
95  template <typename T>
96  subpipeline & operator=(T from) {
97  p.reset(new bits::subpipeline_impl<item_type, T>(std::move(from)));
98  return *this;
99  }
100 
101  void clear() {p.reset();}
102 
103  void push(const item_type & item) {p->push(item);}
104 
105  void begin(size_t filesAvailable, size_t memory) {
106  p->begin(1, p->pi, filesAvailable, memory, nullptr, nullptr);
107  }
108 
109  void begin(size_t memory) {
110  begin(get_file_manager().available(), memory);
111  }
112 
113  void begin(stream_size_type items, progress_indicator_base & pi,
114  memory_size_type filesAvailable, memory_size_type mem,
115  const char * file, const char * function) {
116  p->begin(items, pi, filesAvailable, mem, file, function);
117  }
118 
119  void begin(stream_size_type items, progress_indicator_base & pi,
120  memory_size_type mem,
121  const char * file, const char * function) {
122  begin(items, pi, get_file_manager().available(), mem, file, function);
123  }
124 
125  void end() {p->end();}
126 
127  void plot(std::ostream & os = std::cout) {
128  p->plot(os);
129  }
130 
131  void plot_full(std::ostream & os = std::cout) {
132  p->plot_full(os);
133  }
134 
135  bits::node_map::ptr get_node_map() const {
136  return p->get_node_map();
137  }
138 
139  bool can_fetch(std::string key) {
140  return p->can_fetch(key);
141  }
142 
143  any_noncopyable & fetch_any(std::string key) {
144  return p->fetch_any(key);
145  }
146 
147  template <typename T>
148  T & fetch(std::string key) {
149  any_noncopyable &a = fetch_any(key);
150  return any_cast<T>(a);
151  }
152 
153  void forward_any(std::string key, any_noncopyable value) {
154  return p->forward_any(key, std::move(value));
155  }
156 
157  template <typename T>
158  void forward(std::string key, T value) {
159  forward_any(key, any_noncopyable(value));
160  }
161 
162  void output_memory(std::ostream & o) const {p->output_memory(o);}
163 private:
164  std::shared_ptr<bits::subpipeline_virt<item_type>> p;
165 };
166 
167 } //namespace pipelining
168 } //namespace tpie
169 
170 #endif //__TPIE_PIPELINING_SUBPIPELINE_H__
The base class for indicating the progress of some task.
Central file abstraction.
Definition: file.h:44
Base class of all nodes.
Definition: node.h:78
a dummy progress indicator that produces no output
file_manager & get_file_manager()
Return a reference to the file manager.
pipe_middle< tempfactory< bits::item_type_t< T > > > item_type()
Create item type defining identity pipe node.
Definition: helpers.h:654