TPIE

2362a60
serialization_sorter.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_SERIALIZATION_SORTER_H
21 #define TPIE_SERIALIZATION_SORTER_H
22 
23 #include <queue>
24 #include <boost/filesystem.hpp>
25 
26 #include <tpie/array.h>
27 #include <tpie/array_view.h>
28 #include <tpie/tempname.h>
29 #include <tpie/tpie_log.h>
30 #include <tpie/stats.h>
31 #include <tpie/parallel_sort.h>
32 
33 #include <tpie/serialization2.h>
35 
36 #include <tpie/pipelining/node.h>
37 
38 namespace tpie {
39 
40 namespace serialization_bits {
41 
44  memory_size_type filesPhase1;
46  memory_size_type memoryPhase1;
48  memory_size_type filesPhase2;
50  memory_size_type memoryPhase2;
52  memory_size_type filesPhase3;
54  memory_size_type memoryPhase3;
56  memory_size_type minimumItemSize;
58  std::string tempDir;
59 
60  void dump(std::ostream & out) const {
61  out << "Serialization merge sort parameters\n"
62  << "Phase 1 files: " << filesPhase1 << '\n'
63  << "Phase 1 memory: " << memoryPhase1 << '\n'
64  << "Phase 2 files: " << filesPhase2 << '\n'
65  << "Phase 2 memory: " << memoryPhase2 << '\n'
66  << "Phase 3 files: " << filesPhase3 << '\n'
67  << "Phase 3 memory: " << memoryPhase3 << '\n'
68  << "Minimum item size: " << minimumItemSize << '\n'
69  << "Temporary directory: " << tempDir << '\n';
70  }
71 };
72 
73 template <typename T>
74 void set_owner(memory_bucket_ref b, T & item) {
75  memory_size_type serSize = serialized_size(item);
76 
77  if (serSize > sizeof(T)) {
78  // amount of memory this item needs for its extra stuff (stuff not in the buffer).
79  serSize -= sizeof(T);
80  }
81 
82  b->count += serSize;
83 }
84 
85 template <typename T>
86 void unset_owner(memory_bucket_ref /*b*/, T & /*item*/) {}
87 
88 template <typename T, typename pred_t>
90  array<T> m_buffer;
91  memory_size_type m_items;
92  memory_size_type m_memForItems;
93 
94  memory_size_type m_largestItem;
95 
96  pred_t m_pred;
97 
98  bool m_full;
99 
100  memory_bucket_ref m_buffer_bucket;
101  memory_bucket_ref m_item_bucket;
102 
103 public:
104  internal_sort(memory_bucket_ref buffer_bucket,
105  memory_bucket_ref item_bucket,
106  pred_t pred = pred_t())
107  : m_buffer(buffer_bucket)
108  , m_items(0)
109  , m_largestItem(sizeof(T))
110  , m_pred(pred)
111  , m_full(false)
112  , m_buffer_bucket(buffer_bucket)
113  , m_item_bucket(item_bucket)
114  {
115  }
116 
117  void begin(memory_size_type memAvail) {
118  m_buffer.resize(memAvail / sizeof(T) / 2);
119  m_items = 0;
120  m_largestItem = sizeof(T);
121  m_full = false;
122  m_memForItems = memAvail - m_buffer_bucket->count;
123  }
124 
131  bool push(const T & item) {
132  if (m_full) return false;
133 
134  if (m_items == m_buffer.size()) {
135  m_full = true;
136  return false;
137  }
138 
139  size_t oldSize = m_item_bucket->count;
140  set_owner(m_item_bucket, item);
141 
142  if (m_item_bucket->count > m_memForItems) {
143  unset_owner(m_item_bucket, item);
144  m_full = true;
145  return false;
146  }
147 
148  m_largestItem = std::max(m_largestItem, m_item_bucket->count - oldSize);
149 
150  m_buffer[m_items++] = item;
151 
152  return true;
153  }
154 
155  memory_size_type get_largest_item_size() {
156  return m_largestItem;
157  }
158 
165  memory_size_type current_serialized_size() {
166  return m_item_bucket->count;
167  }
168 
179  memory_size_type memory_usage() {
180  return m_buffer_bucket->count + m_item_bucket->count;
181  }
182 
183  bool can_shrink_buffer() {
185  }
186 
187  void shrink_buffer() {
188  array<T> newBuffer(array_view<const T>(begin(), end()));
189  m_buffer.swap(newBuffer);
190  }
191 
192  void sort() {
193  parallel_sort(m_buffer.get(), m_buffer.get() + m_items, m_pred);
194  }
195 
196  const T * begin() const {
197  return m_buffer.get();
198  }
199 
200  const T * end() const {
201  return m_buffer.get() + m_items;
202  }
203 
207  void free() {
208  reset();
209  m_buffer.resize(0);
210  }
211 
216  void reset() {
217  for (size_t i = 0 ; i < m_items ; ++i)
218  unset_owner(m_item_bucket, m_buffer[i]);
219  m_item_bucket->count = 0;
220  m_items = 0;
221  m_full = false;
222  }
223 };
224 
264 template <typename T>
266  // Physical index of the run file with logical index 0.
267  size_t m_fileOffset;
268  // Physical index of the run file that begins the next run.
269  size_t m_nextLevelFileOffset;
270  // Physical index of the next run file to write
271  size_t m_nextFileOffset;
272 
273  bool m_writerOpen;
274  size_t m_readersOpen;
275 
276  serialization_writer m_writer;
277  stream_size_type m_currentWriterByteSize;
278 
279  array<serialization_reader> m_readers;
280 
281  std::string m_tempDir;
282 
283  std::string run_file(size_t physicalIndex) {
284  if (m_tempDir.size() == 0) throw exception("run_file: temp dir is the empty string");
285  std::stringstream ss;
286  ss << m_tempDir << '/' << physicalIndex << ".tpie";
287  return ss.str();
288  }
289 
290 public:
291  file_handler()
292  : m_fileOffset(0)
293  , m_nextLevelFileOffset(0)
294  , m_nextFileOffset(0)
295 
296  , m_writerOpen(false)
297  , m_readersOpen(0)
298 
299  , m_writer()
300  , m_currentWriterByteSize(0)
301  {
302  }
303 
304  ~file_handler() {
305  reset();
306  }
307 
308  void set_temp_dir(const std::string & tempDir) {
309  if (m_nextFileOffset != 0)
310  throw exception("set_temp_dir: trying to change path after files already open");
311  m_tempDir = tempDir;
312  }
313 
314  void open_new_writer() {
315  if (m_writerOpen) throw exception("open_new_writer: Writer already open");
316  m_writer.open(run_file(m_nextFileOffset++));
317  m_currentWriterByteSize = m_writer.file_size();
318  m_writerOpen = true;
319  }
320 
321  void write(const T & v) {
322  if (!m_writerOpen) throw exception("write: No writer open");
323  m_writer.serialize(v);
324  }
325 
326  void close_writer() {
327  if (!m_writerOpen) throw exception("close_writer: No writer open");
328  m_writer.close();
329  stream_size_type sz = m_writer.file_size();
330  increase_usage(m_nextFileOffset-1, static_cast<stream_offset_type>(sz));
331  m_writerOpen = false;
332  }
333 
334  size_t remaining_runs() {
335  return m_nextLevelFileOffset - m_fileOffset;
336  }
337 
338  size_t next_level_runs() {
339  return m_nextFileOffset - m_nextLevelFileOffset;
340  }
341 
342  bool readers_open() {
343  return m_readersOpen > 0;
344  }
345 
346  void open_readers(size_t fanout) {
347  if (m_readersOpen != 0) throw exception("open_readers: readers already open");
348  if (fanout == 0) throw exception("open_readers: fanout == 0");
349  if (remaining_runs() == 0) {
350  if (m_writerOpen) throw exception("Writer open while moving to next merge level");
351  m_nextLevelFileOffset = m_nextFileOffset;
352  }
353  if (fanout > remaining_runs()) throw exception("open_readers: fanout out of bounds");
354 
355  if (m_readers.size() < fanout) m_readers.resize(fanout);
356  for (size_t i = 0; i < fanout; ++i) {
357  m_readers[i].open(run_file(m_fileOffset + i));
358  }
359  m_readersOpen = fanout;
360  }
361 
362  bool can_read(size_t idx) {
363  if (m_readersOpen == 0) throw exception("can_read: no readers open");
364  if (m_readersOpen < idx) throw exception("can_read: index out of bounds");
365  return m_readers[idx].can_read();
366  }
367 
368  T read(size_t idx) {
369  if (m_readersOpen == 0) throw exception("read: no readers open");
370  if (m_readersOpen < idx) throw exception("read: index out of bounds");
371  T res;
372  m_readers[idx].unserialize(res);
373  return res;
374  }
375 
376  void close_readers_and_delete() {
377  if (m_readersOpen == 0) throw exception("close_readers_and_delete: no readers open");
378 
379  for (size_t i = 0; i < m_readersOpen; ++i) {
380  decrease_usage(m_fileOffset + i, m_readers[i].file_size());
381  m_readers[i].close();
382  boost::filesystem::remove(run_file(m_fileOffset + i));
383  }
384  m_fileOffset += m_readersOpen;
385  m_readersOpen = 0;
386  }
387 
388  void move_last_reader_to_next_level() {
389  if (remaining_runs() != 1)
390  throw exception("move_last_reader_to_next_level: remaining_runs != 1");
391  m_nextLevelFileOffset = m_fileOffset;
392  }
393 
394  void reset() {
395  if (m_readersOpen > 0) {
396  log_debug() << "reset: Close readers" << std::endl;
397  close_readers_and_delete();
398  }
399  m_readers.resize(0);
400  if (m_writerOpen) {
401  log_debug() << "reset: Close writer" << std::endl;
402  close_writer();
403  }
404  log_debug() << "Remove " << m_fileOffset << " through " << m_nextFileOffset << std::endl;
405  for (size_t i = m_fileOffset; i < m_nextFileOffset; ++i) {
406  std::string runFile = run_file(i);
408  rd.open(runFile);
409  decrease_usage(i, rd.file_size());
410  rd.close();
411  boost::filesystem::remove(runFile);
412  }
413  m_fileOffset = m_nextLevelFileOffset = m_nextFileOffset = 0;
414  }
415 
416 private:
417  void increase_usage(size_t idx, stream_size_type sz) {
418  log_debug() << "+ " << idx << ' ' << sz << std::endl;
419  increment_temp_file_usage(static_cast<stream_offset_type>(sz));
420  }
421 
422  void decrease_usage(size_t idx, stream_size_type sz) {
423  log_debug() << "- " << idx << ' ' << sz << std::endl;
424  increment_temp_file_usage(-static_cast<stream_offset_type>(sz));
425  }
426 };
427 
428 template <typename T, typename pred_t>
429 class merger {
430  class mergepred_t {
431  pred_t m_pred;
432 
433  public:
434  typedef std::pair<T, size_t> item_type;
435 
436  mergepred_t(const pred_t & pred) : m_pred(pred) {}
437 
438  // Used with std::priority_queue, so invert the original relation.
439  bool operator()(const item_type & a, const item_type & b) const {
440  return m_pred(b.first, a.first);
441  }
442  };
443 
444  typedef typename mergepred_t::item_type item_type;
445 
446  file_handler<T> & files;
447  pred_t pred;
448  std::vector<serialization_reader> rd;
449  typedef std::priority_queue<item_type, std::vector<item_type>, mergepred_t> priority_queue_type;
450  priority_queue_type pq;
451 
452 public:
453  merger(file_handler<T> & files, const pred_t & pred)
454  : files(files)
455  , pred(pred)
456  , pq(mergepred_t(pred))
457  {
458  }
459 
460  // Assume files.open_readers(fanout) has just been called
461  void init(size_t fanout) {
462  rd.resize(fanout);
463  for (size_t i = 0; i < fanout; ++i)
464  push_from(i);
465  }
466 
467  bool empty() const {
468  return pq.empty();
469  }
470 
471  const T & top() const {
472  return pq.top().first;
473  }
474 
475  void pop() {
476  size_t idx = pq.top().second;
477  pq.pop();
478  push_from(idx);
479  }
480 
481  // files.close_readers_and_delete() should be called after this
482  void free() {
483  {
484  priority_queue_type tmp(pred);
485  std::swap(pq, tmp);
486  }
487  rd.resize(0);
488  }
489 
490 private:
491  void push_from(size_t idx) {
492  if (files.can_read(idx)) {
493  pq.push(std::make_pair(files.read(idx), idx));
494  }
495  }
496 };
497 
498 } // namespace serialization_bits
499 
500 template <typename T, typename pred_t = std::less<T> >
502 public:
503  typedef std::shared_ptr<serialization_sorter> ptr;
504 
505 private:
506  enum sorter_state { state_initial, state_1, state_2, state_3 };
507 
508  std::unique_ptr<memory_bucket> m_buffer_bucket_ptr;
509  memory_bucket_ref m_buffer_bucket;
510  std::unique_ptr<memory_bucket> m_item_bucket_ptr;
511  memory_bucket_ref m_item_bucket;
512  pipelining::node * m_owning_node;
513 
514  sorter_state m_state;
517  bool m_parametersSet;
520 
521  stream_size_type m_items;
522  bool m_reportInternal;
523  const T * m_nextInternalItem;
524 
525  static const memory_size_type defaultFiles = 253; // Default number of files available, when not using set_available_files
526  static const memory_size_type minimumFilesPhase1 = 1;
527  static const memory_size_type maximumFilesPhase1 = 1;
528  static const memory_size_type minimumFilesPhase2 = 3;
529  static const memory_size_type maximumFilesPhase2 = std::numeric_limits<memory_size_type>::max();
530  static const memory_size_type minimumFilesPhase3 = 3;
531  static const memory_size_type maximumFilesPhase3 = std::numeric_limits<memory_size_type>::max();
532 
533  const int defaultMaxFiles = 253;
534 
535 public:
536  serialization_sorter(memory_size_type minimumItemSize = sizeof(T), pred_t pred = pred_t())
537  : m_buffer_bucket_ptr(new memory_bucket())
538  , m_buffer_bucket(memory_bucket_ref(m_buffer_bucket_ptr.get()))
539  , m_item_bucket_ptr(new memory_bucket())
540  , m_item_bucket(memory_bucket_ref(m_item_bucket_ptr.get()))
541  , m_owning_node(nullptr)
542  , m_state(state_initial)
543  , m_sorter(m_buffer_bucket, m_item_bucket, pred)
544  , m_parametersSet(false)
545  , m_files()
546  , m_merger(m_files, pred)
547  , m_items(0)
548  , m_reportInternal(false)
549  , m_nextInternalItem(0)
550  {
551  m_params.filesPhase1 = 0;
552  m_params.filesPhase2 = 0;
553  m_params.filesPhase3 = 0;
554  m_params.memoryPhase1 = 0;
555  m_params.memoryPhase2 = 0;
556  m_params.memoryPhase3 = 0;
557  m_params.minimumItemSize = minimumItemSize;
558  }
559 
560 private:
561  // Checks if we should still be able to change parameters
562  inline void check_not_started() {
563  if (m_state != state_initial) {
564  throw tpie::exception("Can't change parameters after sorting has started");
565  }
566  }
567 
568 public:
569  inline void set_phase_1_files(memory_size_type f1) {
570  m_params.filesPhase1 = f1;
571  check_not_started();
572  }
573 
574  inline void set_phase_2_files(memory_size_type f2) {
575  m_params.filesPhase2 = f2;
576  check_not_started();
577  }
578 
579  inline void set_phase_3_files(memory_size_type f3) {
580  m_params.filesPhase3 = f3;
581  check_not_started();
582  }
583 
588  inline void set_available_files(memory_size_type f) {
589  m_params.filesPhase1 = m_params.filesPhase2 = m_params.filesPhase3 = f;
590  check_not_started();
591  }
592 
599  inline void set_available_files(memory_size_type f1, memory_size_type f2, memory_size_type f3) {
600  m_params.filesPhase1 = f1;
601  m_params.filesPhase2 = f2;
602  m_params.filesPhase3 = f3;
603  check_not_started();
604  }
605 
606  void set_phase_1_memory(memory_size_type m1) {
607  m_params.memoryPhase1 = m1;
608  check_not_started();
609  }
610 
611  void set_phase_2_memory(memory_size_type m2) {
612  m_params.memoryPhase2 = m2;
613  check_not_started();
614  }
615 
616  void set_phase_3_memory(memory_size_type m3) {
617  m_params.memoryPhase3 = m3;
618  check_not_started();
619  }
620 
621  void set_available_memory(memory_size_type m) {
622  set_phase_1_memory(m);
623  set_phase_2_memory(m);
624  set_phase_3_memory(m);
625  }
626 
627  void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3) {
628  set_phase_1_memory(m1);
629  set_phase_2_memory(m2);
630  set_phase_3_memory(m3);
631  }
632 
633  static memory_size_type minimum_memory_phase_1() {
634  return serialization_writer::memory_usage()*2;
635  }
636 
637  static memory_size_type minimum_memory_phase_2() {
638  return serialization_writer::memory_usage()
639  + 2*serialization_reader::memory_usage();
640  }
641 
642  static memory_size_type minimum_memory_phase_3() {
643  return 2*serialization_reader::memory_usage();
644  }
645 
646  memory_size_type actual_memory_phase_3() {
647  if (m_state != state_3)
648  throw tpie::exception("Bad state in actualy_memory_phase_3");
649  if (m_reportInternal)
650  return m_sorter.memory_usage();
651  else
652  return m_files.next_level_runs() * (m_sorter.get_largest_item_size() + serialization_reader::memory_usage());
653  }
654 
655  void set_owner(pipelining::node * n) {
656  if (m_owning_node != nullptr) {
657  m_buffer_bucket_ptr = std::move(m_owning_node->bucket(0));
658  m_item_bucket_ptr = std::move(m_owning_node->bucket(1));
659  }
660 
661  if (n != nullptr) {
662  n->bucket(0) = std::move(m_buffer_bucket_ptr);
663  n->bucket(1) = std::move(m_item_bucket_ptr);
664  }
665 
666  m_owning_node = n;
667  }
668 private:
669  static memory_size_type clamp(memory_size_type lo, memory_size_type val, memory_size_type hi) {
670  return std::max(lo, std::min(val, hi));
671  }
672 
673  void calculate_parameters() {
674  if (m_state != state_initial)
675  throw tpie::exception("Bad state in calculate_parameters");
676 
677  if(!m_params.filesPhase1)
678  m_params.filesPhase1 = clamp(minimumFilesPhase1, defaultFiles, maximumFilesPhase1);
679  if(!m_params.filesPhase2)
680  m_params.filesPhase2 = clamp(minimumFilesPhase2, defaultFiles, maximumFilesPhase2);
681  if(!m_params.filesPhase3)
682  m_params.filesPhase3 = clamp(minimumFilesPhase3, defaultFiles, maximumFilesPhase3);
683 
684  if(m_params.filesPhase1 < minimumFilesPhase1)
685  throw tpie::exception("file limit for phase 1 too small (" + std::to_string(m_params.filesPhase1) + " < " + std::to_string(minimumFilesPhase1) + ")");
686  if(m_params.filesPhase2 < minimumFilesPhase2)
687  throw tpie::exception("file limit for phase 2 too small (" + std::to_string(m_params.filesPhase2) + " < " + std::to_string(minimumFilesPhase2) + ")");
688  if(m_params.filesPhase3 < minimumFilesPhase3)
689  throw tpie::exception("file limit for phase 3 too small (" + std::to_string(m_params.filesPhase3) + " < " + std::to_string(minimumFilesPhase3) + ")");
690 
691  memory_size_type memAvail1 = m_params.memoryPhase1;
692  if (memAvail1 <= serialization_writer::memory_usage()) {
693  log_error() << "Not enough memory for run formation; have " << memAvail1
694  << " bytes but " << serialization_writer::memory_usage()
695  << " is required for writing a run." << std::endl;
696  throw exception("Not enough memory for run formation");
697  }
698 
699  memory_size_type memAvail2 = m_params.memoryPhase2;
700 
701  // We have to keep a writer open no matter what.
702  if (memAvail2 <= serialization_writer::memory_usage()) {
703  log_error() << "Not enough memory for merging. "
704  << "mem avail = " << memAvail2
705  << ", writer usage = " << serialization_writer::memory_usage()
706  << std::endl;
707  throw exception("Not enough memory for merging.");
708  }
709 
710  memory_size_type memAvail3 = m_params.memoryPhase3;
711 
712  // We have to keep a writer open no matter what.
713  if (memAvail2 <= serialization_writer::memory_usage()) {
714  log_error() << "Not enough memory for outputting. "
715  << "mem avail = " << memAvail3
716  << ", writer usage = " << serialization_writer::memory_usage()
717  << std::endl;
718  throw exception("Not enough memory for outputting.");
719  }
720 
721  memory_size_type memForMerge = std::min(memAvail2, memAvail3);
722 
723  // We do not yet know the serialized size of the largest item,
724  // so this calculation has to be redone.
725  // Instead, we assume that all items have minimum size.
726 
727  // We have to keep a writer open no matter what.
728  memory_size_type fanoutMemory = memForMerge - serialization_writer::memory_usage();
729 
730  // This is a lower bound on the memory used per fanout.
731  memory_size_type perFanout = m_params.minimumItemSize + serialization_reader::memory_usage();
732 
733  // Floored division to compute the largest possible fanout.
734  memory_size_type fanout = std::min(fanoutMemory / perFanout, m_params.filesPhase2 - 1);
735  if (fanout < 2) {
736  log_error() << "Not enough memory for merging, even when minimum item size is assumed. "
737  << "mem avail = " << memForMerge
738  << ", fanout memory = " << fanoutMemory
739  << ", per fanout >= " << perFanout
740  << std::endl;
741  throw exception("Not enough memory for merging.");
742  }
743 
744  m_params.tempDir = tempname::tpie_dir_name();
745  m_files.set_temp_dir(m_params.tempDir);
746 
747  log_debug() << "Calculated serialization_sorter parameters.\n";
748  m_params.dump(log_debug());
749  log_debug() << std::flush;
750 
751  m_parametersSet = true;
752  }
753 
754 public:
755  void begin() {
756  if (!m_parametersSet)
757  calculate_parameters();
758  if (m_state != state_initial)
759  throw tpie::exception("Bad state in begin");
760  m_state = state_1;
761 
762  log_debug() << "Before begin; mem usage = "
763  << get_memory_manager().used() << std::endl;
764  m_sorter.begin(m_params.memoryPhase1 - serialization_writer::memory_usage());
765  log_debug() << "After internal sorter begin; mem usage = "
766  << get_memory_manager().used() << std::endl;
767  boost::filesystem::create_directory(m_params.tempDir);
768  }
769 
770  void push(const T & item) {
771  if (m_state != state_1)
772  throw tpie::exception("Bad state in push");
773 
774  ++m_items;
775 
776  if (m_sorter.push(item)) return;
777  end_run();
778  if (!m_sorter.push(item)) {
779  throw exception("Couldn't fit a single item in buffer");
780  }
781  }
782 
783  void end() {
784  if (m_state != state_1)
785  throw tpie::exception("Bad state in end");
786 
787  memory_size_type internalThreshold =
788  std::min(m_params.memoryPhase2, m_params.memoryPhase3);
789 
790  log_debug() << "m_sorter.memory_usage == " << m_sorter.memory_usage() << '\n'
791  << "internalThreshold == " << internalThreshold << std::endl;
792 
793  if (m_items == 0) {
794  m_reportInternal = true;
795  m_nextInternalItem = 0;
796  m_sorter.free();
797  log_debug() << "Got no items. Internal reporting mode." << std::endl;
798  } else if (m_files.next_level_runs() == 0
799  && m_sorter.memory_usage()
800  <= internalThreshold) {
801 
802  m_sorter.sort();
803  m_reportInternal = true;
804  m_nextInternalItem = m_sorter.begin();
805  log_debug() << "Got " << m_sorter.current_serialized_size()
806  << " bytes of items. Internal reporting mode." << std::endl;
807  } else if (m_files.next_level_runs() == 0
808  && m_sorter.current_serialized_size() <= internalThreshold
809  && m_sorter.can_shrink_buffer()) {
810 
811  m_sorter.sort();
812  m_sorter.shrink_buffer();
813  m_reportInternal = true;
814  m_nextInternalItem = m_sorter.begin();
815  log_debug() << "Got " << m_sorter.current_serialized_size()
816  << " bytes of items. Internal reporting mode after shrinking buffer." << std::endl;
817 
818  } else {
819 
820  end_run();
821  log_debug() << "Got " << m_files.next_level_runs() << " runs. "
822  << "External reporting mode." << std::endl;
823  m_sorter.free();
824  m_reportInternal = false;
825  }
826 
827  log_debug() << "After internal sorter end; mem usage = "
828  << get_memory_manager().used() << std::endl;
829 
830  m_state = state_2;
831  }
832 
833  stream_size_type item_count() {
834  return m_items;
835  }
836 
837  void evacuate() {
838  switch (m_state) {
839  case state_initial:
840  throw tpie::exception("Cannot evacuate in state initial");
841  case state_1:
842  throw tpie::exception("Cannot evacuate in state 1");
843  case state_2:
844  case state_3:
845  if (m_reportInternal) {
846  end_run();
847  m_sorter.free();
848  m_reportInternal = false;
849  log_debug() << "Evacuate out of internal reporting mode." << std::endl;
850  } else {
851  log_debug() << "Evacuate in external reporting mode - noop." << std::endl;
852  }
853  break;
854  }
855  }
856 
857  memory_size_type evacuated_memory_usage() const {
858  return 0;
859  }
860 
861 
862  bool is_merge_runs_free() {
863  if (m_state != state_2)
864  throw tpie::exception("Bad state in end");
865  if (m_reportInternal) return true;
866 
867  memory_size_type largestItem = m_sorter.get_largest_item_size();
868  memory_size_type fanoutMemory = m_params.memoryPhase2 - serialization_writer::memory_usage();
869  memory_size_type perFanout = largestItem + serialization_reader::memory_usage();
870  memory_size_type fanout = std::min(m_params.filesPhase2 - 1, fanoutMemory / perFanout);
871 
872  memory_size_type finalFanoutMemory = m_params.memoryPhase3;
873  memory_size_type finalFanout = std::min(
874  {m_params.filesPhase3 - 1, fanout, finalFanoutMemory / perFanout});
875 
876  return m_files.next_level_runs() <= finalFanout;
877  }
878 
879  void merge_runs() {
880  if (m_state != state_2)
881  throw tpie::exception("Bad state in end");
882 
883  if (m_reportInternal) {
884  log_debug() << "merge_runs: internal reporting; doing nothing." << std::endl;
885  m_state = state_3;
886  return;
887  }
888 
889  memory_size_type largestItem = m_sorter.get_largest_item_size();
890  if (largestItem == 0) {
891  log_warning() << "Largest item is 0 bytes; doing nothing." << std::endl;
892  m_state = state_3;
893  return;
894  }
895 
896  if (m_params.memoryPhase2 <= serialization_writer::memory_usage())
897  throw exception("Not enough memory for merging.");
898 
899  // Perform almost the same computation as in calculate_parameters.
900  // Only change the item size to largestItem rather than minimumItemSize.
901  memory_size_type fanoutMemory = m_params.memoryPhase2 - serialization_writer::memory_usage();
902  memory_size_type perFanout = largestItem + serialization_reader::memory_usage();
903  memory_size_type fanout = std::min(fanoutMemory / perFanout, m_params.filesPhase2 - 1);
904 
905  if (fanout < 2) {
906  log_error() << "Not enough memory for merging. "
907  << "mem avail = " << m_params.memoryPhase2
908  << ", fanout memory = " << fanoutMemory
909  << ", per fanout = " << perFanout
910  << std::endl;
911  throw exception("Not enough memory for merging.");
912  }
913 
914  memory_size_type finalFanoutMemory = m_params.memoryPhase3;
915  memory_size_type finalFanout = std::min(
916  {m_params.filesPhase3 - 1, fanout, finalFanoutMemory / perFanout});
917 
918  if (finalFanout < 2) {
919  log_error() << "Not enough memory for merging (final fanout < 2). "
920  << "mem avail = " << m_params.memoryPhase3
921  << ", final fanout memory = " << finalFanoutMemory
922  << ", per fanout = " << perFanout
923  << std::endl;
924  throw exception("Not enough memory for merging.");
925  }
926 
927  log_debug() << "Calculated merge phase parameters for serialization sort.\n"
928  << "Fanout: " << fanout << '\n'
929  << "Final fanout: " << finalFanout << '\n'
930  ;
931 
932  while (m_files.next_level_runs() > finalFanout) {
933  if (m_files.remaining_runs() != 0)
934  throw exception("m_files.remaining_runs() != 0");
935  log_debug() << "Runs in current level: " << m_files.next_level_runs() << '\n';
936  for (size_t remainingRuns = m_files.next_level_runs(); remainingRuns > 0;) {
937  size_t f = std::min(fanout, remainingRuns);
938  merge_runs(f);
939  remainingRuns -= f;
940  if (remainingRuns != m_files.remaining_runs())
941  throw exception("remainingRuns != m_files.remaining_runs()");
942  }
943  }
944 
945  m_state = state_3;
946  }
947 
948 private:
949  void end_run() {
950  m_sorter.sort();
951  if (m_sorter.begin() == m_sorter.end()) return;
952  m_files.open_new_writer();
953  for (const T * item = m_sorter.begin(); item != m_sorter.end(); ++item) {
954  m_files.write(*item);
955  }
956  m_files.close_writer();
957  m_sorter.reset();
958  }
959 
960  void initialize_merger(size_t fanout) {
961  if (fanout == 0) throw exception("initialize_merger: fanout == 0");
962  m_files.open_readers(fanout);
963  m_merger.init(fanout);
964  }
965 
966  void free_merger_and_files() {
967  m_merger.free();
968  m_files.close_readers_and_delete();
969  }
970 
971  void merge_runs(size_t fanout) {
972  if (fanout == 0) throw exception("merge_runs: fanout == 0");
973 
974  if (fanout == 1 && m_files.remaining_runs() == 1) {
975  m_files.move_last_reader_to_next_level();
976  return;
977  }
978 
979  initialize_merger(fanout);
980  m_files.open_new_writer();
981  while (!m_merger.empty()) {
982  m_files.write(m_merger.top());
983  m_merger.pop();
984  }
985  free_merger_and_files();
986  m_files.close_writer();
987  }
988 
989 public:
990  T pull() {
991  if (!can_pull())
992  throw exception("pull: !can_pull");
993 
994  if (m_reportInternal) {
995  T item = *m_nextInternalItem++;
996  if (m_nextInternalItem == m_sorter.end()) {
997  m_sorter.free();
998  m_nextInternalItem = 0;
999  }
1000  return item;
1001  }
1002 
1003  if (!m_files.readers_open()) {
1004  if (m_files.next_level_runs() == 0)
1005  throw exception("pull: next_level_runs == 0");
1006  initialize_merger(m_files.next_level_runs());
1007  }
1008 
1009  T item = m_merger.top();
1010  m_merger.pop();
1011 
1012  if (m_merger.empty()) {
1013  free_merger_and_files();
1014  m_files.reset();
1015  }
1016 
1017  return item;
1018  }
1019 
1020  bool can_pull() {
1021  if (m_reportInternal) return m_nextInternalItem != 0;
1022  if (!m_files.readers_open()) return m_files.next_level_runs() > 0;
1023  return !m_merger.empty();
1024  }
1025 };
1026 
1027 }
1028 
1029 #endif // TPIE_SERIALIZATION_SORTER_H
memory_size_type memoryPhase2
Memory available while merging runs.
size_t serialized_size(const T &v)
Given a serializable, serialize it and measure its serialized size.
Encapsulation of two pointers from any random access container.
T * get()
Return a raw pointer to the array content.
Definition: array.h:531
memory_size_type filesPhase3
files available during output phase.
Binary serialization and unserialization.
void parallel_sort(iterator_type a, iterator_type b, typename tpie::progress_types< Progress >::base &pi, comp_type comp=std::less< typename boost::iterator_value< iterator_type >::type >())
Sort items in the range [a,b) using a parallel quick sort.
memory_size_type memoryPhase1
memory available while forming sorted runs.
void set_available_files(memory_size_type f1, memory_size_type f2, memory_size_type f3)
Calculate parameters from given amount of files.
void reset()
Reset sorter, but keep the remembered largest item size and buffer size.
Stream of serializable items.
memory_size_type memoryPhase3
Memory available during output phase.
static std::string tpie_dir_name(const std::string &post_base="", const std::string &dir="")
Generate path for a new temporary directory.
Logging functionality and log_level codes for different priorities of log messages.
Base class of all nodes.
Definition: node.h:78
I/O statistics.
Generic internal array with known memory requirements.
void serialize(const T &v)
Serialize a serializable item and write it to the stream.
void free()
Deallocate buffer and call reset().
Simple parallel quick sort implementation with progress tracking.
memory_size_type filesPhase2
files available while merging runs.
Class storing a reference to a memory bucket.
Definition: memory.h:366
stream_size_type file_size()
Size of file in bytes, including the header.
memory_manager & get_memory_manager()
Return a reference to the memory manager.
Bucket used for memory counting.
Definition: memory.h:352
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:167
size_t available() const noexcept
Return the amount of the resource still available to be assigned.
size_t used() const noexcept
Return the current amount of the resource used.
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:485
memory_size_type filesPhase1
files available while forming sorted runs.
std::unique_ptr< memory_bucket > & bucket(size_t i)
Access a memory bucket.
Definition: node.h:779
std::string tempDir
Directory in which temporary files are stored.
void swap(array &other)
Swap two arrays.
Definition: array.h:499
void increment_temp_file_usage(stream_offset_type delta)
Increment (possibly by a negative amount) the number of bytes being used by temporary files...
size_type size() const
Return the size of the array.
Definition: array.h:526
Temporary file names.
memory_size_type minimumItemSize
Minimum size of serialized items.
bool push(const T &item)
True if all items up to and including this one fit in the buffer.
memory_size_type current_serialized_size()
Get the serialized size of the items written.
logstream & log_error()
Return logstream for writing error log messages.
Definition: tpie_log.h:147
void set_available_files(memory_size_type f)
Calculate parameters from given amount of files.
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:157
memory_size_type memory_usage()
Compute current memory usage.