TPIE

2362a60
serialization_sort.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_PIPELINING_SERIALIZATION_SORT_H
21 #define TPIE_PIPELINING_SERIALIZATION_SORT_H
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/pipe_base.h>
25 #include <tpie/pipelining/factory_base.h>
26 #include <tpie/serialization_sorter.h>
27 #include <tpie/serialization.h>
28 
29 namespace tpie {
30 
31 namespace pipelining {
32 
33 namespace serialization_bits {
34 
35 template <typename T, typename pred_t>
37 public:
38  typedef T item_type;
39  typedef pred_t pred_type;
41  typedef std::shared_ptr<sorter_t> sorterptr;
42 };
43 
44 template <typename Traits>
46 
47 template <typename Traits>
49 
/// Common base class of the sorter output nodes in this file
/// (sort_pull_output_t and sort_output_t both derive from it).
/// It holds the sorter pointer and applies the phase-3 memory settings to it.
50 template <typename Traits>
51 class sort_output_base : public node {
52  typedef typename Traits::pred_type pred_type;
53 public:
55  typedef typename Traits::item_type item_type;
57  typedef typename Traits::sorter_t sorter_t;
59  typedef typename Traits::sorterptr sorterptr;
60 
    /// Returns a copy of the sorter pointer held by this node.
61  sorterptr get_sorter() const {
62  return m_sorter;
63  }
64 
    // NOTE(review): the body is empty in this copy, and the embedded listing
    // numbers jump from 65 to 67, so a statement may have been lost here --
    // confirm against the upstream header.
65  void set_calc_node(node & calc) {
67  }
68 
    /// Reports the sorter's item count as this node's step count, forwards it
    /// to successors under the key "items", and sets both the minimum and the
    /// maximum memory of this node to actual_memory_phase_3().
    /// Afterwards m_propagate_called is set, which makes
    /// set_available_memory() stop touching the sorter.
69  virtual void propagate() override {
70  set_steps(m_sorter->item_count());
71  forward("items", static_cast<stream_size_type>(m_sorter->item_count()));
72  memory_size_type memory_usage = m_sorter->actual_memory_phase_3();
73  set_minimum_memory(memory_usage);
74  set_maximum_memory(memory_usage);
    // NOTE(review): listing line 75 is absent from this copy -- confirm
    // whether a statement is missing between these two lines.
76  m_propagate_called = true;
77  }
78 
    /// Registers this node as the owner of the sorter.
79  void begin() override {
80  m_sorter->set_owner(this);
81  }
82 
    /// Clears the sorter's owner and drops this node's reference to the sorter.
83  void end() override {
84  m_sorter->set_owner(nullptr);
85  m_sorter.reset();
86  }
87 
    // NOTE(review): the body is empty in this copy, and the embedded listing
    // numbers jump from 88 to 90, so a statement may have been lost here --
    // confirm against the upstream header.
88  void add_calc_dependency(node_token tkn) {
90  }
91 
92 protected:
    /// Passes the assigned memory on to the sorter as its phase-3 memory, but
    /// only while propagate() has not run yet.
93  virtual void set_available_memory(memory_size_type availableMemory) override {
94  if (!m_propagate_called)
95  m_sorter->set_phase_3_memory(availableMemory);
96  }
97 
    /// Creates a new sorter_t from sizeof(item_type) and the given predicate.
98  sort_output_base(pred_type pred)
99  : m_sorter(new sorter_t(sizeof(item_type), pred))
100  , m_propagate_called(false)
101  {
102  }
103 
    /// Adopts an already existing sorter.
104  sort_output_base(sorterptr p)
105  : m_sorter(p)
106  , m_propagate_called(false)
107  {
108  }
109 
    // Sorter used by this node; released in end().
110  sorterptr m_sorter;
    // Set by propagate(); gates set_available_memory().
111  bool m_propagate_called;
112 };
113 
/// Output node of the sorter with a pull interface: can_pull() and pull()
/// forward to the sorter held by sort_output_base.
117 template <typename Traits>
118 class sort_pull_output_t : public sort_output_base<Traits> {
119 public:
120  typedef typename Traits::item_type item_type;
121  typedef typename Traits::pred_type pred_type;
122  typedef typename Traits::sorter_t sorter_t;
123  typedef typename Traits::sorterptr sorterptr;
124 
    // NOTE(review): the constructor's declaration line (listing line 125) is
    // absent from this copy; only its initializer list and body are visible.
    // The initializer hands a value named `sorter` to the base class; the body
    // declares the minimum memory, the node name and the memory fraction.
126  : sort_output_base<Traits>(sorter)
127  {
128  this->set_minimum_memory(sorter_t::minimum_memory_phase_3());
129  this->set_name("Write sorted output", PRIORITY_INSIGNIFICANT);
130  this->set_memory_fraction(1.0);
131  }
132 
    /// Forwards to the sorter's can_pull().
133  inline bool can_pull() const {
134  return this->m_sorter->can_pull();
135  }
136 
    /// Records one progress step, then returns the next item pulled from the
    /// sorter.
137  inline item_type pull() {
138  this->step();
139  return this->m_sorter->pull();
140  }
141 
142  // Despite this go() implementation, a sort_pull_output_t CANNOT be used as
143  // an initiator node. Normally, it is a type error to have a phase without
144  // an initiator, but with a passive_sorter you can circumvent this
145  // mechanism. Thus we customize the error message printed (but throw the
146  // same type of exception.)
    // This override never returns normally: it logs a warning and throws
    // not_initiator_node.
147  virtual void go() override {
148  log_warning() << "Passive sorter used without an initiator in the final merge and output phase.\n"
149  << "Define an initiator and pair it up with the pipe from passive_sorter::output()." << std::endl;
150  throw not_initiator_node();
151  }
152 };
153 
/// Output node of the sorter with a push interface: go() pulls every item
/// from the sorter and pushes it to the destination node.
157 template <typename Traits, typename dest_t>
158 class sort_output_t : public sort_output_base<Traits> {
159  typedef typename Traits::pred_type pred_type;
160 public:
161  typedef typename Traits::item_type item_type;
    // NOTE(review): listing line 162 is absent from this copy. `p_t`, used in
    // the constructor's initializer list below, is not declared in the visible
    // lines -- presumably it is declared on the missing line; confirm upstream.
163  typedef typename Traits::sorter_t sorter_t;
164  typedef typename Traits::sorterptr sorterptr;
165 
    /// Constructs the base from the predicate, stores the destination, and
    /// declares the push destination, minimum memory, node name and memory
    /// fraction.
166  sort_output_t(dest_t dest, pred_type pred)
167  : p_t(pred)
168  , dest(std::move(dest))
169  {
    // NOTE(review): inside this body the unqualified name `dest` refers to the
    // constructor parameter (it shadows the member), which has already been
    // passed through std::move above. Confirm that the moved-from node still
    // identifies the intended destination, or consider using this->dest.
170  this->add_push_destination(dest);
171  this->set_minimum_memory(sorter_t::minimum_memory_phase_3());
172  this->set_name("Write sorted output", PRIORITY_INSIGNIFICANT);
173  this->set_memory_fraction(1.0);
174  }
175 
    /// Drains the sorter: each pulled item is pushed to dest, and one progress
    /// step is recorded per item.
176  virtual void go() override {
177  while (this->m_sorter->can_pull()) {
178  dest.push(this->m_sorter->pull());
179  this->step();
180  }
181  }
182 private:
    // Destination node receiving the sorted items.
183  dest_t dest;
184 };
185 
/// Middle node of the sorter ("Perform merge heap"): its go() calls
/// merge_runs() on the shared sorter.
191 template <typename Traits>
192 class sort_calc_t : public node {
193 public:
194  typedef typename Traits::item_type item_type;
195  typedef typename Traits::sorter_t sorter_t;
196  typedef typename Traits::sorterptr sorterptr;
197 
198  typedef sort_output_base<Traits> Output;
199 
200  sort_calc_t(sort_calc_t && other) = default;
201 
    /// Moves the given output node into a heap-allocated dest_t held through
    /// std::shared_ptr<Output>, shares that node's sorter, and passes this
    /// node's token to the output node's add_calc_dependency().
202  template <typename dest_t>
203  sort_calc_t(dest_t dest)
204  : dest(new dest_t(std::move(dest)))
205  {
206  m_sorter = this->dest->get_sorter();
207  this->dest->add_calc_dependency(this->get_token());
208  init();
209  }
210 
    /// Constructs the node from an existing token and sorter; no output node
    /// is stored in this case.
211  sort_calc_t(sorterptr sorter, node_token tkn)
212  : node(tkn), m_sorter(sorter)
213  {
214  init();
215  }
216 
    /// Shared constructor tail: declares minimum memory, node name and memory
    /// fraction, and clears the propagate flag.
217  void init() {
218  set_minimum_memory(sorter_t::minimum_memory_phase_2());
219  set_name("Perform merge heap", PRIORITY_SIGNIFICANT);
220  set_memory_fraction(1.0);
221  m_propagate_called = false;
222  }
223 
    /// Declares a fixed step count of 1000 and marks propagate as done.
224  virtual void propagate() override {
225  set_steps(1000);
226  m_propagate_called = true;
227  }
228 
    /// Registers this node as the owner of the sorter.
229  void begin() override {
230  m_sorter->set_owner(this);
231  }
232 
    /// Forwards to the sorter's is_merge_runs_free().
233  virtual bool is_go_free() const override {return m_sorter->is_merge_runs_free();}
234 
    /// Logs a debug message, runs merge_runs() on the sorter, and then drives
    /// the progress indicator `pi` through init(1), step() and done().
    // NOTE(review): listing line 236 is absent from this copy, and `pi` is not
    // declared in the visible lines -- presumably it is obtained on the missing
    // line; confirm against the upstream header.
235  virtual void go() override {
237  log_debug() << "TODO: Progress information during merging." << std::endl;
238  m_sorter->merge_runs();
239  pi->init(1);
240  pi->step();
241  pi->done();
242  }
243 
    /// Keeps only a weak reference to the sorter and drops the owning one, so
    /// evacuate() can still reach the sorter while another owner keeps it alive.
244  void end() override {
245  m_weak_sorter = m_sorter;
246  m_sorter.reset();
247  }
248 
249  virtual bool can_evacuate() override {
250  return true;
251  }
252 
    /// Calls evacuate() on the sorter if the weak reference can still be locked.
253  virtual void evacuate() override {
254  auto p = m_weak_sorter.lock();
255  if (p) p->evacuate();
256  }
257 
    /// Returns a copy of the sorter pointer held by this node.
258  sorterptr get_sorter() const {
259  return m_sorter;
260  }
261 
    // NOTE(review): the body is empty in this copy, and the embedded listing
    // numbers jump from 262 to 264, so a statement may have been lost here --
    // confirm against the upstream header.
262  void set_input_node(node & input) {
264  }
265 
266 protected:
    /// Passes the assigned memory on to the sorter as its phase-2 memory, but
    /// only while propagate() has not run yet.
267  virtual void set_available_memory(memory_size_type availableMemory) override {
268  if (!m_propagate_called)
269  m_sorter->set_phase_2_memory(availableMemory);
270  }
271 
272 private:
    // Owning reference to the sorter; released in end().
273  sorterptr m_sorter;
    // Non-owning reference assigned in end(), used by evacuate().
274  std::weak_ptr<typename sorterptr::element_type> m_weak_sorter;
    // Set by propagate(); gates set_available_memory().
275  bool m_propagate_called;
    // Output node; only set by the templated constructor.
276  std::shared_ptr<Output> dest;
277 };
278 
284 template <typename Traits>
285 class sort_input_t : public node {
286  typedef typename Traits::pred_type pred_type;
287 public:
288  typedef typename Traits::item_type item_type;
289  typedef typename Traits::sorter_t sorter_t;
290  typedef typename Traits::sorterptr sorterptr;
291 
292  sort_input_t(sort_calc_t<Traits> dest)
293  : m_sorter(dest.get_sorter())
294  , dest(std::move(dest))
295  {
296  this->dest.set_input_node(*this);
297  set_minimum_memory(sorter_t::minimum_memory_phase_1());
298  set_name("Form input runs", PRIORITY_SIGNIFICANT);
299  set_memory_fraction(1.0);
300  m_propagate_called = false;
301  }
302 
303  virtual void propagate() override {
304  m_propagate_called = true;
305  }
306 
307  virtual void begin() override {
308  m_sorter->set_owner(this);
309  m_sorter->begin();
310  }
311 
312  void push(const item_type & item) {
313  m_sorter->push(item);
314  }
315 
316  virtual void end() override {
317  m_sorter->end();
318  m_weak_sorter = m_sorter;
319  m_sorter.reset();
320  }
321 
322  virtual bool can_evacuate() override {
323  return true;
324  }
325 
326  virtual void evacuate() override {
327  auto p = m_weak_sorter.lock();
328  if (p) p->evacuate();
329  }
330 
331 protected:
332  virtual void set_available_memory(memory_size_type availableMemory) override {
333  if (!m_propagate_called)
334  m_sorter->set_phase_1_memory(availableMemory);
335  }
336 
337 private:
338  sorterptr m_sorter;
339  std::weak_ptr<typename sorterptr::element_type> m_weak_sorter;
340  sort_calc_t<Traits> dest;
341  bool m_propagate_called;
342 };
343 
344 template <typename child_t>
346  const child_t & self() const { return *static_cast<const child_t *>(this); }
347 public:
348  template <typename dest_t>
349  struct constructed {
350  private:
352  typedef typename push_type<dest_t>::type item_type;
353  public:
354  typedef typename child_t::template predicate<item_type>::type pred_type;
356  typedef sort_input_t<Traits> type;
357  };
358 
359  template <typename dest_t>
360  typename constructed<dest_t>::type construct(dest_t dest) {
361  typedef typename push_type<dest_t>::type item_type;
362  typedef typename constructed<dest_t>::Traits Traits;
363 
364  sort_output_t<Traits, dest_t> output(std::move(dest), self().template get_pred<item_type>());
365  this->init_sub_node(output);
366  sort_calc_t<Traits> calc(std::move(output));
367  this->init_sub_node(calc);
368  sort_input_t<Traits> input(std::move(calc));
369  this->init_sub_node(input);
370 
371  return input;
372  }
373 };
374 
378 class default_pred_sort_factory : public sort_factory_base<default_pred_sort_factory> {
379 public:
380  template <typename item_type>
381  class predicate {
382  public:
383  typedef std::less<item_type> type;
384  };
385 
386  template <typename T>
387  std::less<T> get_pred() const {
388  return std::less<T>();
389  }
390 };
391 
395 template <typename pred_t>
396 class sort_factory : public sort_factory_base<sort_factory<pred_t> > {
397 public:
398  template <typename Dummy>
399  class predicate {
400  public:
401  typedef pred_t type;
402  };
403 
404  sort_factory(const pred_t & p)
405  : pred(p)
406  {
407  }
408 
409  template <typename T>
410  pred_t get_pred() const {
411  return pred;
412  }
413 
414 private:
415  pred_t pred;
416 };
417 
418 } // namespace serialization_bits
419 
423 inline pipe_middle<serialization_bits::default_pred_sort_factory>
426  return pipe_middle<fact>(fact()).name("Sort");
427 }
428 
/// Creates a pipe_middle named "Sort" whose factory is built from the given
/// predicate.
/// \param p  Predicate passed to the factory constructor.
432 template <typename pred_t>
433 pipe_middle<serialization_bits::sort_factory<pred_t> >
434 serialization_sort(const pred_t & p) {
    // NOTE(review): listing line 435 is absent from this copy, and `fact` is
    // not declared in the visible lines -- presumably a typedef for
    // serialization_bits::sort_factory<pred_t> (the declared return type wraps
    // exactly that type); confirm against the upstream header.
436  return pipe_middle<fact>(fact(p)).name("Sort");
437 }
438 
439 template <typename T, typename pred_t=std::less<T> >
441 
442 namespace serialization_bits {
443 
447 template <typename Traits>
449 public:
451  typedef sort_calc_t<Traits> calc_t;
453  typedef input_t constructed_type;
454  typedef typename Traits::sorter_t sorter_t;
455  typedef typename Traits::sorterptr sorterptr;
456 
457  passive_sorter_factory_input(sorterptr sorter, node_token calc_token)
458  : m_sorter(sorter)
459  , m_calc_token(calc_token) {}
460 
461  constructed_type construct() {
462  calc_t calc(std::move(m_sorter), m_calc_token);
463  this->init_node(calc);
464  input_t input(std::move(calc));
465  this->init_node(input);
466  return std::move(input);
467  }
468 
469 private:
470  sorterptr m_sorter;
471  node_token m_calc_token;
472 };
473 
477 template <typename Traits>
479 public:
481  typedef typename Traits::sorterptr sorterptr;
482  typedef output_t constructed_type;
483 
484  passive_sorter_factory_output(sorterptr sorter, node_token calc_token)
485  : m_sorter(sorter)
486  , m_calc_token(calc_token)
487  {}
488 
489  constructed_type construct() {
490  constructed_type res(std::move(m_sorter));
491  res.add_calc_dependency(m_calc_token);
492  init_node(res);
493  return std::move(res);
494  }
495 
496 private:
497  sorterptr m_sorter;
498  node_token m_calc_token;
499 };
500 
501 } // namespace serialization_bits
502 
511 template <typename T, typename pred_t>
514 public:
516  typedef T item_type;
518  typedef typename Traits::sorter_t sorter_t;
520  typedef typename Traits::sorterptr sorterptr;
523 
526 
    /// Creates one sorter_t from sizeof(T) and the predicate, and initializes
    /// both m_sorter_input and m_sorter_output to refer to it.
    /// \param pred  Predicate passed to the sorter; defaults to pred_t().
