TPIE

2362a60
ami_glue.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2011, 2012, 2015 The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_AMI_GLUE_H__
21 #define __TPIE_PIPELINING_AMI_GLUE_H__
22 
23 namespace tpie {
24 
25 namespace pipelining {
26 
27 namespace bits {
28 
29 template <typename dest_t>
30 class ami_input_t : public node {
31 public:
32  typedef typename dest_t::item_type item_type;
33  ami_input_t(dest_t && dest, typename tpie::ami::stream<item_type> & stream)
34  : stream(stream), dest(std::move(dest)) {}
35 
36  void propagate() override {
37  forward("items", (stream_size_type)stream.stream_len());
38  set_steps(stream.stream_len());
39  }
40 
41  void begin() override {
42  stream.seek(0);
43  }
44 
45  void go() override {
46  item_type *item;
47  while (stream.read_item(&item) == tpie::ami::NO_ERROR) {
48  dest.push(*item);
49  step(1);
50  }
51  }
52 
53 private:
54 
55  typename tpie::ami::stream<item_type> & stream;
56  dest_t dest;
57 };
58 
59 template <typename dest_t>
60 class ami_input_stack_t : public node {
61 public:
62  typedef typename dest_t::item_type item_type;
63  ami_input_stack_t(dest_t && dest, typename tpie::ami::stack<item_type> & stack)
64  : stack(stack), dest(std::move(dest)) {}
65 
66  void propagate() override {
67  forward("items", (stream_size_type) stack.size());
68  set_steps(stack.size());
69  }
70 
71  void go() override {
72  const item_type *item;
73  while (stack.pop(&item) == tpie::ami::NO_ERROR) {
74  dest.push(*item);
75  step(1);
76  }
77  }
78 
79 private:
80 
81  typename tpie::ami::stack<item_type> & stack;
82  dest_t dest;
83 };
84 
85 template <typename T>
86 class ami_pull_input_stack_t : public node {
87 public:
88  typedef T item_type;
90  : stack(stack) {}
91 
92  void propagate() override {
93  forward("items", (stream_size_type) stack.size());
94  set_steps(stack.size());
95  }
96 
97  bool can_pull() {
98  return !stack.is_empty();
99  }
100 
101  T pull() {
102  const item_type *item;
103  stack.pop(&item);
104  return *item;
105  }
106 
107 private:
108 
109  typename tpie::ami::stack<item_type> & stack;
110 };
111 
117 template <typename T>
118 class ami_output_t : public node {
119 public:
120  typedef T item_type;
121 
122  inline ami_output_t(tpie::ami::stream<T> & stream) : stream(stream) {
123  set_name("Write", PRIORITY_INSIGNIFICANT);
124  }
125 
126  inline void push(const T & item) {
127  stream.write_item(item);
128  }
129 private:
130  tpie::ami::stream<T> & stream;
131 };
132 
133 } // namespace bits
134 
140 template<typename T>
144 }
145 
151 template<typename T>
152 inline pipe_begin<factory<bits::ami_input_stack_t, tpie::ami::stack<T> &> >
155 }
156 
161 template<typename T>
162 inline pullpipe_begin<termfactory<bits::pull_input_t<T>, tpie::ami::stack<T> &> >
165 }
166 
171 template <typename T>
174 }
175 
176 } // namespace pipelining
177 
178 } // namespace tpie
179 
180 #endif
ami::stream output terminator.
Definition: ami_glue.h:118
void begin() override
Begin pipeline processing phase.
Definition: ami_glue.h:41
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining node that pushes the contents of the given file stream to the next node in the pipeline...
Definition: file_stream.h:379
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: ami_glue.h:71
void propagate() override
Propagate stream metadata.
Definition: ami_glue.h:92
No error occurred.
Definition: err.h:47
An implementation of an external-memory stack.
Definition: stack.h:40
TPIE_OS_OFFSET size() const
Returns the number of items currently on the stack.
Definition: stack.h:237
pullpipe_begin< termfactory< bits::pull_input_t< T >, tpie::ami::stack< T > & > > ami_pull_input_stack(tpie::ami::stack< T > &fs)
A pipelining pull-node that reads items from the given ami::stack.
Definition: ami_glue.h:163
Base class of all nodes.
Definition: node.h:78
pipe_begin< factory< bits::ami_input_t, tpie::ami::stream< T > & > > ami_input(tpie::ami::stream< T > &input)
Pipelining node that pushes the contents of the given ami::stream to the next node in the pipeline...
Definition: ami_glue.h:142
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:564
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
Node factory for variadic argument terminators.
pipe_begin< factory< bits::ami_input_stack_t, tpie::ami::stack< T > & > > ami_input_stack(tpie::ami::stack< T > &input)
Pipelining node that pushes the contents of the given ami::stack to the next node in the pipeline...
Definition: ami_glue.h:153
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:654
pipe_end< termfactory< bits::ami_output_t< T >, tpie::ami::stream< T > & > > ami_output(tpie::ami::stream< T > &fs)
A pipelining node that writes the pushed items to an ami stream.
Definition: ami_glue.h:172
bool is_empty() const
Returns whether the stack is empty or not.
Definition: stack.h:244
err pop(const T **t)
Pops one item from the stack.
Definition: stack.h:325
Node factory for variadic argument generators.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
void propagate() override
Propagate stream metadata.
Definition: ami_glue.h:36
void propagate() override
Propagate stream metadata.
Definition: ami_glue.h:66
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: ami_glue.h:45