24 #ifndef __TPIE_PIPELINING_BUFFER_H__
25 #define __TPIE_PIPELINING_BUFFER_H__
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/factory_helpers.h>
29 #include <tpie/pipelining/pipe_base.h>
30 #include <tpie/file_stream.h>
36 namespace pipelining {
49 set_name(
"Fetching items", PRIORITY_SIGNIFICANT);
56 m_queue_ptr = fetch<tpie::maybe<file_stream<T> > *>(
"queue");
57 m_queue = &**m_queue_ptr;
59 forward(
"items", m_queue->size());
63 bool can_pull()
const {
64 return m_queue->can_read();
69 return m_queue->read();
72 virtual void end()
override {
73 (*m_queue_ptr).destruct();
90 set_name(
"Storing items", PRIORITY_INSIGNIFICANT);
101 void push(
const T & item) {
102 m_queue->write(item);
111 std::shared_ptr<node> m_output;
117 template <
typename dest_t>
123 : dest(std::move(dest))
129 set_name(
"Buffer", PRIORITY_INSIGNIFICANT);
135 m_queue_ptr = fetch<tpie::maybe<file_stream<item_type> > *>(
"queue");
136 m_queue = &**m_queue_ptr;
137 forward(
"items", m_queue->size());
144 dest.push(m_queue->
read());
150 m_queue_ptr->destruct();
164 template <
typename T>
213 #endif // __TPIE_PIPELINING_BUFFER_H__
Sequential access is intended.
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
void propagate() override
Propagate stream metadata.
void end() override
End pipeline processing phase.
bool can_read()
Check if the next call to read() will succeed or not.
const T & read()
Reads next item from stream if can_read() == true.
void seek(stream_offset_type offset, offset_type whence=beginning)
Precondition: is_open() Precondition: offset == 0.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Class to deduce the item_type of a node of type T.
virtual void propagate() override
Propagate stream metadata.
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare minimum resource requirements.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Node factory for variadic argument terminators.
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
void step(stream_size_type steps=1)
Step the progress indicator.
Compress some blocks according to available resources (time, memory).
A pipe_middle class pushes input down the pipeline.
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
Plain old file_stream buffer.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
node()
Default constructor, using a new node_token.
virtual void end() override
End pipeline processing phase.
pipe_middle< split_factory< bits::buffer_input_t, node, bits::buffer_output_t > > buffer
The buffer node inserts a phase boundary into the pipeline by writing items to disk.