TPIE

2362a60
file_stream.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2011, 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_FILE_STREAM_H__
21 #define __TPIE_PIPELINING_FILE_STREAM_H__
22 
23 #include <tpie/file_stream.h>
24 
25 #include <tpie/pipelining/node.h>
26 #include <tpie/pipelining/factory_helpers.h>
27 #include <tpie/pipelining/pipe_base.h>
28 #include <tpie/maybe.h>
29 #include <tpie/flags.h>
30 
31 namespace tpie {
32 namespace pipelining {
33 
34 enum stream_option {
35  STREAM_RESET=1,
36  STREAM_CLOSE=2
37 };
38 
39 TPIE_DECLARE_OPERATORS_FOR_FLAGS(stream_option)
40 typedef tpie::flags<stream_option> stream_options;
41 
42 namespace bits {
43 
49 template <typename dest_t>
50 class input_t : public node {
51 public:
52  typedef typename push_type<dest_t>::type item_type;
53 
54  input_t(dest_t dest, file_stream<item_type> & fs, stream_options options) : options(options), fs(fs), dest(std::move(dest)) {
55  add_push_destination(this->dest);
56  set_name("Read", PRIORITY_INSIGNIFICANT);
57  set_minimum_memory(fs.memory_usage());
58  }
59 
60  virtual void propagate() override {
61  if (options & STREAM_RESET) fs.seek(0);
62 
63  if (fs.is_open()) {
64  forward("items", fs.size() - fs.offset());
65  set_steps(fs.size() - fs.offset());
66  } else {
67  forward("items", 0);
68  }
69  }
70 
71  virtual void go() override {
72  if (fs.is_open()) {
73  while (fs.can_read()) {
74  dest.push(fs.read());
75  step();
76  }
77  }
78  }
79 
80  virtual void end() override {
81  if (options & STREAM_CLOSE) fs.close();
82  }
83 
84 private:
85  stream_options options;
87  dest_t dest;
88 };
89 
95 template <typename dest_t>
96 class named_input_t : public node {
97 public:
98  typedef typename push_type<dest_t>::type item_type;
99 
100  named_input_t(dest_t dest, std::string path) : dest(std::move(dest)), path(path) {
101  add_push_destination(this->dest);
102  set_name("Read", PRIORITY_INSIGNIFICANT);
103  set_minimum_memory(file_stream<item_type>::memory_usage());
104  }
105 
106  virtual void propagate() override {
107  fs.construct();
108  fs->open(path, access_read);
109  forward("items", fs->size());
110  set_steps(fs->size());
111  }
112 
113  virtual void go() override {
114  while (fs->can_read()) {
115  dest.push(fs->read());
116  step();
117  }
118  fs.destruct();
119  }
120 private:
121  dest_t dest;
123  std::string path;
124 };
125 
126 
132 template <typename T>
133 class pull_input_t : public node {
134 public:
135  typedef T item_type;
136 
137  pull_input_t(file_stream<T> & fs, stream_options options) : options(options), fs(fs) {
138  set_name("Read", PRIORITY_INSIGNIFICANT);
139  set_minimum_memory(fs.memory_usage());
140  }
141 
142  virtual void propagate() override {
143  if (options & STREAM_RESET) fs.seek(0);
144  forward("items", fs.size()-fs.offset());
145  set_steps(fs.size()-fs.offset());
146  }
147 
148  T pull() {
149  step();
150  return fs.read();
151  }
152 
153  bool can_pull() {
154  return fs.can_read();
155  }
156 
157  virtual void end() override {
158  if (options & STREAM_CLOSE) fs.close();
159  }
160 
161 private:
162  stream_options options;
163  file_stream<T> & fs;
164 };
165 
171 template <typename T>
172 class pull_reverse_input_t : public node {
173 public:
174  typedef T item_type;
175 
176  pull_reverse_input_t(file_stream<T> & fs, stream_options options) : options(options), fs(fs) {
177  set_name("Read", PRIORITY_INSIGNIFICANT);
178  set_minimum_memory(fs.memory_usage());
179  }
180 
181  virtual void propagate() override {
182  if (options & STREAM_RESET) fs.seek(0, file_stream<T>::end);
183  forward("items", fs.offset());
184  set_steps(fs.offset());
185  }
186 
187  inline T pull() {
188  step();
189  return fs.read_back();
190  }
191 
192  inline bool can_pull() {
193  return fs.can_read_back();
194  }
195 
196  virtual void end() override {
197  if (options & STREAM_CLOSE) fs.close();
198  }
199 
200 private:
201  stream_options options;
202  file_stream<T> & fs;
203 };
204 
210 template <typename T>
211 class named_pull_input_t : public node {
212 public:
213  typedef T item_type;
214 
215  named_pull_input_t(std::string path): path(std::move(path)) {
216  set_name("Read", PRIORITY_INSIGNIFICANT);
217  set_minimum_memory(file_stream<T>::memory_usage());
218  }
219 
220  virtual void propagate() override {
221  fs.construct();
222  fs->open(path, access_read);
223  forward("items", fs->size());
224  set_steps(fs->size());
225  }
226 
227  T pull() {
228  step();
229  return fs->read();
230  }
231 
232  bool can_pull() {
233  return fs->can_read();
234  }
235 
236  void end() override {
237  fs->close();
238  fs.destruct();
239  }
240 private:
242  std::string path;
243 };
244 
245 
251 template <typename T>
252 class output_t : public node {
253 public:
254  typedef T item_type;
255 
256  output_t(file_stream<T> & fs) : fs(fs) {
257  set_name("Write", PRIORITY_INSIGNIFICANT);
258  set_minimum_memory(fs.memory_usage());
259  }
260 
261  void push(const T & item) {
262  fs.write(item);
263  }
264 private:
265  file_stream<T> & fs;
266 };
267 
273 template <typename T>
274 class named_output_t : public node {
275 public:
276  typedef T item_type;
277 
278  named_output_t(const std::string & path): path(path) {
279  set_name("Write", PRIORITY_INSIGNIFICANT);
280  set_minimum_memory(file_stream<T>::memory_usage());
281  }
282 
283  void begin() override {
284  fs.construct();
285  fs->open(path, access_write);
286  }
287 
288  void push(const T & item) {
289  fs->write(item);
290  }
291 
292  void end() override {
293  fs->close();
294  fs.destruct();
295  }
296 private:
298  std::string path;
299 };
300 
301 
307 template <typename source_t>
308 class pull_output_t : public node {
309 public:
310  typedef typename pull_type<source_t>::type item_type;
311 
312  pull_output_t(source_t source, file_stream<item_type> & fs) : source(std::move(source)), fs(fs) {
313  add_pull_source(this->source);
314  set_name("Write", PRIORITY_INSIGNIFICANT);
315  set_minimum_memory(fs.memory_usage());
316  }
317 
318  virtual void go() override {
319  source.begin();
320  while (source.can_pull()) {
321  fs.write(source.pull());
322  }
323  source.end();
324  }
325 
326 private:
327  source_t source;
329 };
330 
331 template <typename dest_t, typename T>
332 class tee_t: public node {
333 public:
334  typedef T item_type;
335  tee_t(dest_t dest, file_stream<item_type> & fs): fs(fs), dest(std::move(dest)) {
336  set_minimum_memory(fs.memory_usage());
337  }
338 
339  void push(const item_type & i) {
340  fs.write(i);
341  dest.push(i);
342  }
343 private:
345  dest_t dest;
346 };
347 
348 template <typename source_t, typename T>
349 class pull_tee_t {
350 public:
351  typedef T item_type;
352  pull_tee_t(source_t source, file_stream<item_type> & fs): fs(fs), source(std::move(source)) {
353  set_minimum_memory(fs.memory_usage());
354  }
355 
356  bool can_pull() {
357  return source.can_pull();
358  }
359 
360  item_type pull() {
361  item_type i = source.pull();
362  fs.write(i);
363  return i;
364  }
365 private:
367  source_t source;
368 };
369 
370 } // namespace bits
371 
378 template<typename T>
380  stream_options options=stream_options()) {
381  return {fs, options};
382 }
383 
390 
396 template<typename T>
398  file_stream<T> & fs,
399  stream_options options=stream_options()) {
400  return {fs, options};
401 }
402 
409 template<typename T>
411  file_stream<T> & fs,
412  stream_options options=stream_options()) {
413  return {fs, options};
414 }
415 
420 template<typename T>
422  return {std::move(name)};
423 }
424 
425 
430 template <typename T>
432  return {fs};
433 }
434 
439 template <typename T>
440 inline pipe_end<termfactory<bits::named_output_t<T>, std::string> > named_output(std::string path) {
441  return {std::move(path)};
442 }
443 
444 
449 template<typename T>
451  return {fs};
452 }
453 
459 template <typename T>
461  return {fs};
462 }
463 
469 template <typename T>
471  return {fs};
472 }
473 
474 } // namespace pipelining
475 
476 } // namespace tpie
477 #endif
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining nodes that pushes the contents of the given file stream to the next node in the pipeline...
Definition: file_stream.h:379
pipe_middle< tfactory< bits::tee_t, Args< typename T::item_type >, T & > > tee(T &fs)
A pipelining node that writes the pushed to a file stream and then pushes the items to the next node...
Definition: file_stream.h:460
virtual void end() override
End pipeline processing phase.
Definition: file_stream.h:196
void end() override
End pipeline processing phase.
Definition: file_stream.h:292
bool can_read()
Check if the next call to read() will succeed or not.
Definition: stream.h:1010
virtual void end() override
End pipeline processing phase.
Definition: file_stream.h:80
virtual void propagate() override
Propagate stream metadata.
Definition: file_stream.h:142
file_stream pull input generator.
Definition: file_stream.h:133
const T & read()
Reads next item from stream if can_read() == true.
Definition: stream.h:947
pullpipe_middle< tfactory< bits::pull_tee_t, Args< typename T::item_type >, T & > > pull_tee(T &fs)
A pull-pipe node that when pulled from will pull from its source, write its item to disk and then ret...
Definition: file_stream.h:470
Open a file for reading.
Definition: access_type.h:31
void open(const std::string &path, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a named stream.
Definition: stream.h:204
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: file_stream.h:113
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: file_stream.h:71
void seek(stream_offset_type offset, offset_type whence=beginning)
Precondition: is_open() Precondition: offset == 0.
Definition: stream.h:627
Base class of all nodes.
Definition: node.h:78
Class to deduce the item_type of a node of type T.
Definition: node_traits.h:152
bool can_read_back()
Check if the next call to read_back() will succeed or not.
Definition: stream.h:1023
virtual void propagate() override
Propagate stream metadata.
Definition: file_stream.h:181
virtual void propagate() override
Propagate stream metadata.
Definition: file_stream.h:106
file_stream output terminator.
Definition: file_stream.h:274
void begin() override
Begin pipeline processing phase.
Definition: file_stream.h:283
Open a file for writing only, content is truncated.
Definition: access_type.h:33
pipe_middle< tempfactory< bits::item_type_t< T > > > item_type()
Create item type defining identity pipe node.
Definition: helpers.h:654
pullpipe_begin< termfactory< bits::named_pull_input_t< T >, std::string > > named_pull_input(std::string name)
A pipelining pull-node that reads items from the given file_stream.
Definition: file_stream.h:421
virtual void propagate() override
Propagate stream metadata.
Definition: file_stream.h:220
void end() override
End pipeline processing phase.
Definition: file_stream.h:236
pullpipe_begin< termfactory< bits::pull_reverse_input_t< T >, file_stream< T > &, stream_options > > pull_reverse_input(file_stream< T > &fs, stream_options options=stream_options())
A pipelining pull-node that reads items in reverse order from the given file_stream.
Definition: file_stream.h:410
file_stream output terminator.
Definition: file_stream.h:252
A pipe_middle class pushes input down the pipeline.
Definition: pipe_base.h:241
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
Definition: file_stream.h:431
pipe_begin< factory< bits::named_input_t, std::string > > named_input
Pipelining nodes that pushes the contents of the named file stream to the next node in the pipeline...
Definition: file_stream.h:389
pullpipe_begin< termfactory< bits::pull_input_t< T >, file_stream< T > &, stream_options > > pull_input(file_stream< T > &fs, stream_options options=stream_options())
A pipelining pull-node that reads items from the given file_stream.
Definition: file_stream.h:397
file_stream input generator.
Definition: file_stream.h:50
file_stream output pull data source.
Definition: file_stream.h:308
pullpipe_end< factory< bits::pull_output_t, file_stream< T > & > > pull_output(file_stream< T > &fs)
A pull-pipe node that writes the pulled items to a file stream.
Definition: file_stream.h:450
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: file_stream.h:318
virtual void end() override
End pipeline processing phase.
Definition: file_stream.h:157
pipe_end< termfactory< bits::named_output_t< T >, std::string > > named_output(std::string path)
A pipelining node that writes the pushed items to a named file stream.
Definition: file_stream.h:440
file_stream pull input generator.
Definition: file_stream.h:172
virtual void propagate() override
Propagate stream metadata.
Definition: file_stream.h:60