20 #ifndef __TPIE_PIPELINING_FILE_STREAM_H__
21 #define __TPIE_PIPELINING_FILE_STREAM_H__
23 #include <tpie/file_stream.h>
25 #include <tpie/pipelining/node.h>
26 #include <tpie/pipelining/factory_helpers.h>
27 #include <tpie/pipelining/pipe_base.h>
29 #include <tpie/flags.h>
32 namespace pipelining {
39 TPIE_DECLARE_OPERATORS_FOR_FLAGS(stream_option)
40 typedef tpie::flags<stream_option> stream_options;
49 template <
typename dest_t>
55 add_push_destination(this->dest);
56 set_name(
"Read", PRIORITY_INSIGNIFICANT);
57 set_minimum_memory(fs.memory_usage());
61 if (options & STREAM_RESET) fs.
seek(0);
64 forward(
"items", fs.size() - fs.offset());
65 set_steps(fs.size() - fs.offset());
71 virtual void go()
override {
80 virtual void end()
override {
81 if (options & STREAM_CLOSE) fs.close();
85 stream_options options;
95 template <
typename dest_t>
100 named_input_t(dest_t dest, std::string path) : dest(std::move(dest)), path(path) {
101 add_push_destination(this->dest);
102 set_name(
"Read", PRIORITY_INSIGNIFICANT);
109 forward(
"items", fs->size());
110 set_steps(fs->size());
113 virtual void go()
override {
114 while (fs->can_read()) {
115 dest.push(fs->read());
132 template <
typename T>
138 set_name(
"Read", PRIORITY_INSIGNIFICANT);
139 set_minimum_memory(fs.memory_usage());
143 if (options & STREAM_RESET) fs.
seek(0);
144 forward(
"items", fs.size()-fs.offset());
145 set_steps(fs.size()-fs.offset());
157 virtual void end()
override {
158 if (options & STREAM_CLOSE) fs.close();
162 stream_options options;
171 template <
typename T>
177 set_name(
"Read", PRIORITY_INSIGNIFICANT);
178 set_minimum_memory(fs.memory_usage());
183 forward(
"items", fs.offset());
184 set_steps(fs.offset());
189 return fs.read_back();
192 inline bool can_pull() {
196 virtual void end()
override {
197 if (options & STREAM_CLOSE) fs.close();
201 stream_options options;
210 template <
typename T>
216 set_name(
"Read", PRIORITY_INSIGNIFICANT);
223 forward(
"items", fs->size());
224 set_steps(fs->size());
251 template <
typename T>
257 set_name(
"Write", PRIORITY_INSIGNIFICANT);
258 set_minimum_memory(fs.memory_usage());
261 void push(
const T & item) {
273 template <
typename T>
279 set_name(
"Write", PRIORITY_INSIGNIFICANT);
288 void push(
const T & item) {
307 template <
typename source_t>
313 add_pull_source(this->source);
314 set_name(
"Write", PRIORITY_INSIGNIFICANT);
315 set_minimum_memory(fs.memory_usage());
318 virtual void go()
override {
320 while (source.can_pull()) {
321 fs.write(source.pull());
331 template <
typename dest_t,
typename T>
336 set_minimum_memory(fs.memory_usage());
339 void push(
const item_type & i) {
348 template <
typename source_t,
typename T>
353 set_minimum_memory(fs.memory_usage());
357 return source.can_pull();
361 item_type i = source.pull();
380 stream_options options=stream_options()) {
381 return {fs, options};
399 stream_options options=stream_options()) {
400 return {fs, options};
412 stream_options options=stream_options()) {
413 return {fs, options};
422 return {std::move(name)};
430 template <
typename T>
439 template <
typename T>
441 return {std::move(path)};
459 template <
typename T>
469 template <
typename T>
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
A pipelining node that pushes the contents of the given file stream to the next node in the pipeline...
pipe_middle< tfactory< bits::tee_t, Args< typename T::item_type >, T & > > tee(T &fs)
A pipelining node that writes the pushed items to a file stream and then pushes the items to the next node...
void end() override
End pipeline processing phase.
bool can_read()
Check if the next call to read() will succeed or not.
const T & read()
Reads next item from stream if can_read() == true.
pullpipe_middle< tfactory< bits::pull_tee_t, Args< typename T::item_type >, T & > > pull_tee(T &fs)
A pull-pipe node that, when pulled from, will pull from its source, write its item to disk and then return the item.
void open(const std::string &path, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a named stream.
void seek(stream_offset_type offset, offset_type whence=beginning)
Precondition: is_open(). Precondition: offset == 0.
Class to deduce the item_type of a node of type T.
bool can_read_back()
Check if the next call to read_back() will succeed or not.
file_stream output terminator.
void begin() override
Begin pipeline processing phase.
Open a file for writing only, content is truncated.
pipe_middle< tempfactory< bits::item_type_t< T > > > item_type()
Create item type defining identity pipe node.
pullpipe_begin< termfactory< bits::named_pull_input_t< T >, std::string > > named_pull_input(std::string name)
A pipelining pull-node that reads items from the named file_stream.
pullpipe_begin< termfactory< bits::pull_reverse_input_t< T >, file_stream< T > &, stream_options > > pull_reverse_input(file_stream< T > &fs, stream_options options=stream_options())
A pipelining pull-node that reads items in reverse order from the given file_stream.
file_stream output terminator.
A pipe_middle class pushes input down the pipeline.
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
pipe_begin< factory< bits::named_input_t, std::string > > named_input
A pipelining node that pushes the contents of the named file stream to the next node in the pipeline...
pullpipe_begin< termfactory< bits::pull_input_t< T >, file_stream< T > &, stream_options > > pull_input(file_stream< T > &fs, stream_options options=stream_options())
A pipelining pull-node that reads items from the given file_stream.
file_stream output pull data source.
pullpipe_end< factory< bits::pull_output_t, file_stream< T > & > > pull_output(file_stream< T > &fs)
A pull-pipe node that writes the pulled items to a file stream.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
pipe_end< termfactory< bits::named_output_t< T >, std::string > > named_output(std::string path)
A pipelining node that writes the pushed items to a named file stream.