TPIE

sort_manager.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2008, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
27 
28 #ifndef _TPIE_AMI_SORT_MANAGER_H
29 #define _TPIE_AMI_SORT_MANAGER_H
30 
31 // Get definitions for working with Unix and Windows
32 #include <tpie/portability.h>
33 #include <tpie/stream.h>
34 #include <tpie/tempname.h>
35 #include <tpie/array.h>
36 #include <tpie/merge_sorted_runs.h>
37 #include <tpie/mergeheap.h> //For templated heaps
38 #include <tpie/internal_sort.h> // Contains classes for sorting internal runs
39 // using different comparison types
40 #include <cmath> //for log, ceil, etc.
41 #include <string>
42 #include <boost/filesystem.hpp>
43 
45 
46 #include <tpie/tpie_assert.h>
47 
48 namespace tpie {
49 
51 typedef TPIE_OS_SIZE_T arity_t;
52 
58 
59 template <class T, class I, class M>
60 class sort_manager {
61 
62 public:
63  sort_manager(I* isort, M* mheap);
64 
65  ~sort_manager() {
66  // No code in this destructor.
67  };
68 
72  void sort(file_stream<T>* in, file_stream<T>* out,
73  progress_indicator_base* indicator = NULL);
74 
79  void sort(file_stream<T>* in, progress_indicator_base* indicator = NULL);
80 
81 private:
82  // *************
83  // * Functions *
84  // *************
85 
86  void start_sort(); // high level wrapper to full sort
87  void compute_sort_params(); // compute nInputItems, mrgArity, nRuns
88  void partition_and_sort_runs(progress_indicator_base* indicator, tpie::array<temp_file> & temporaries); // make initial sorted runs
89  void merge_to_output(progress_indicator_base* indicator, tpie::array<temp_file> & temporaries); // loop over merge tree, create output stream
90  // Merge a single group mrgArity streams to an output stream
91  void single_merge(
92  typename tpie::array<tpie::unique_ptr<file_stream<T> > >::iterator,
93  typename tpie::array<tpie::unique_ptr<file_stream<T> > >::iterator,
94  file_stream<T>*, TPIE_OS_OFFSET = -1, progress_indicator_base* indicator=0);
95 
96  // **************
97  // * Attributes *
98  // **************
99 
100  I* m_internalSorter; // Method for sorting runs in memory
101  M* m_mergeHeap; // Merge heap implementation
102  file_stream<T>* inStream;
103  file_stream<T>* outStream;
104  TPIE_OS_OFFSET nInputItems; // Number of items in inStream;
105  TPIE_OS_SIZE_T mmBytesAvail; // Amount of spare memory we can use
106  TPIE_OS_SIZE_T mmBytesPerStream; // Memory consumed by each Stream obj
107 
108  progress_indicator_base* m_indicator; // pointer to progress indicator
109 
110  TPIE_OS_OFFSET progCount; //counter for showing progress
111 
112  bool use2xSpace; //flag to indicate if we are doing a 2x sort
113 
114  // The maximum number of stream items of type T that we can
115  // sort in internal memory
116  TPIE_OS_SIZE_T nItemsPerRun;
117 
118  TPIE_OS_OFFSET nRuns; //The number of sorted runs left to merge
119  arity_t mrgArity; //Max runs we can merge at one time
120 
121  // The output stream to which we are currently writing runs
122  file_stream<T>* curOutputRunStream;
123 
124  // The minimum number of runs in each output stream
125  // some streams can have one additional run
126  TPIE_OS_OFFSET minRunsPerStream;
127  // The number of extra runs or the number of streams that
128  // get one additional run.
129  arity_t nXtraRuns;
130 
131  // The last run can have fewer than nItemsPerRun;
132  TPIE_OS_SIZE_T nItemsInLastRun;
133  // How many items we will sort in a given run
134  TPIE_OS_SIZE_T nItemsInThisRun;
135  // For each output stream, how many runs it should get
136  TPIE_OS_OFFSET runsInStream;
137 
138  // A buffer for building the output file names
139  std::string newName;
140 
141  //prefix of temp files created during sort
142  std::string working_disk;
143 
144 private:
145  sort_manager(const sort_manager<T,I,M>& other);
146  sort_manager<T,I,M>& operator=(const sort_manager<T,I,M>& other);
147 };
148 
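// Illustrative usage sketch: the template parameters I and M are an internal
// run sorter and a merge heap; the concrete type names below are hypothetical
// placeholders (the real ones are provided by internal_sort.h and mergeheap.h
// via sort.h), so this only shows the intended call order.
//
//   Internal_Sorter_Op<T, Compare> isort;                // hypothetical name
//   merge_heap_op<Compare> mheap;                        // hypothetical name
//   sort_manager<T, Internal_Sorter_Op<T, Compare>,
//                   merge_heap_op<Compare> > sm(&isort, &mheap);
//   sm.sort(&in, &out, indicator);  // 3x-space sort, in != out
//   sm.sort(&in, indicator);        // 2x-space sort, result replaces input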
149 template <class T, class I, class M>
150 sort_manager<T, I, M>::sort_manager(I* isort, M* mheap):
151  m_internalSorter(isort),
152  m_mergeHeap(mheap),
153  inStream(0),
154  outStream(0),
155  nInputItems(0),
156  mmBytesAvail(0),
157  mmBytesPerStream(0),
158  m_indicator(NULL),
159  progCount(0),
160  use2xSpace(false),
161  nItemsPerRun(0),
162  nRuns(0),
163  mrgArity(0),
164  curOutputRunStream(NULL),
165  minRunsPerStream(0),
166  nXtraRuns(0),
167  nItemsInLastRun(0),
168  nItemsInThisRun(0),
169  runsInStream(0) {
170 
171  // Prefix of temp files created during sort
172  working_disk = std::string(tempname::tpie_name("sort"));
173 };
174 
175 template<class T, class I, class M>
176 void sort_manager<T,I,M>::sort(file_stream<T>* in, file_stream<T>* out,
177  progress_indicator_base* indicator){
178  m_indicator = indicator;
179 
180  // if the input and output stream are the same, we only use 2x space.
181  // otherwise, we need 3x space. (input, current temp runs, output runs)
182  use2xSpace = (in == out);
183 
184  inStream = in;
185  outStream = out;
186 
187  // Basic checks that input is ok
188  if (in==NULL || out==NULL) {
189  if (m_indicator) {m_indicator->init(1); m_indicator->step(); m_indicator->done();}
190  throw exception("NULL_POINTER");
191  }
192 
193  if (inStream->size() < 2) {
194  if (m_indicator) {m_indicator->init(1); m_indicator->step(); m_indicator->done();}
195  in->seek(0);
196  if (in != out) {
197  out->seek(0);
198  if (in->size() == 1)
199  out->write(in->read());
200  }
201  return;
202  }
203 
204  // Else, there is something to sort, do it
205  start_sort();
206 }
207 
208 template<class T, class I, class M>
209 void sort_manager<T,I,M>::sort(file_stream<T>* in, progress_indicator_base* indicator){
210  sort(in, in, indicator);
211 }
212 
213 template<class T, class I, class M>
214 void sort_manager<T,I,M>::start_sort(){
215 
216  // ********************************************************************
217  // * PHASE 1: See if we can sort the entire stream in internal memory *
218  // * without the need to use general merge sort *
219  // ********************************************************************
220 
221  // Figure out how much memory we've got to work with.
222  mmBytesAvail = consecutive_memory_available();
223 
224  // Space for internal buffers for the input and output stream may not
225  // have been allocated yet. Query the space usage and subtract.
226  mmBytesPerStream = file_stream<T>::memory_usage(1);
227 
228  // This is how much we can use for internal sort if
229  // we are not doing general merge sort
230  mmBytesAvail -= 2 * mmBytesPerStream;
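// Example with assumed figures: with 128 MiB of consecutive memory available
// and roughly 1 MiB of buffer space per stream, about 126 MiB remain for the
// internal-memory sort check below.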
231 
232  // Check if all input items can be sorted internally using less than
233  // mmBytesAvail
234  nInputItems = inStream->size();
235 
236  inStream->seek (0);
237 
238  if (nInputItems < TPIE_OS_OFFSET(m_internalSorter->MaxItemCount(mmBytesAvail))) {
239 
240  fractional_progress fp(m_indicator);
241  fp.id() << __FILE__ << __FUNCTION__ << "internal_sort" << typeid(T) << typeid(I) << typeid(M);
242  fractional_subindicator allocate_progress(fp, "allocate", TPIE_FSI, nInputItems, "Allocating");
243  fractional_subindicator sort_progress(fp, "sort", TPIE_FSI, nInputItems);
244  fp.init();
245  allocate_progress.init(nInputItems);
246  m_internalSorter->allocate(static_cast<TPIE_OS_SIZE_T>(nInputItems));
247  allocate_progress.done();
248 
249  // load the items into main memory, sort, and write to output.
250  // m_internalSorter also checks if inStream/outStream are the same and
251  // truncates/rewrites inStream if they are. This probably should not
252  // be the job of m_internalSorter. TODO: build a cleaner interface
253  m_internalSorter->sort(inStream,
254  outStream,
255  static_cast<TPIE_OS_SIZE_T>(nInputItems),
256  &sort_progress);
257  // de-allocate the internal array of items
258  m_internalSorter->deallocate();
259  fp.done();
260  return;
261  }
262 
263  // ******************************************************************
264  // * Input stream too large for main memory, use general merge sort *
265  // ******************************************************************
266 
267  // PHASE 2: compute nItemsPerRun, mrgArity, nRuns
268  compute_sort_params();
269 
270  // ********************************************************************
271  // * By this point we have checked that we have valid input, checked *
272  // * that we indeed need an external memory sort, verified that we *
273  // * have enough memory to partition and at least do a binary merge. *
274  // * Also checked that we have enough file descriptors to merge, *
275  // * and calculated the mrgArity and nItemsPerRun given memory *
276  // * constraints. We have also calculated nRuns for the initial *
277  // * number of runs we will partition into. Let's sort! *
278  // ********************************************************************
279 
280  // ********************************************************************
281  // * WARNING: Since we accounted for all known memory usage in PHASE 2*
282  // * be very wary of memory allocation via "new" or constructors from *
283  // * this point on and make sure it was accounted for in PHASE 2 *
284  // ********************************************************************
285  fractional_progress fp(m_indicator);
286  fp.id() << __FILE__ << __FUNCTION__ << "external_sort" << typeid(T) << typeid(I) << typeid(M);
287  fractional_subindicator run_progress(fp, "run", TPIE_FSI, nInputItems,"",tpie::IMPORTANCE_LOG);
288  fractional_subindicator merge_progress(fp, "merge", TPIE_FSI, nInputItems,"",tpie::IMPORTANCE_LOG);
289  fp.init();
290 
291  tpie::array<temp_file> temporaries(mrgArity*2);
292 
293  // PHASE 3: partition and form sorted runs
294  TP_LOG_DEBUG_ID ("Beginning general merge sort.");
295  partition_and_sort_runs(&run_progress, temporaries);
296  // PHASE 4: merge sorted runs to a single output stream
297  merge_to_output(&merge_progress, temporaries);
298 
299  fp.done();
300 }
301 
302 template<class T, class I, class M>
303 void sort_manager<T,I,M>::compute_sort_params(void){
304  // ********************************************************************
305  // * PHASE 2: Compute/check limits *
306  // * Compute the maximum number of items we can sort in main memory *
307  // * and the maximum number of sorted runs we can merge at one time *
308  // * Before doing any sorting, check that we can fit at least one item*
309  // * in internal memory for sorting and that we can merge at least two*
310  // * runs at a time *
311  // * *
312  // * Memory needed for the run formation phase: *
313  // * 2*mmBytesPerStream + {for input/output streams} *
314  // * nItemsPerRun*space_per_sort_item() + {for each item sorted } *
315  // * space_overhead_sort() {constant overhead in *
316  // * sort management object *
317  // * during sorting } *
318  // * *
319  // * Memory needed for a D-way merge: *
320  // * Cost per merge stream: *
321  // * mmBytesPerStream+ {an open stream to read from} *
322  // * space_per_merge_item()+ {used in internal merge heap} *
323  // * sizeof(T*)+sizeof(off_t) {arrays in single_merge()} *
324  // * sizeof(stream<T>*) {array element that points to *
325  // * merge stream} *
326  // * Fixed costs: *
327  // * 2*mmBytesPerStream+ {original input stream + output *
328  // * of current merge} *
329  // * space_overhead_merge()+ {fixed dynamic memory costs of *
330  // * merge heap} *
331  // * 3*space_overhead() {overhead per "new" memory request *
332  // * for allocating array of streams *
333  // * in merge_to_output and two arrays *
334  // * in single_merge} *
335  // * *
336  // * Total cost for D-way Merge: *
337  // * D*(Cost per merge stream)+(Fixed costs) *
338  // * *
339  // * Any additional memory requests that call "new" directly or *
340  // * indirectly should be documented and accounted for in this phase *
341  // ********************************************************************
342 
343  TP_LOG_DEBUG_ID ("Computing merge sort parameters.");
344 
345  TPIE_OS_SIZE_T mmBytesAvailSort; // Bytes available for sorting
346 
347  TP_LOG_DEBUG ("Each object of size " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(sizeof(T)) << " uses "
348  << static_cast<TPIE_OS_OUTPUT_SIZE_T>(m_internalSorter->space_per_item ()) << " bytes "
349  << "for sorting in memory\n");
350 
351  // Subtract off size of temp output stream
352  // The size of the input stream was already subtracted from
353  // mmBytesAvail
354  mmBytesAvailSort=mmBytesAvail - mmBytesPerStream;
355 
356  nItemsPerRun=m_internalSorter->MaxItemCount(mmBytesAvailSort);
357 
358  if(nItemsPerRun<1){
359  throw stream_exception("Insufficient Memory for forming sorted runs");
360  }
361 
362  // Now we know the max number of Items we can sort in a single
363  // internal memory run. Next, compute the number of runs we can
364  // merge together at one time
365 
366  TPIE_OS_SIZE_T mmBytesPerMergeItem = mmBytesPerStream +
367  m_mergeHeap->space_per_item() + sizeof(T*) +
368  sizeof(TPIE_OS_OFFSET)+sizeof(ami::stream<T>*);
369 
370  // Fixed cost of merge heap impl. + MM_manager overhead of allocating
371  // an array of stream<T> ptrs (pending)
372  // cost of Input stream already accounted for in mmBytesAvail..
373  TPIE_OS_SIZE_T mmBytesFixedForMerge = m_mergeHeap->space_overhead() +
374  mmBytesPerStream;
375 
376  if (mmBytesFixedForMerge > mmBytesAvail) {
377  throw stream_exception("Insufficient memory for merge heap and output stream");
378  }
379 
380  // Cast down from TPIE_OS_OFFSET (type of mmBytesAvail).
381  // mmBytesPerMergeItem is at least 1KB, so we are OK unless we
382  // have more than 2 terabytes of memory, assuming 64 bit
383  // (or smaller) TPIE_OS_OFFSETS. I look forward to the day
384  // this comment seems silly and wrong
385  mrgArity = static_cast<arity_t>(mmBytesAvail-mmBytesFixedForMerge) /
386  mmBytesPerMergeItem;
387  TP_LOG_DEBUG("mem avail=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mmBytesAvail-mmBytesFixedForMerge)
388  << " bytes per merge item=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mmBytesPerMergeItem)
389  << " initial mrgArity=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) << "\n");
390 
391  // Need to support at least binary merge
392  if(mrgArity < 2) {
393  throw stream_exception("Merge arity < 2 -- Insufficient memory for a merge.");
394  }
395 
396  // Make sure that the AMI is willing to provide us with the
397  // number of substreams we want. It may not be able to due to
398  // operating system restrictions, such as on the number of regions
399  // that can be mmap()ed in, max number of file descriptors, etc.
400  int availableStreams = static_cast<int>(get_file_manager().available());
401 
402  // Merging requires an available stream/file descriptor for
403  // each of the mrgArity input streams. We need one additional file descriptor
404  // for the output of the current merge, so binary merge requires
405  // three available streams.
406  if (availableStreams < 3) {
407  throw stream_exception("Not enough stream descriptors available to perform merge.");
408  }
409 
410  // Can at least do binary merge. See if availableStreams limits
411  // maximum mrgArity
412  // Due to the previous test, we know that available_streams >= 3.
413  if (mrgArity > static_cast<arity_t>(availableStreams - 1)) {
414 
415  mrgArity = static_cast<arity_t>(availableStreams - 1);
416 
417  TP_LOG_DEBUG_ID ("Reduced merge arity due to AMI restrictions.");
418  }
419 
420  // The number of memory-sized runs that the original input stream
421  // will be partitioned into.
422  nRuns = ((nInputItems + nItemsPerRun - 1) / nItemsPerRun);
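// Example with assumed figures: nInputItems = 1,000,000 and
// nItemsPerRun = 300,000 give nRuns = ceil(1,000,000 / 300,000) = 4.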
423 
424 #ifdef TPIE_SORT_SMALL_MRGARITY
425  // KEEP OUT!!!
426  // This should not be done by the typical user and is only for
427  // testing/debugging purposes. ONLY define this flag and set a value
428  // if you know what you are doing.
429  TP_LOG_WARNING_ID("TPIE_SORT_SMALL_MRGARITY flag is set."
430  " Did you mean to do this?");
431  if(mrgArity > TPIE_SORT_SMALL_MRGARITY) {
432  TP_LOG_WARNING_ID("Reducing merge arity due to compiler specified flag");
433  mrgArity=TPIE_SORT_SMALL_MRGARITY;
434  }
435 #endif // TPIE_SORT_SMALL_MRGARITY
436 
437 #ifdef TPIE_SORT_SMALL_RUNSIZE
438  // KEEP OUT!!!
439  // This should not be done by the typical user and is only for
440  // testing/debugging purposes ONLY define this flag and set a value
441  // if you know what you are doing.
442  TP_LOG_WARNING_ID("TPIE_SORT_SMALL_RUNSIZE flag is set."
443  " Did you mean to do this?");
444  if(nItemsPerRun > TPIE_SORT_SMALL_RUNSIZE) {
445  TP_LOG_WARNING_ID("Reducing run size due to compiler specified flag");
446  nItemsPerRun=TPIE_SORT_SMALL_RUNSIZE;
447  }
448 
449  // need to adjust nRuns
450  nRuns = ((nInputItems + nItemsPerRun - 1) / nItemsPerRun);
451 #endif // TPIE_SORT_SMALL_RUNSIZE
452 
453  //#define MINIMIZE_INITIAL_RUN_LENGTH
454 #ifdef MINIMIZE_INITIAL_RUN_LENGTH
455  // If compiled with the above flag, try to reduce the length of
456  // the initial sorted runs without increasing the merge tree height
457  // This could be a speed-up if it is faster to quicksort many small
458  // runs and merge them than it is to quicksort fewer long
459  // runs and merge them.
460  TP_LOG_DEBUG_ID ("Minimizing initial run lengths without increasing" <<
461  " the height of the merge tree.");
462 
463  // The tree height is the ceiling of the log base mrgArity of the
464  // number of original runs.
465  double tree_height = log((double)nRuns) / log((double)mrgArity);
466  tp_assert (tree_height > 0, "Negative or zero tree height!");
467  tree_height = ceil (tree_height);
468 
469  // See how many runs we could possibly fit in the tree without
470  // increasing the height.
471  double maxOrigRuns = pow ((double) mrgArity, tree_height);
472  tp_assert (maxOrigRuns >= nRuns, "Number of permitted runs was reduced!");
473 
474  // How big will such runs be?
475  double new_nItemsPerRun = ceil (nInputItems/ maxOrigRuns);
476  tp_assert (new_nItemsPerRun <= nItemsPerRun,
477  "Size of original runs increased!");
478 
479  // Update the number of items per run and the number of original runs
480  nItemsPerRun = (TPIE_OS_SIZE_T) new_nItemsPerRun;
481 
482  TP_LOG_DEBUG_ID ("With long internal memory runs, nRuns = "
483  << nRuns);
484 
485  nRuns = (nInputItems + nItemsPerRun - 1) / nItemsPerRun;
486 
487  TP_LOG_DEBUG_ID ("With shorter internal memory runs "
488  << "and the same merge tree height, nRuns = "
489  << nRuns );
490 
491  tp_assert (maxOrigRuns >= nRuns,
492  "We increased the merge height when we weren't supposed to do so!");
493 #endif // MINIMIZE_INITIAL_RUN_LENGTH
494 
495 
496  // If we have just a few runs, we don't need the
497  // full mrgArity. This is the last change to mrgArity
498  // N.B. We need to "up"-cast mrgArity here!
499  if(static_cast<TPIE_OS_OFFSET>(mrgArity)>nRuns){
500  // N.B. We know that nRuns is small, so
501  // it is safe to downcast.
502  mrgArity=static_cast<TPIE_OS_SIZE_T>(nRuns);
503  }
504 
505  // We should always end up with at least two runs
506  // otherwise why are we doing it externally?
507  tp_assert (nRuns > 1, "Less than two runs to merge!");
508  // Check that numbers are consistent with input size
509  tp_assert (nRuns * nItemsPerRun - nInputItems < nItemsPerRun,
510  "Total expected output size is too large.");
511  tp_assert (nInputItems - (nRuns - 1) * nItemsPerRun <= nItemsPerRun,
512  "Total expected output size is too small.");
513 
514  TP_LOG_DEBUG_ID ("Input stream has " << nInputItems << " items");
515  TP_LOG_DEBUG ("Max number of items per runs " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(nItemsPerRun) );
516  TP_LOG_DEBUG ("\nInitial number of runs " << nRuns );
517  TP_LOG_DEBUG ("\nMerge arity is " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) << "\n" );
518 }
519 
520 template<class T, class I, class M>
521 void sort_manager<T,I,M>::partition_and_sort_runs(progress_indicator_base* indicator, tpie::array<temp_file> & temporaries){
522  // ********************************************************************
523  // * PHASE 3: Partition *
524  // * Partition the input stream into nRuns of at most nItemsPerRun *
525  // * and sort them, and write them to temporary output files. *
526  // * The last run may have fewer than nItemsPerRun. To keep the number*
527  // * of files down and to support sequential I/O, we distribute the *
528  // * nRuns evenly across mrgArity files, thus each file on disk holds *
529  // * multiple sorted runs. *
530  // ********************************************************************
531 
532  // The minimum number of runs in each output stream
533  // some streams can have one additional run
534  minRunsPerStream = nRuns/mrgArity;
535  // The number of extra runs or the number of streams that
536  // get one additional run. This is less than mrgArity and
537  // it is OK to downcast to an arity_t.
538  nXtraRuns = static_cast<arity_t>(nRuns - minRunsPerStream*mrgArity);
539  tp_assert(nXtraRuns<mrgArity, "Too many extra runs");
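// Example with assumed figures: nRuns = 10 and mrgArity = 4 give
// minRunsPerStream = 2 and nXtraRuns = 2, so the first two run files get
// 2 runs each and the last two get 3 runs each (2 + 2 + 3 + 3 = 10).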
540 
541  // The last run can have fewer than nItemsPerRun;
542  // general case
543  nItemsInLastRun = static_cast<TPIE_OS_SIZE_T>(nInputItems % nItemsPerRun);
544  if(nItemsInLastRun==0){
545  // Input size is an exact multiple of nItemsPerRun
546  nItemsInLastRun=nItemsPerRun;
547  }
548 
549  // Initialize memory for the internal memory runs
550  // accounted for in phase 2: (nItemsPerRun*size_of_sort_item) +
551  // space_overhead_sort
552  m_internalSorter->allocate(nItemsPerRun);
553 
554  TP_LOG_DEBUG_ID ("Partitioning and forming sorted runs.");
555 
556  // nItemsPerRun except for last run.
557  nItemsInThisRun=nItemsPerRun;
558 
559  // Rewind the input stream, we are about to begin
560  inStream->seek(0);
561 
562  // ********************************************************************
563  // * Partition and make initial sorted runs *
564  // ********************************************************************
565  TPIE_OS_OFFSET check_size = 0; //for debugging
566 
567  if (indicator)
568  indicator->init(nRuns*1000);
569 
570  for(arity_t ii=0; ii<mrgArity; ii++){ //For each output stream
571  // Dynamically allocate the stream
572  // We account for these mmBytesPerStream in phase 2 (output stream)
573  curOutputRunStream = tpie_new<file_stream<T> >();
574  curOutputRunStream->open(temporaries[ii], access_write);
575 
576  // How many runs should this stream get?
577  // extra runs go in the LAST nXtraRuns streams so that
578  // the one short run is always in the LAST output stream
579  runsInStream = minRunsPerStream + ((ii >= mrgArity-nXtraRuns)?1:0);
580 
581  for(TPIE_OS_OFFSET jj=0; jj < runsInStream; jj++ ) { // For each run in this stream
582  // See if this is the last run
583  if( (ii==mrgArity-1) && (jj==runsInStream-1)) {
584  nItemsInThisRun=nItemsInLastRun;
585  }
586 
587  progress_indicator_subindicator sort_indicator(indicator, 1000);
588  m_internalSorter->sort(inStream, curOutputRunStream,
589  nItemsInThisRun, &sort_indicator);
590  } // For each run in this stream
591 
592  // All runs created for this stream, clean up
593  TP_LOG_DEBUG_ID ("Wrote " << runsInStream << " runs and "
594  << curOutputRunStream->size() << " items to file "
595  << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii));
596  check_size+=curOutputRunStream->size();
597  tpie_delete(curOutputRunStream);
598 
599  }//For each output stream
600  tp_assert(check_size == nInputItems, "item count mismatch");
601 
602  // Done with partitioning and initial run formation
603  // free space associated with internal memory sorting
604  m_internalSorter->deallocate();
605  if(use2xSpace){
606  //recall outStream/inStream point to same file in this case
607  inStream->truncate(0); //free up disk space
608  inStream->seek(0);
609  }
610  if (indicator) indicator->done();
611 }
612 
613 template<class T, class I, class M>
614 void sort_manager<T,I,M>::merge_to_output(progress_indicator_base* indicator, tpie::array<temp_file> & temporaries){
615  // ********************************************************************
616  // * PHASE 4: Merge *
617  // * Loop over all levels of the merge tree, reading mrgArity runs *
618  // * at a time from the streams at the current level and distributing *
619  // * merged runs over mrgArity output streams one level up, until *
620  // * a single output stream exists *
621  // ********************************************************************
622 
623  // The input streams from which we will read sorted runs
624  // This Memory allocation accounted for in phase 2:
625  // mrgArity*sizeof(stream<T>*) + space_overhead()[fixed cost]
626  tpie::array<tpie::unique_ptr<file_stream<T> > > mergeInputStreams(mrgArity);
627 
628  TP_LOG_DEBUG_ID("Allocated " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(sizeof(ami::stream<T>*)*mrgArity)
629  << " bytes for " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) << " merge input stream pointers.\n"
630  << "Mem. avail. is " << consecutive_memory_available ());
631 
632  // the number of iterations the main loop has gone through,
633  // at most the height of the merge tree log_{M/B}(N/B),
634  // typically 1 or 2
635  int mrgHeight = 0;
636  int treeHeight = 0; //for progress
637  TPIE_OS_SIZE_T ii; //index vars
638  TPIE_OS_OFFSET jj; //index vars
639 
640  // This Memory allocation accounted for in phase 2:
641  // mrgArity*space_per_merge_item
642  m_mergeHeap->allocate( mrgArity ); //Allocate mem for mergeheap
643 
644  // *****************************************************************
645  // * *
646  // * The main loop. At the outermost level we are looping over *
647  // * levels of the merge tree. Typically this will be very small, *
648  // * e.g. 1-3. The final merge pass is handled outside the loop. *
649  // * Future extension may want to do something special in the last *
650  // * merge *
651  // * *
652  // *****************************************************************
653 
654  if (indicator) {
655  //compute merge depth, number of passes over data
656  treeHeight= static_cast<int>(ceil(log(static_cast<float>(nRuns)) /
657  log(static_cast<float>(mrgArity))));
658 
659  indicator->set_range( nInputItems * treeHeight);
660  indicator->init();
661  }
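// Example with assumed figures: nRuns = 100 initial runs and mrgArity = 10
// give treeHeight = ceil(log(100)/log(10)) = 2, i.e. two passes over the
// data: one intermediate pass in the loop below plus the final merge.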
662 
663  //nRuns is initially the number of runs we formed in partition_and_sort
664  //phase. nXtraRuns is initially the number of output streams that
665  //contain one extra run. nRuns and nXtraRuns are updated as we
666  //complete a merge level.
667  while (nRuns > TPIE_OS_OFFSET(mrgArity)) {
668  // if (m_indicator) {
669  // std::string description;
670  // std::stringstream buf;
671  // buf << "Merge pass " << mrgHeight+1 << " of " << treeHeight << " ";
672  // buf >> description;
673  // m_indicator->set_percentage_range(0, nInputItems);
674  // m_indicator->init(description);
675  // }
676 
677  // We are not yet at the top of the merge tree
678  // Write merged runs to temporary output streams
679  TP_LOG_DEBUG ("Intermediate merge. level="<<mrgHeight << "\n");
680 
681  // The number of output runs we will form after a mrgArity merge
682  nRuns = (nRuns + mrgArity - 1)/mrgArity;
683 
684  // Distribute the new nRuns evenly across mrgArity (or fewer)
685  // output streams
686  minRunsPerStream = nRuns/mrgArity;
687 
688  // We may have fewer than mrgArity input runs for the last
689  // merged output run if the current set of merge streams has
690  // xtra runs
691  arity_t mergeRunsInLastOutputRun=(nXtraRuns>0) ? nXtraRuns : mrgArity;
692 
693  // The number of extra runs or the number of output streams that
694  // get one additional run. This is less than mrgArity and
695  // it is OK to downcast to an arity_t.
696  nXtraRuns = static_cast<arity_t>(nRuns - minRunsPerStream*mrgArity);
697  tp_assert(nXtraRuns<mrgArity, "Too many extra runs");
698 
699  // How many Streams we will create at the next level
700  arity_t nOutputStreams = (minRunsPerStream > 0) ? mrgArity : nXtraRuns;
701 
702  arity_t nRunsToMerge = mrgArity; // may change for last output run
703 
704  // open the mrgArity Input streams from which to read runs
705  for(ii = 0; ii < mrgArity; ii++){
706  // Dynamically allocate the stream
707  // We account for these mmBytesPerStream in phase 2
708  // (input stream to read from)
709  file_stream<T> * stream = tpie_new<file_stream<T> >();
710  mergeInputStreams[ii].reset(stream);
711  stream->open(temporaries[mrgArity*(mrgHeight%2)+ii], access_read);
712  stream->seek(0);
713  }
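// Note on the indexing above: the 2*mrgArity temp files are used in a
// ping-pong fashion. Level mrgHeight reads its runs from
// temporaries[mrgArity*(mrgHeight%2) + 0 .. mrgArity-1] and writes the
// merged output to the other half, indexed with (mrgHeight+1)%2 below.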
714 
715  TPIE_OS_OFFSET check_size=0;
716  // For each new output stream, fill with merged runs.
717  // strange indexing is for the case that there are fewer than mrgArity
718  // output streams needed, and we use the LAST nOutputStreams. This
719  // always keeps the one possible short run in the LAST of the
720  // mrgArity output streams.
721  TP_LOG_DEBUG("Writing " << nRuns << " runs to " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(nOutputStreams)
722  << " output files.\nEach output file has at least "
723  << minRunsPerStream << " runs.\n");
724 
725  for(ii = mrgArity-nOutputStreams; ii < mrgArity; ii++){
726  // Dynamically allocate the stream
727  // We account for these mmBytesPerStream in phase 2
728  // (temp merge output stream)
729  file_stream<T> curOutputRunStream;
730  curOutputRunStream.open(temporaries[mrgArity*((mrgHeight+1)%2)+ii], access_write);
731 
732  // How many runs should this stream get?
733  // extra runs go in the LAST nXtraRuns streams so that
734  // the one short run is always in the LAST output stream
735  runsInStream = minRunsPerStream + ((ii >= mrgArity-nXtraRuns)?1:0);
736  TP_LOG_DEBUG("Writing " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(runsInStream) << " runs to output "
737  << " file " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii) << "\n");
738  for( jj=0; jj < runsInStream; jj++ ) { // For each run in this stream
739  // See if this is the last run.
740  if( (ii==mrgArity-1) && (jj==runsInStream-1)) {
741  nRunsToMerge=mergeRunsInLastOutputRun;
742  }
743  // Merge runs to curOutputRunStream
744  single_merge(mergeInputStreams.find(mrgArity-nRunsToMerge),
745  mergeInputStreams.find(mrgArity),
746  &curOutputRunStream,
747  nItemsPerRun, indicator);
748  } // For each output run in this stream
749 
750  // Commit new output stream to disk
751  TP_LOG_DEBUG("Wrote " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(runsInStream) << " runs and "
752  << curOutputRunStream.size() << " items "
753  << "to file " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii) << "\n");
754  check_size+=curOutputRunStream.size();
755  } // For each new output stream
756 
757  tp_assert(check_size==nInputItems, "item count mismatch in merge");
758  // All output streams created/filled.
759  // Clean up, go up to next level
760 
761  // Delete temp input merge streams
762  for(ii = 0; ii < mrgArity; ii++) {
763  mergeInputStreams[ii].reset();
764  temporaries[mrgArity*(mrgHeight%2)+ii].free();
765  }
766  // Update run lengths
767  nItemsPerRun=mrgArity*nItemsPerRun; //except for maybe last run
768  mrgHeight++; // moving up a level
769  } // while (nRuns > mrgArity)
770 
771  tp_assert( nRuns > 1, "Not enough runs to merge to final output");
772  tp_assert( nRuns <= TPIE_OS_OFFSET(mrgArity), "Too many runs to merge to final output");
773 
774  // We are at the last merge phase, write to specified output stream
775  // Open up the nRuns final merge streams to merge
776  // These runs are packed in the LAST nRuns elements of the array
777  // nRuns is small, so it is safe to downcast.
778  TP_LOG_DEBUG_ID ("Final merge. level="<<mrgHeight);
779  TP_LOG_DEBUG("Merge runs left="<<nRuns<<"\n");
780  for(ii = mrgArity-static_cast<TPIE_OS_SIZE_T>(nRuns); ii < mrgArity; ii++){
781  /* Dynamically allocate the stream
782  We account for these mmBytesPerStream in phase 2
783  (input stream to read from)
784  Put LAST nRuns files in FIRST nRuns spots here
785  either one of mergeInputStreams loading or the call to
786  single_merge is a little messy. I put the mess here. (abd) */
787  TP_LOG_DEBUG ("Putting merge stream "<< static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii) << " in slot "
788  << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii-(mrgArity-static_cast<TPIE_OS_SIZE_T>(nRuns))) << "\n");
789  file_stream<T> * stream = tpie_new<file_stream<T> >();
790  mergeInputStreams[ii-(mrgArity-static_cast<TPIE_OS_SIZE_T>(nRuns))].reset(stream);
791  stream->open(temporaries[mrgArity*(mrgHeight%2)+ii], access_read);
792  stream->seek(0);
793  }
794 
795  // Merge last remaining runs to the output stream.
796  // mergeInputStreams is address( address (the first input stream) )
797  // N.B. nRuns is small, so it is safe to downcast.
798  single_merge(mergeInputStreams.begin(),
799  mergeInputStreams.find((size_t)nRuns),
800  outStream, -1, indicator);
801 
802  if (indicator) indicator->done();
803  tp_assert((TPIE_OS_OFFSET)outStream->size() == nInputItems, "item count mismatch");
804 
805  TP_LOG_DEBUG("merge cleanup\n");
806 
807  // Delete stream ptr arrays
808  mergeInputStreams.resize(0);
809 
810  // Deallocate the merge heap, free up memory
811  m_mergeHeap->deallocate();
812  TP_LOG_DEBUG_ID ("Number of passes incl run formation is " <<
813  mrgHeight+2 );
814  TP_LOG_DEBUG("AMI_partition_and_merge END\n");
815 }
816 
817 template<class T, class I, class M>
818 void sort_manager<T,I,M>::single_merge(
819  typename tpie::array<tpie::unique_ptr<file_stream<T> > >::iterator start,
820  typename tpie::array<tpie::unique_ptr<file_stream<T> > >::iterator end,
821  file_stream < T >*outStream, TPIE_OS_OFFSET cutoff, progress_indicator_base* indicator)
822 {
823 
824  merge_sorted_runs(start, end, outStream, m_mergeHeap,
825  cutoff, indicator);
826 }
827 
828 
829 } // tpie namespace
830 
831 #endif // _TPIE_AMI_SORT_MANAGER_H