TPIE

2362a60
buffer.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
23 
24 #ifndef __TPIE_PIPELINING_BUFFER_H__
25 #define __TPIE_PIPELINING_BUFFER_H__
26 
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/factory_helpers.h>
29 #include <tpie/pipelining/pipe_base.h>
30 #include <tpie/file_stream.h>
31 #include <tpie/maybe.h>
32 #include <memory>
33 
34 namespace tpie {
35 
36 namespace pipelining {
37 
38 namespace bits {
39 
40 template <typename T>
41 class buffer_pull_output_t: public node {
42  tpie::maybe<file_stream<T> > * m_queue_ptr;
43  file_stream<T> * m_queue;
44 public:
45  typedef T item_type;
46 
47  buffer_pull_output_t(const node_token & input_token) {
48  add_dependency(input_token);
49  set_name("Fetching items", PRIORITY_SIGNIFICANT);
52  set_plot_options(PLOT_BUFFERED);
53  }
54 
55  virtual void propagate() override {
56  m_queue_ptr = fetch<tpie::maybe<file_stream<T> > *>("queue");
57  m_queue = &**m_queue_ptr;
58  m_queue->seek(0);
59  forward("items", m_queue->size());
60  set_steps(m_queue->size());
61  }
62 
63  bool can_pull() const {
64  return m_queue->can_read();
65  }
66 
67  T pull() {
68  step();
69  return m_queue->read();
70  }
71 
72  virtual void end() override {
73  (*m_queue_ptr).destruct();
74  }
75 };
76 
80 
81 template <typename T>
82 class buffer_input_t: public node {
83 public:
84  typedef T item_type;
85 
86  buffer_input_t(const node_token & token, std::shared_ptr<node> output=std::shared_ptr<node>())
87  : node(token)
88  , m_output(output)
89  {
90  set_name("Storing items", PRIORITY_INSIGNIFICANT);
93  set_plot_options(PLOT_BUFFERED | PLOT_SIMPLIFIED_HIDE);
94  }
95 
96  void begin() override {
97  m_queue.construct();
98  m_queue->open(static_cast<memory_size_type>(0), access_sequential, compression_normal);
99  }
100 
101  void push(const T & item) {
102  m_queue->write(item);
103  }
104 
105  void end() override {
106  forward("queue", &m_queue, 1);
107  }
108 
109 private:
110  tpie::maybe< file_stream<T> > m_queue;
111  std::shared_ptr<node> m_output;
112 };
113 
117 template <typename dest_t>
118 class buffer_output_t: public node {
119 public:
120  typedef typename push_type<dest_t>::type item_type;
121 
122  buffer_output_t(dest_t dest, const node_token & input_token)
123  : dest(std::move(dest))
124  {
125  add_dependency(input_token);
126  add_push_destination(this->dest);
128  set_minimum_resource_usage(FILES, 1);
129  set_name("Buffer", PRIORITY_INSIGNIFICANT);
130  set_plot_options(PLOT_BUFFERED);
131  }
132 
133 
134  void propagate() override {
135  m_queue_ptr = fetch<tpie::maybe<file_stream<item_type> > *>("queue");
136  m_queue = &**m_queue_ptr;
137  forward("items", m_queue->size());
138  set_steps(m_queue->size());
139  }
140 
141  void go() override {
142  m_queue->seek(0);
143  while (m_queue->can_read()) {
144  dest.push(m_queue->read());
145  step();
146  }
147  }
148 
149  void end() override {
150  m_queue_ptr->destruct();
151  }
152 private:
153  tpie::maybe<file_stream<item_type> > * m_queue_ptr;
154  file_stream<item_type> * m_queue;
155  dest_t dest;
156 };
157 
158 } // namespace bits
159 
// passive_buffer: owns a node_token and hands out input/output nodes (or
// pipe wrappers around their factories) that are all built from that token.
//
// NOTE(review): this listing is incomplete. The rendered source-line numbers
// jump (164 -> 166, 167 -> 170 -> 173 -> 176, 198 -> 200), so the class head
// and the declarations of input_t, output_t, inputfact_t, outputfact_t,
// inputpipe_t and outputpipe_t used below are not visible here. Consult the
// original header before changing anything in this block.
164 template <typename T>
166 public:
167  typedef T item_type;
170 private:
173 public:
176 
177 
// Default constructor; input_token is default-constructed.
178  passive_buffer() {}
179 
// Returns an input_t constructed directly from input_token.
180  inline input_t raw_input() {
181  return input_t(input_token);
182  }
183 
// Returns an output_t constructed directly from input_token.
184  inline output_t raw_output() {
185  return output_t(input_token);
186  }
187 
// Builds an inputfact_t from input_token; it is converted to the declared
// return type inputpipe_t on return.
188  inline inputpipe_t input() {
189  return inputfact_t(input_token);
190  }
191 
// Builds an outputfact_t from input_token; it is converted to the declared
// return type outputpipe_t on return.
192  inline outputpipe_t output() {
193  return outputfact_t(input_token);
194  }
195 
196 private:
// Token passed to every node/factory created by the methods above.
197  node_token input_token;
198 
// Copy assignment is declared private and not defined in this listing --
// presumably to forbid assignment; TODO confirm against the original header.
200  passive_buffer & operator=(const passive_buffer &);
201 };
202 
208 
209 } // namespace pipelining
210 
211 } // namespace tpie
212 
213 #endif // __TPIE_PIPELINING_BUFFER_H__
Sequential access is intended.
Definition: cache_hint.h:36
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: buffer.h:141
void propagate() override
Propagate stream metadata.
Definition: buffer.h:134
void end() override
End pipeline processing phase.
Definition: buffer.h:149
bool can_read()
Check if the next call to read() will succeed or not.
Definition: stream.h:1010
const T & read()
Reads next item from stream if can_read() == true.
Definition: stream.h:947
void begin() override
Begin pipeline processing phase.
Definition: buffer.h:96
void seek(stream_offset_type offset, offset_type whence=beginning)
Preconditions: is_open() and offset == 0.
Definition: stream.h:627
Base class of all nodes.
Definition: node.h:78
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Class to deduce the item_type of a node of type T.
Definition: node_traits.h:152
virtual void propagate() override
Propagate stream metadata.
Definition: buffer.h:55
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:564
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare minimum resource requirements.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:207
Node factory for variadic argument terminators.
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:459
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:654
Compressed stream.
Definition: predeclare.h:46
Compress some blocks according to available resources (time, memory).
Definition: scheme.h:40
Output node for buffer.
Definition: buffer.h:118
A pipe_middle class pushes input down the pipeline.
Definition: pipe_base.h:241
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
Definition: file_stream.h:431
Plain old file_stream buffer.
Definition: buffer.h:165
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
Input node for buffer.
Definition: buffer.h:82
void end() override
End pipeline processing phase.
Definition: buffer.h:105
node()
Default constructor, using a new node_token.
virtual void end() override
End pipeline processing phase.
Definition: buffer.h:72
pipe_middle< split_factory< bits::buffer_input_t, node, bits::buffer_output_t > > buffer
The buffer node inserts a phase boundary into the pipeline by writing items to disk.
Definition: buffer.h:207