TPIE

2362a60
serialization.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
23 
24 #ifndef TPIE_PIPELINING_SERIALIZATION_H
25 #define TPIE_PIPELINING_SERIALIZATION_H
26 
27 #include <tpie/pipelining/node.h>
28 #include <tpie/pipelining/factory_helpers.h>
29 #include <tpie/pipelining/pair_factory.h>
30 #include <tpie/pipelining/pipe_base.h>
32 
33 namespace tpie {
34 
35 namespace pipelining {
36 
37 namespace serialization_bits {
38 
39 template <typename dest_t>
40 class input_t : public node {
41  dest_t dest;
43 
44 public:
45  typedef typename push_type<dest_t>::type item_type;
46 
47  input_t(dest_t dest, serialization_reader * rd)
48  : dest(std::move(dest))
49  , rd(rd)
50  {
51  set_name("Serialization reader");
52  add_push_destination(this->dest);
53  set_minimum_memory(rd->memory_usage());
55  }
56 
57  virtual void propagate() override {
58  set_steps(rd->size());
59  }
60 
61  virtual void go() override {
62  item_type x;
63  stream_size_type bytesRead = 0;
64  while (rd->can_read()) {
65  rd->unserialize(x);
66  dest.push(x);
67 
68  stream_size_type bytesRead2 = rd->offset();
69  step(bytesRead2 - bytesRead);
70  bytesRead = bytesRead2;
71  }
72  }
73 };
74 
75 typedef factory<input_t, serialization_reader *> input_factory;
76 
77 
78 template <typename T>
79 class output_t : public node {
81 
82 public:
83  typedef T item_type;
84 
86  : wr(wr)
87  {
88  set_name("Serialization writer");
89  set_minimum_memory(wr->memory_usage());
91  }
92 
93  void push(const T & x) {
94  wr->serialize(x);
95  }
96 };
97 
98 template <typename T>
101 };
102 
103 } // namespace serialization_bits
104 
112 }
113 
118 template <typename T>
119 pipe_end<typename serialization_bits::output_factory<T>::type>
121  return typename serialization_bits::output_factory<T>::type(&wr);
122 }
123 
124 namespace serialization_bits {
125 
126 template <typename T>
127 class reverser_input_t : public node {
128 public:
129  typedef T item_type;
130 
131  reverser_input_t(const node_token & token,
132  std::shared_ptr<node> output=std::shared_ptr<node>())
133  : node(token), output(output)
134  , wr()
135  , items(0)
136  {
137  this->set_name("Serialization reverse writer");
138  //TODO memory
139  set_minimum_resource_usage(FILES, 1);
140  set_plot_options(PLOT_BUFFERED | PLOT_SIMPLIFIED_HIDE);
141  }
142 
143  void propagate() override {
144  file.construct();
145  forward<tpie::maybe<tpie::temp_file>*>("__srev_file", &file, 1);
146  }
147 
148  void begin() override {
149  wr.open(file->path());
150  }
151 
152  void push(const item_type & x) {
153  wr.serialize(x);
154  ++items;
155  }
156 
157  void end() override {
158  wr.close();
159  forward<stream_size_type>("items", items);
160  }
161 
162 private:
164  std::shared_ptr<node> output;
166  stream_size_type items;
167 };
168 
169 template <typename dest_t>
170 class reverser_output_t : public node {
171 public:
172  typedef typename push_type<dest_t>::type item_type;
173 
174  reverser_output_t(dest_t dest, const node_token & input_token)
175  : dest(std::move(dest))
176  {
177  set_name("Serialization reverse reader");
178  add_dependency(input_token);
179  add_push_destination(this->dest);
180  //TODO memory
181  set_minimum_resource_usage(FILES, 1);
182  set_plot_options(PLOT_BUFFERED);
183  }
184 
185  void propagate() override {
186  file = fetch<tpie::maybe<tpie::temp_file> *>("__srev_file");
187  if (!file->is_constructed())
188  throw tpie::exception("No one created my file");
189  rd.open((*file)->path());
190  this->set_steps(rd.size());
191  }
192 
193  void go() override {
194  item_type x;
195  stream_size_type bytesRead = 0;
196  while (rd.can_read()) {
197  rd.unserialize(x);
198  dest.push(x);
199 
200  stream_size_type bytesRead2 = rd.offset();
201  step(bytesRead2 - bytesRead);
202  bytesRead = bytesRead2;
203  }
204  }
205 
206  void end() override {
207  rd.close();
208  file->destruct();
209  }
210 private:
213  dest_t dest;
214 };
215 
216 template <typename T>
217 class reverser_pull_output_t : public node {
218 public:
219  typedef T item_type;
220 
221  reverser_pull_output_t(const node_token & input_token)
222  {
223  set_name("Serialization reverse reader");
224  add_dependency(input_token);
225  //TODO memory
226  set_minimum_resource_usage(FILES, 1);
227  set_plot_options(PLOT_BUFFERED);
228  }
229 
230  void propagate() override {
231  file = fetch<tpie::maybe<tpie::temp_file> *>("__srev_file");
232  if (!file->is_constructed())
233  throw tpie::exception("No one created my file");
234  rd.open((*file)->path());
235  this->set_steps(rd.size());
236  }
237 
238  bool can_pull() {
239  return rd.can_read();
240  }
241 
242  T pull() {
243  item_type x;
244  stream_size_type bytesRead = rd.offset();
245  rd.unserialize(x);
246  stream_size_type bytesRead2 = rd.offset();
247  step(bytesRead2 - bytesRead);
248  return x;
249  }
250 
251  void end() override {
252  rd.close();
253  file->destruct();
254  }
255 
256 private:
259 };
260 
261 template <typename T>
262 class buffer_input_t : public node {
263 public:
264  typedef T item_type;
265 
266  buffer_input_t(const node_token & token,
267  std::shared_ptr<node> output = std::shared_ptr<node>())
268  : node(token)
269  , output(output)
270  , wr()
271  , items(0) {
272  set_name("Serialization buffer writer");
273  //TODO memory
274  set_minimum_resource_usage(FILES, 1);
275  set_plot_options(PLOT_BUFFERED | PLOT_SIMPLIFIED_HIDE);
276  }
277 
278  void propagate() override {
279  file.construct();
280  forward<tpie::maybe<tpie::temp_file>*>("__sbuf_file", &file, 1);
281  }
282 
283  void begin() override {
284  wr.open(file->path());
285  }
286 
287  void push(const item_type & x) {
288  wr.serialize(x);
289  ++items;
290  }
291 
292  void end() override {
293  wr.close();
294  this->forward<stream_size_type>("items", items);
295  }
296 public:
297  std::shared_ptr<node> output;
300  stream_size_type items;
301 };
302 
303 
304 template <typename dest_t>
305 class buffer_output_t : public node {
306 public:
307  typedef typename push_type<dest_t>::type item_type;
308 
309  buffer_output_t(dest_t dest, const node_token & input_token)
310  : dest(std::move(dest))
311  {
312  add_dependency(input_token);
313  add_push_destination(this->dest);
314  //TODO MEMORY
315  set_minimum_resource_usage(FILES, 1);
316  set_name("Serialization buffer reader");
317  set_plot_options(PLOT_BUFFERED);
318  }
319 
320  void propagate() override {
321  file = fetch<tpie::maybe<tpie::temp_file> *>("__sbuf_file");
322  if (!file->is_constructed())
323  throw tpie::exception("No one created my file");
324 
325  rd.open((*file)->path());
326  set_steps(rd.size());
327  }
328 
329  void go() override {
330  item_type x;
331  stream_size_type bytesRead = 0;
332  while (rd.can_read()) {
333  rd.unserialize(x);
334  dest.push(x);
335 
336  stream_size_type bytesRead2 = rd.offset();
337  step(bytesRead2 - bytesRead);
338  bytesRead = bytesRead2;
339  }
340  }
341 
342  void end() override {
343  rd.close();
344  file->destruct();
345  }
346 private:
349  dest_t dest;
350 };
351 
352 template <typename T>
353 class buffer_pull_output_t: public node {
354 public:
355  typedef T item_type;
356 
357  buffer_pull_output_t(const node_token & input_token) {
358  add_dependency(input_token);
359  set_name("Fetching items", PRIORITY_SIGNIFICANT);
360  //TODO memory
361  set_minimum_resource_usage(FILES, 1);
362  set_plot_options(PLOT_BUFFERED);
363  }
364 
365  virtual void propagate() override {
366  file = fetch<tpie::maybe<tpie::temp_file> *>("__sbuf_file");
367  if (!file->is_constructed())
368  throw tpie::exception("No one created my file");
369  rd.open((*file)->path());
370  set_steps(rd.size());
371  }
372 
373  bool can_pull() {
374  return rd.can_read();
375  }
376 
377  T pull() {
378  item_type x;
379  stream_size_type bytesRead = rd.offset();
380  rd.unserialize(x);
381  stream_size_type bytesRead2 = rd.offset();
382  step(bytesRead2 - bytesRead);
383  return x;
384  }
385 
386  void end() override {
387  rd.close();
388  file->destruct();
389  }
390 private:
393 };
394 
395 
396 } // namespace serialization_bits
397 
398 
403 template <typename T>
405 public:
406  typedef T item_type;
409 private:
414 public:
416 
417  input_t raw_input() {
418  return input_t(input_token);
419  }
420 
421  output_t raw_output() {
422  return output_t(input_token);
423  }
424 
429  return inputfact_t(input_token);
430  }
431 
436  return outputfact_t(input_token);
437  }
438 private:
439  node_token input_token;
440 
443 };
444 
445 
450 template <typename T>
452 public:
453  typedef T item_type;
456 private:
461 
462 public:
464 
465  input_t raw_input() {
466  return input_t(input_token);
467  }
468 
469  output_t raw_output() {
470  return output_t(input_token);
471  }
472 
473  inputpipe_t input() {
474  return inputfact_t(input_token);
475  }
476 
477  outputpipe_t output() {
478  return outputfact_t(input_token);
479  }
480 
481 private:
482  node_token input_token;
483 
486 };
487 
488 
494 
500 
501 } // namespace pipelining
502 
503 } // namespace tpie
504 
505 #endif // TPIE_PIPELINING_SERIALIZATION_H
virtual void propagate() override
Propagate stream metadata.
void propagate() override
Propagate stream metadata.
void end() override
End pipeline processing phase.
void end() override
End pipeline processing phase.
inputpipe_t input()
Returns a termfactory for the input nodes.
const std::string & path() const
The path of the file opened or the empty string.
stream_size_type offset()
Number of bytes read, not including the header.
Central file abstraction.
Definition: file.h:44
void serialize(const T &v)
Serialize a serializable item and write it to the stream.
A passive serialization reverser stored in external memory.
Stream of serializable items.
void propagate() override
Propagate stream metadata.
Base class of all nodes.
Definition: node.h:78
void end() override
End pipeline processing phase.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
stream_size_type size()
Size of file in bytes, not including the header.
outputpipe_t output()
Returns a termfactory for the output nodes.
void begin() override
Begin pipeline processing phase.
void serialize(const T &v)
Serialize a serializable item and write it to the stream.
Class to deduce the item_type of a node of type T.
Definition: node_traits.h:152
stream_size_type offset()
Number of bytes read, not including the header.
pipe_end< typename serialization_bits::output_factory< T >::type > serialization_output(serialization_writer &wr)
A pipelining node that writes item to a serialization_writer.
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
void unserialize(T &v)
Unserialize an unserializable item from the stream.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_resource_usage(resource_type type, memory_size_type usage)
Called by implementers to declare minimum resource requirements.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:207
void propagate() override
Propagate stream metadata.
Node factory for variadic argument terminators.
void end() override
End pipeline processing phase.
virtual void propagate() override
Propagate stream metadata.
Definition: serialization.h:57
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:459
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:654
void propagate() override
Propagate stream metadata.
Node factory for variadic argument generators.
A pipe_middle class pushes input down the pipeline.
Definition: pipe_base.h:241
void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
pipe_middle< split_factory< serialization_bits::reverser_input_t, node, serialization_bits::reverser_output_t > > serialization_reverser
A pipelining node that reverses serializable items and creates a phase boundary.
pipe_begin< serialization_bits::input_factory > serialization_input(serialization_reader &rd)
A pipelining node that reads items from a serialization_reader.
void begin() override
Begin pipeline processing phase.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
void propagate() override
Propagate stream metadata.
node()
Default constructor, using a new node_token.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: serialization.h:61
void end() override
End pipeline processing phase.
void end() override
End pipeline processing phase.
pipe_middle< split_factory< serialization_bits::buffer_input_t, node, serialization_bits::buffer_output_t > > serialization_buffer
A pipelining node that acts as a buffer for serializable items and creates a phase boundary...