28 #ifndef _TPIE_AMI_SORT_MANAGER_H
29 #define _TPIE_AMI_SORT_MANAGER_H
43 #include <boost/filesystem.hpp>
57 template <
class T,
class I,
class M>
85 void compute_sort_params();
102 TPIE_OS_OFFSET nInputItems;
103 TPIE_OS_SIZE_T mmBytesAvail;
104 TPIE_OS_SIZE_T mmBytesPerStream;
108 TPIE_OS_OFFSET progCount;
114 TPIE_OS_SIZE_T nItemsPerRun;
116 TPIE_OS_OFFSET nRuns;
124 TPIE_OS_OFFSET minRunsPerStream;
130 TPIE_OS_SIZE_T nItemsInLastRun;
132 TPIE_OS_SIZE_T nItemsInThisRun;
134 TPIE_OS_OFFSET runsInStream;
140 std::string working_disk;
147 template <
class T,
class I,
class M>
149 m_internalSorter(isort),
162 curOutputRunStream(NULL),
173 template<
class T,
class I,
class M>
176 m_indicator = indicator;
180 use2xSpace = (in == out);
186 if (in==NULL || out==NULL) {
187 if (m_indicator) {m_indicator->init(1); m_indicator->step(); m_indicator->done();}
191 if (inStream->size() < 2) {
192 if (m_indicator) {m_indicator->init(1); m_indicator->step(); m_indicator->done();}
206 template<
class T,
class I,
class M>
208 sort(in, in, indicator);
211 template<
class T,
class I,
class M>
228 mmBytesAvail -= 2 * mmBytesPerStream;
232 nInputItems = inStream->size();
236 if (nInputItems < TPIE_OS_OFFSET(m_internalSorter->MaxItemCount(mmBytesAvail))) {
239 fp.id() << __FILE__ << __FUNCTION__ <<
"internal_sort" <<
typeid(T) <<
typeid(I) <<
typeid(M);
243 allocate_progress.init(nInputItems);
244 m_internalSorter->allocate(static_cast<TPIE_OS_SIZE_T>(nInputItems));
245 allocate_progress.done();
251 m_internalSorter->sort(inStream,
253 static_cast<TPIE_OS_SIZE_T>(nInputItems),
256 m_internalSorter->deallocate();
266 compute_sort_params();
283 fractional_progress fp(m_indicator);
284 fp.id() << __FILE__ << __FUNCTION__ <<
"external_sort" <<
typeid(T) <<
typeid(I) <<
typeid(M);
285 fractional_subindicator run_progress(fp,
"run",
TPIE_FSI, nInputItems,
"",tpie::IMPORTANCE_LOG);
286 fractional_subindicator merge_progress(fp,
"merge",
TPIE_FSI, nInputItems,
"",tpie::IMPORTANCE_LOG);
292 TP_LOG_DEBUG_ID (
"Beginning general merge sort.");
293 partition_and_sort_runs(&run_progress, temporaries);
295 merge_to_output(&merge_progress, temporaries);
300 template<
class T,
class I,
class M>
301 void sort_manager<T,I,M>::compute_sort_params(
void){
341 TP_LOG_DEBUG_ID (
"Computing merge sort parameters.");
343 TPIE_OS_SIZE_T mmBytesAvailSort;
345 TP_LOG_DEBUG (
"Each object of size " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(
sizeof(T)) <<
" uses "
346 << static_cast<TPIE_OS_OUTPUT_SIZE_T>(m_internalSorter->space_per_item ()) <<
" bytes "
347 <<
"for sorting in memory\n");
352 mmBytesAvailSort=mmBytesAvail - mmBytesPerStream;
354 nItemsPerRun=m_internalSorter->MaxItemCount(mmBytesAvailSort);
357 throw stream_exception(
"Insufficient Memory for forming sorted runs");
364 TPIE_OS_SIZE_T mmBytesPerMergeItem = mmBytesPerStream +
365 m_mergeHeap->space_per_item() +
sizeof(T*) +
366 sizeof(TPIE_OS_OFFSET)+
sizeof(ami::stream<T>*);
371 TPIE_OS_SIZE_T mmBytesFixedForMerge = m_mergeHeap->space_overhead() +
374 if (mmBytesFixedForMerge > mmBytesAvail) {
375 throw stream_exception(
"Insufficient memory for merge heap and output stream");
383 mrgArity =
static_cast<arity_t>(mmBytesAvail-mmBytesFixedForMerge) /
385 TP_LOG_DEBUG(
"mem avail=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mmBytesAvail-mmBytesFixedForMerge)
386 <<
" bytes per merge item=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mmBytesPerMergeItem)
387 <<
" initial mrgArity=" << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) <<
"\n");
391 throw stream_exception(
"Merge arity < 2 -- Insufficient memory for a merge.");
404 if (availableStreams < 3) {
405 throw stream_exception(
"Not enough stream descriptors available to perform merge.");
411 if (mrgArity > static_cast<arity_t>(availableStreams - 1)) {
413 mrgArity =
static_cast<arity_t>(availableStreams - 1);
415 TP_LOG_DEBUG_ID (
"Reduced merge arity due to AMI restrictions.");
420 nRuns = ((nInputItems + nItemsPerRun - 1) / nItemsPerRun);
422 #ifdef TPIE_SORT_SMALL_MRGARITY
427 TP_LOG_WARNING_ID(
"TPIE_SORT_SMALL_MRGARITY flag is set."
428 " Did you mean to do this?");
429 if(mrgArity > TPIE_SORT_SMALL_MRGARITY) {
430 TP_LOG_WARNING_ID(
"Reducing merge arity due to compiler specified flag");
431 mrgArity=TPIE_SORT_SMALL_MRGARITY;
433 #endif // TPIE_SORT_SMALL_MRGARITY
435 #ifdef TPIE_SORT_SMALL_RUNSIZE
440 TP_LOG_WARNING_ID(
"TPIE_SORT_SMALL_RUNSIZE flag is set."
441 " Did you mean to do this?");
442 if(nItemsPerRun > TPIE_SORT_SMALL_RUNSIZE) {
443 TP_LOG_WARNING_ID(
"Reducing run size due to compiler specified flag");
444 nItemsPerRun=TPIE_SORT_SMALL_RUNSIZE;
448 nRuns = ((nInputItems + nItemsPerRun - 1) / nItemsPerRun);
449 #endif // TPIE_SORT_SMALL_RUNSIZE
452 #ifdef MINIMIZE_INITIAL_RUN_LENGTH
458 TP_LOG_DEBUG_ID (
"Minimizing initial run lengths without increasing" <<
459 " the height of the merge tree.");
463 double tree_height = log((
double)nRuns) / log((
double)mrgArity);
464 tp_assert (tree_height > 0,
"Negative or zero tree height!");
465 tree_height = ceil (tree_height);
469 double maxOrigRuns = pow ((
double) mrgArity, tree_height);
470 tp_assert (maxOrigRuns >= nRuns
"Number of permitted runs was reduced!");
473 double new_nItemsPerRun = ceil (nInputItems/ maxOrigRuns);
474 tp_assert (new_nItemsPerRun <= nItemsPerRun,
475 "Size of original runs increased!");
478 nItemsPerRun = (TPIE_OS_SIZE_T) new_nItemsPerRun;
480 TP_LOG_DEBUG_ID (
"With long internal memory runs, nRuns = "
483 nRuns = (nInputItems + nItemsPerRun - 1) / nItemsPerRun;
485 TP_LOG_DEBUG_ID (
"With shorter internal memory runs "
486 <<
"and the same merge tree height, nRuns = "
490 "We increased the merge height when we weren't supposed to do so!");
491 #endif // MINIMIZE_INITIAL_RUN_LENGTH
497 if(static_cast<TPIE_OS_OFFSET>(mrgArity)>nRuns){
500 mrgArity=
static_cast<TPIE_OS_SIZE_T
>(nRuns);
505 tp_assert (nRuns > 1,
"Less than two runs to merge!");
507 tp_assert (nRuns * nItemsPerRun - nInputItems < nItemsPerRun,
508 "Total expected output size is too large.");
509 tp_assert (nInputItems - (nRuns - 1) * nItemsPerRun <= nItemsPerRun,
510 "Total expected output size is too small.");
512 TP_LOG_DEBUG_ID (
"Input stream has " << nInputItems <<
" items");
513 TP_LOG_DEBUG (
"Max number of items per runs " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(nItemsPerRun) );
514 TP_LOG_DEBUG (
"\nInitial number of runs " << nRuns );
515 TP_LOG_DEBUG (
"\nMerge arity is " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) <<
"\n" );
518 template<
class T,
class I,
class M>
519 void sort_manager<T,I,M>::partition_and_sort_runs(progress_indicator_base* indicator,
tpie::array<temp_file> & temporaries){
532 minRunsPerStream = nRuns/mrgArity;
536 nXtraRuns =
static_cast<arity_t>(nRuns - minRunsPerStream*mrgArity);
537 tp_assert(nXtraRuns<mrgArity,
"Too many extra runs");
541 nItemsInLastRun =
static_cast<TPIE_OS_SIZE_T
>(nInputItems % nItemsPerRun);
542 if(nItemsInLastRun==0){
544 nItemsInLastRun=nItemsPerRun;
550 m_internalSorter->allocate(nItemsPerRun);
552 TP_LOG_DEBUG_ID (
"Partitioning and forming sorted runs.");
555 nItemsInThisRun=nItemsPerRun;
563 TPIE_OS_OFFSET check_size = 0;
566 indicator->init(nRuns*1000);
568 for(
arity_t ii=0; ii<mrgArity; ii++){
571 curOutputRunStream = tpie_new<file_stream<T> >();
572 curOutputRunStream->open(temporaries[ii],
access_write);
577 runsInStream = minRunsPerStream + ((ii >= mrgArity-nXtraRuns)?1:0);
579 for(TPIE_OS_OFFSET jj=0; jj < runsInStream; jj++ ) {
581 if( (ii==mrgArity-1) && (jj==runsInStream-1)) {
582 nItemsInThisRun=nItemsInLastRun;
585 progress_indicator_subindicator sort_indicator(indicator, 1000);
586 m_internalSorter->sort(inStream, curOutputRunStream,
587 nItemsInThisRun, &sort_indicator);
591 TP_LOG_DEBUG_ID (
"Wrote " << runsInStream <<
" runs and "
592 << curOutputRunStream->size() <<
" items to file "
593 <<
static_cast<TPIE_OS_OUTPUT_SIZE_T
>(ii));
594 check_size+=curOutputRunStream->size();
598 tp_assert(check_size == nInputItems,
"item count mismatch");
602 m_internalSorter->deallocate();
605 inStream->truncate(0);
608 if (indicator) indicator->done();
611 template<
class T,
class I,
class M>
612 void sort_manager<T,I,M>::merge_to_output(progress_indicator_base* indicator,
tpie::array<temp_file> & temporaries){
626 TP_LOG_DEBUG_ID(
"Allocated " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(
sizeof(ami::stream<T>*)*mrgArity)
627 <<
" bytes for " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(mrgArity) <<
" merge input stream pointers.\n"
640 m_mergeHeap->allocate( mrgArity );
654 treeHeight=
static_cast<int>(ceil(log(static_cast<float>(nRuns)) /
655 log(static_cast<float>(mrgArity))));
657 indicator->set_range( nInputItems * treeHeight);
665 while (nRuns > TPIE_OS_OFFSET(mrgArity)) {
677 TP_LOG_DEBUG (
"Intermediate merge. level="<<mrgHeight <<
"\n");
680 nRuns = (nRuns + mrgArity - 1)/mrgArity;
684 minRunsPerStream = nRuns/mrgArity;
689 arity_t mergeRunsInLastOutputRun=(nXtraRuns>0) ? nXtraRuns : mrgArity;
694 nXtraRuns =
static_cast<arity_t>(nRuns - minRunsPerStream*mrgArity);
695 tp_assert(nXtraRuns<mrgArity,
"Too many extra runs");
698 arity_t nOutputStreams = (minRunsPerStream > 0) ? mrgArity : nXtraRuns;
700 arity_t nRunsToMerge = mrgArity;
703 for(ii = 0; ii < mrgArity; ii++){
707 file_stream<T> * stream = tpie_new<file_stream<T> >();
708 mergeInputStreams[ii].reset(stream);
709 stream->open(temporaries[mrgArity*(mrgHeight%2)+ii],
access_read);
713 TPIE_OS_OFFSET check_size=0;
719 TP_LOG_DEBUG(
"Writing " << nRuns <<
" runs to " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(nOutputStreams)
720 <<
" output files.\nEach output file has at least "
721 << minRunsPerStream <<
" runs.\n");
723 for(ii = mrgArity-nOutputStreams; ii < mrgArity; ii++){
727 file_stream<T> curOutputRunStream;
728 curOutputRunStream.open(temporaries[mrgArity*((mrgHeight+1)%2)+ii],
access_write);
733 runsInStream = minRunsPerStream + ((ii >= mrgArity-nXtraRuns)?1:0);
734 TP_LOG_DEBUG(
"Writing " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(runsInStream) <<
" runs to output "
735 <<
" file " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii) <<
"\n");
736 for( jj=0; jj < runsInStream; jj++ ) {
738 if( (ii==mrgArity-1) && (jj==runsInStream-1)) {
739 nRunsToMerge=mergeRunsInLastOutputRun;
742 single_merge(mergeInputStreams.find(mrgArity-nRunsToMerge),
743 mergeInputStreams.find(mrgArity),
745 nItemsPerRun, indicator);
749 TP_LOG_DEBUG(
"Wrote " << static_cast<TPIE_OS_OUTPUT_SIZE_T>(runsInStream) <<
" runs and "
750 << curOutputRunStream.size() <<
" items "
751 <<
"to file " <<
static_cast<TPIE_OS_OUTPUT_SIZE_T
>(ii) <<
"\n");
752 check_size+=curOutputRunStream.size();
755 tp_assert(check_size==nInputItems,
"item count mismatch in merge");
760 for(ii = 0; ii < mrgArity; ii++) {
761 mergeInputStreams[ii].reset();
762 temporaries[mrgArity*(mrgHeight%2)+ii].free();
765 nItemsPerRun=mrgArity*nItemsPerRun;
769 tp_assert( nRuns > 1,
"Not enough runs to merge to final output");
770 tp_assert( nRuns <= TPIE_OS_OFFSET(mrgArity),
"Too many runs to merge to final output");
776 TP_LOG_DEBUG_ID (
"Final merge. level="<<mrgHeight);
777 TP_LOG_DEBUG(
"Merge runs left="<<nRuns<<
"\n");
778 for(ii = mrgArity-static_cast<TPIE_OS_SIZE_T>(nRuns); ii < mrgArity; ii++){
785 TP_LOG_DEBUG (
"Putting merge stream "<< static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii) <<
" in slot "
786 << static_cast<TPIE_OS_OUTPUT_SIZE_T>(ii-(mrgArity-static_cast<TPIE_OS_SIZE_T>(nRuns))) <<
"\n");
787 file_stream<T> * stream = tpie_new<file_stream<T> >();
788 mergeInputStreams[ii-(mrgArity-
static_cast<TPIE_OS_SIZE_T
>(nRuns))].reset(stream);
789 stream->open(temporaries[mrgArity*(mrgHeight%2)+ii],
access_read);
797 mergeInputStreams.find((
size_t)nRuns),
798 outStream, -1, indicator);
800 if (indicator) indicator->done();
801 tp_assert((TPIE_OS_OFFSET)outStream->size() == nInputItems,
"item count mismatch");
803 TP_LOG_DEBUG(
"merge cleanup\n");
806 mergeInputStreams.resize(0);
809 m_mergeHeap->deallocate();
810 TP_LOG_DEBUG_ID (
"Number of passes incl run formation is " <<
812 TP_LOG_DEBUG(
"AMI_partition_and_merge END\n");
815 template<
class T,
class I,
class M>
819 file_stream < T >*outStream, TPIE_OS_OFFSET cutoff, progress_indicator_base* indicator)
829 #endif // _TPIE_AMI_SORT_MANAGER_H
Defines the tp_assert macro.
size_t consecutive_memory_available(size_t granularity=5 *1024 *1024)
Find the largest amount of memory that can be allocated as a single chunk.
The base class for indicating the progress of some task.
static std::string tpie_name(const std::string &post_base="", const std::string &dir="", const std::string &ext="")
Generate path for a new temporary file.
A generic array with a fixed size.
merge_sorted_runs as used in several of TPIE's merge variants
#define TPIE_FSI
For use when constructing a fractional subindicator.
const item_type & read()
Read an item from the stream.
Fractional progress reporter.
This file contains a few deprecated definitions for legacy code.
void write(const item_type &item)
Write an item to the stream.
void merge_sorted_runs(typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator start, typename tpie::array< tpie::auto_ptr< file_stream< T > > >::iterator end, file_stream< T > *outStream, M *MergeHeap, TPIE_OS_OFFSET cutoff=-1, progress_indicator_base *indicator=NULL)
This is a common merge routine for all of the AMI_merge_sorted, AMI_ptr_merge_sorted and AMI_key_merg...
err single_merge(stream< T > **instreams, arity_t arity, stream< T > *outstream, M *m_obj)
Merges arity streams in memory using a merge management object and writes the result into outstream...
void sort(file_stream< T > *in, file_stream< T > *out, progress_indicator_base *indicator=NULL)
Sort in stream to out stream and save in stream (uses 3x space)
Generic internal array with known memory requirements.
TPIE_OS_SIZE_T arity_t
Intended to signal the number of input streams in a merge.
void seek(stream_offset_type offset, offset_type whence=beginning)
Moves the logical offset in the stream.
Open a file for writing only, content is truncated.
memory_size_type available_files()
Return the additional number of files that can be opened before running out of file descriptors...
static memory_size_type memory_usage(float blockFactor=1.0, bool includeDefaultFileAccessor=true)
Calculate the amount of memory used by a single file_stream.
Merge management objects.
Simple class acting both as file and a file::stream.
void tpie_delete(T *p)
Delete an object allocated with tpie_new.
Like std::auto_ptr, but deletes the object with tpie_delete.
#define tp_assert(condition, message)
A class of manager objects for merge sorting objects of type T.
void sort(file_stream< T > &instream, file_stream< T > &outstream, Compare comp, progress_indicator_base &indicator)
Sort elements of a stream using the given STL-style comparator object.
stream_size_type size() const
Get the size of the file measured in items.
Subindicator for fractional progress reporting.