527  serialization_passive_sorter(pred_t pred = pred_t())
528  : m_sorter_input(new sorter_t(sizeof(T), pred))
    // m_sorter_output is a copy of m_sorter_input, so both members share the
    // same sorter object.
529  , m_sorter_output(m_sorter_input)
530  {
531  }
532 
534  serialization_passive_sorter & operator=(const serialization_passive_sorter &) = delete;
537 
542  assert(m_sorter_input);
543  return input_pipe_t(std::move(m_sorter_input), m_calc_token);
544  }
545 
550  assert(m_sorter_output);
551  return output_pipe_t(std::move(m_sorter_output), m_calc_token);
552  }
553 
554 private:
555  sorterptr m_sorter_input;
556  sorterptr m_sorter_output;
557  node_token m_calc_token;
558 };
559 
560 } // namespace pipelining
561 
562 } // namespace tpie
563 
564 #endif // TPIE_PIPELINING_SERIALIZATION_SORT_H
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
A pipelining node that pushes the contents of the given file stream to the next node in the pipeline...
Definition: file_stream.h:379
void end() override
End pipeline processing phase.
output_pipe_t output()
Get the output pull node.
The base class for indicating the progress of some task.
void add_memory_share_dependency(const node_token &dest)
Called by implementers to declare a node memory share dependency, that is, a requirement that another...
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
input_pipe_t input()
Get the input push node.
Traits::sorterptr sorterptr
Smart pointer to sorter_t.
virtual bool can_evacuate() override
Overridden by nodes that have data to evacuate.
Base class of all pipelining factories.
Definition: factory_base.h:73
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
void init_node(node &r)
Initialize node constructed in a subclass.
Definition: factory_base.h:134
const node_token & get_token() const
Get the node_token that maps this node's ID to a pointer to this.
Definition: node.h:632
virtual void done()
Advance the indicator to the end.
serialization_bits::sort_pull_output_t< Traits > output_t
Type of pipe sorter output.
Base class of all nodes.
Definition: node.h:78
Traits::sorter_t sorter_t
Type of the merge sort implementation used.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Class to deduce the item_type of a node of type T.
Definition: node_traits.h:152
progress_indicator_base * proxy_progress_indicator()
Get a non-initialized progress indicator for use with external implementations.
Pipelined sorter with push input and pull output.
virtual void evacuate() override
Overridden by nodes that have data to evacuate.
Binary serialization and unserialization.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:564
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
Definition: node.h:217
virtual void propagate() override
Propagate stream metadata.
virtual void evacuate() override
Overridden by nodes that have data to evacuate.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
virtual void propagate() override
Propagate stream metadata.
void begin() override
Begin pipeline processing phase.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:207
pipe_middle< serialization_bits::default_pred_sort_factory > serialization_sort()
Pipelining sorter using std::less.
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
virtual bool can_evacuate() override
Overridden by nodes that have data to evacuate.
void init_sub_node(node &r)
Initialize node constructed in a subclass.
Definition: factory_base.h:157
Traits::sorterptr sorterptr
Smart pointer to sorter_t.
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:167
pipe_middle< tempfactory< bits::item_type_t< T > > > item_type()
Create item type defining identity pipe node.
Definition: helpers.h:654
Traits::sorter_t sorter_t
Type of the merge sort implementation used.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
void begin() override
Begin pipeline processing phase.
virtual void propagate() override
Propagate stream metadata.
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:654
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
void set_memory_fraction(double f)
Set the memory priority of this node.
Definition: node.h:225
virtual void begin() override
Begin pipeline processing phase.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
A pipe_middle class pushes input down the pipeline.
Definition: pipe_base.h:241
pipe_end< termfactory< bits::output_t< T >, file_stream< T > & > > output(file_stream< T > &fs)
A pipelining node that writes the pushed items to a file stream.
Definition: file_stream.h:431
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
void end() override
End pipeline processing phase.
virtual void end() override
End pipeline processing phase.
node()
Default constructor, using a new node_token.
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:157
Sort factory using the given predicate as comparator.
Traits::item_type item_type
Type of items sorted.
virtual void init(stream_size_type range=0)
Initialize progress indicator.