20 #ifndef __TPIE_PIPELINING_MERGE_SORTER_H__
21 #define __TPIE_PIPELINING_MERGE_SORTER_H__
23 #include <tpie/pipelining/sort_parameters.h>
24 #include <tpie/pipelining/merger.h>
25 #include <tpie/pipelining/exception.h>
43 template <
typename T,
bool UseProgress,
typename pred_t = std::less<T> >
46 typedef boost::shared_ptr<merge_sorter> ptr;
49 static const memory_size_type maximumFanout = 250;
52 : m_state(stParameters)
54 , m_parametersSet(
false)
58 , m_finalMergeInitialized(
false)
66 inline void set_parameters(stream_size_type runLength, memory_size_type fanout) {
67 tp_assert(m_state == stParameters,
"Merge sorting already begun");
70 m_parametersSet =
true;
71 log_debug() <<
"Manually set merge sort run length and fanout\n";
73 log_debug() <<
"Fanout = " << p.
fanout <<
" (uses memory " << fanout_memory_usage(p.
fanout) <<
")" << std::endl;
81 calculate_parameters(m, m, m);
91 calculate_parameters(m1, m2, m3);
96 inline void maybe_calculate_parameters() {
106 inline void set_phase_1_memory(memory_size_type m1) {
108 maybe_calculate_parameters();
111 inline void set_phase_2_memory(memory_size_type m2) {
113 maybe_calculate_parameters();
116 inline void set_phase_3_memory(memory_size_type m3) {
118 maybe_calculate_parameters();
125 tp_assert(m_state == stParameters,
"Merge sorting already begun");
127 log_debug() <<
"Start forming input runs" << std::endl;
129 m_runFiles.resize(p.
fanout*2);
130 m_currentRunItemCount = 0;
132 m_state = stRunFormation;
139 inline void push(
const T & item) {
140 tp_assert(m_state == stRunFormation,
"Wrong phase");
141 if (m_currentRunItemCount >= p.
runLength) {
145 m_currentRunItems[m_currentRunItemCount] = item;
146 ++m_currentRunItemCount;
154 tp_assert(m_state == stRunFormation,
"Wrong phase");
157 if (m_itemCount == 0) {
158 tp_assert(m_currentRunItemCount == 0,
"m_itemCount == 0, but m_currentRunItemCount != 0");
159 m_reportInternal =
true;
161 m_currentRunItems.
resize(0);
162 log_debug() <<
"Got no items. Internal reporting mode." << std::endl;
165 m_reportInternal =
true;
167 log_debug() <<
"Got " << m_currentRunItemCount <<
" items. Internal reporting mode." << std::endl;
169 }
else if (m_finishedRuns == 0
177 m_currentRunItems.
swap(currentRun);
179 m_reportInternal =
true;
181 log_debug() <<
"Got " << m_currentRunItemCount <<
" items. Internal reporting mode "
182 <<
"after resizing item buffer." << std::endl;
185 m_reportInternal =
false;
187 m_currentRunItems.
resize(0);
188 log_debug() <<
"Got " << m_finishedRuns <<
" runs. External reporting mode." << std::endl;
198 tp_assert(m_state == stMerge,
"Wrong phase");
199 if (!m_reportInternal) {
209 inline void evacuate() {
210 tp_assert(m_state == stMerge || m_state == stReport,
"Wrong phase");
211 if (m_reportInternal) {
212 log_debug() <<
"Evacuate merge_sorter (" <<
this <<
") in internal reporting mode" << std::endl;
213 m_reportInternal =
false;
214 memory_size_type runCount = (m_currentRunItemCount > 0) ? 1 : 0;
216 m_currentRunItems.
resize(0);
217 initialize_final_merger(0, runCount);
218 }
else if (m_state == stMerge) {
219 log_debug() <<
"Evacuate merge_sorter (" <<
this <<
") before merge in external reporting mode (noop)" << std::endl;
222 log_debug() <<
"Evacuate merge_sorter (" <<
this <<
") before reporting in external reporting mode" << std::endl;
227 inline void evacuate_before_merging() {
228 if (m_state == stMerge) evacuate();
231 inline void evacuate_before_reporting() {
232 if (m_state == stReport && (!m_reportInternal || m_itemsPulled == 0)) evacuate();
240 inline void sort_current_run() {
245 inline void empty_current_run() {
246 if (m_finishedRuns < 10)
247 log_debug() <<
"Write " << m_currentRunItemCount <<
" items to run file " << m_finishedRuns << std::endl;
248 else if (m_finishedRuns == 10)
251 open_run_file_write(fs, 0, m_finishedRuns);
252 for (memory_size_type i = 0; i < m_currentRunItemCount; ++i) {
253 fs.write(m_currentRunItems[i]);
255 m_currentRunItemCount = 0;
263 inline void initialize_merger(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount) {
268 array<file_stream<T> > in(runCount);
269 for (memory_size_type i = 0; i < runCount; ++i) {
270 open_run_file_read(in[i], mergeLevel, runNumber+i);
272 stream_size_type runLength = calculate_run_length(p.
runLength, p.
fanout, mergeLevel);
274 m_merger.reset(in, runLength);
280 inline void initialize_final_merger(memory_size_type finalMergeLevel, memory_size_type runCount) {
281 if (m_finalMergeInitialized) {
282 reinitialize_final_merger();
286 m_finalMergeInitialized =
true;
287 m_finalMergeLevel = finalMergeLevel;
288 m_finalRunCount = runCount;
290 log_debug() <<
"Run count in final level (" << runCount <<
") is greater than the final fanout (" << p.
finalFanout <<
")\n";
293 memory_size_type n = runCount-i;
294 log_debug() <<
"Merge " << n <<
" runs starting from #" << i << std::endl;
295 dummy_progress_indicator pi;
296 m_finalMergeSpecialRunNumber = merge_runs(finalMergeLevel, i, n, pi);
298 log_debug() <<
"Run count in final level (" << runCount <<
") is less or equal to the final fanout (" << p.
finalFanout <<
")" << std::endl;
299 m_finalMergeSpecialRunNumber = std::numeric_limits<memory_size_type>::max();
301 reinitialize_final_merger();
305 inline void reinitialize_final_merger() {
306 tp_assert(m_finalMergeInitialized,
"reinitialize_final_merger while !m_finalMergeInitialized");
307 if (m_finalMergeSpecialRunNumber != std::numeric_limits<memory_size_type>::max()) {
309 for (memory_size_type i = 0; i < p.
finalFanout-1; ++i) {
310 open_run_file_read(in[i], m_finalMergeLevel, i);
311 log_debug() <<
"Run " << i <<
" is at offset " << in[i].offset() <<
" and has size " << in[i].size() << std::endl;
313 open_run_file_read(in[p.
finalFanout-1], m_finalMergeLevel+1, m_finalMergeSpecialRunNumber);
315 stream_size_type runLength = calculate_run_length(p.
runLength, p.
fanout, m_finalMergeLevel+1);
316 log_debug() <<
"Run length " << runLength << std::endl;
317 m_merger.reset(in, runLength);
319 initialize_merger(m_finalMergeLevel, 0, m_finalRunCount);
328 static inline stream_size_type calculate_run_length(stream_size_type initialRunLength, memory_size_type fanout, memory_size_type mergeLevel) {
329 stream_size_type runLength = initialRunLength;
330 for (memory_size_type i = 0; i < mergeLevel; ++i) {
341 template <
typename ProgressIndicator>
342 inline memory_size_type merge_runs(memory_size_type mergeLevel, memory_size_type runNumber, memory_size_type runCount, ProgressIndicator & pi) {
343 initialize_merger(mergeLevel, runNumber, runCount);
345 memory_size_type nextRunNumber = runNumber/p.
fanout;
346 open_run_file_write(out, mergeLevel+1, nextRunNumber);
347 while (m_merger.can_pull()) {
349 out.write(m_merger.pull());
351 return nextRunNumber;
357 inline void prepare_pull(
typename Progress::base & pi) {
359 int treeHeight=
static_cast<int>(ceil(log(static_cast<float>(m_finishedRuns)) /
360 log(static_cast<float>(p.
fanout))));
361 pi.init(item_count()*treeHeight);
363 memory_size_type mergeLevel = 0;
364 memory_size_type runCount = m_finishedRuns;
365 while (runCount > p.
fanout) {
366 log_debug() <<
"Merge " << runCount <<
" runs in merge level " << mergeLevel <<
'\n';
367 memory_size_type newRunCount = 0;
368 for (memory_size_type i = 0; i < runCount; i += p.
fanout) {
369 memory_size_type n = std::min(runCount-i, p.
fanout);
371 if (newRunCount < 10)
372 log_debug() <<
"Merge " << n <<
" runs starting from #" << i << std::endl;
373 else if (newRunCount == 10)
376 merge_runs(mergeLevel, i, n, pi);
380 runCount = newRunCount;
382 log_debug() <<
"Final merge level " << mergeLevel <<
" has " << runCount <<
" runs" << std::endl;
383 initialize_final_merger(mergeLevel, runCount);
395 tp_assert(m_state == stReport,
"Wrong phase");
396 if (m_reportInternal)
return m_itemsPulled < m_currentRunItemCount;
398 if (m_evacuated) reinitialize_final_merger();
399 return m_merger.can_pull();
407 tp_assert(m_state == stReport,
"Wrong phase");
408 if (m_reportInternal && m_itemsPulled < m_currentRunItemCount) {
409 T el = m_currentRunItems[m_itemsPulled++];
413 if (m_evacuated) reinitialize_final_merger();
414 return m_merger.pull();
418 inline stream_size_type item_count() {
422 static memory_size_type memory_usage_phase_1(
const sort_parameters & params) {
423 return params.runLength *
sizeof(T)
424 + file_stream<T>::memory_usage()
425 + 2*params.fanout*
sizeof(temp_file);
428 static memory_size_type minimum_memory_phase_1() {
436 sort_parameters p((sort_parameters()));
438 p.
fanout = calculate_fanout(std::numeric_limits<memory_size_type>::max());
439 return memory_usage_phase_1(p);
442 static memory_size_type memory_usage_phase_2(
const sort_parameters & params) {
443 return fanout_memory_usage(params.fanout);
446 static memory_size_type minimum_memory_phase_2() {
447 return fanout_memory_usage(calculate_fanout(0));
450 static memory_size_type memory_usage_phase_3(
const sort_parameters & params) {
451 return fanout_memory_usage(params.finalFanout);
454 static memory_size_type minimum_memory_phase_3() {
455 return fanout_memory_usage(calculate_fanout(0));
458 static memory_size_type maximum_memory_phase_3() {
459 return fanout_memory_usage(maximumFanout);
462 inline memory_size_type evacuated_memory_usage()
const {
463 return 2*p.
fanout*
sizeof(temp_file);
473 inline void calculate_parameters(
const memory_size_type m1,
const memory_size_type m2,
const memory_size_type m3) {
474 tp_assert(m_state == stParameters,
"Merge sorting already begun");
512 memory_size_type tempFileMemory = 2*p.
fanout*
sizeof(temp_file);
514 log_debug() <<
"Phase 1: " << p.
memoryPhase1 <<
" b available memory; " << streamMemory <<
" b for a single stream; " << tempFileMemory <<
" b for temp_files\n";
515 memory_size_type min_m1 =
sizeof(T) + streamMemory + tempFileMemory;
517 log_warning() <<
"Not enough phase 1 memory for an item and an open stream! (" << p.
memoryPhase1 <<
" < " << min_m1 <<
")\n";
525 - tempFileMemory)/
sizeof(T);
529 m_parametersSet =
true;
531 log_debug() <<
"Calculated merge sort parameters\n";
536 << m1 <<
" b available, " << memory_usage_phase_1(p) <<
" b expected" << std::endl;
537 if (memory_usage_phase_1(p) > m1) {
538 log_warning() <<
"Merge sort phase 1 exceeds the alloted memory usage: "
539 << m1 <<
" b available, but " << memory_usage_phase_1(p) <<
" b expected" << std::endl;
542 << m2 <<
" b available, " << memory_usage_phase_2(p) <<
" b expected" << std::endl;
543 if (memory_usage_phase_2(p) > m2) {
544 log_warning() <<
"Merge sort phase 2 exceeds the alloted memory usage: "
545 << m2 <<
" b available, but " << memory_usage_phase_2(p) <<
" b expected" << std::endl;
548 << m3 <<
" b available, " << memory_usage_phase_3(p) <<
" b expected" << std::endl;
549 if (memory_usage_phase_3(p) > m3) {
550 log_warning() <<
"Merge sort phase 3 exceeds the alloted memory usage: "
551 << m3 <<
" b available, but " << memory_usage_phase_3(p) <<
" b expected" << std::endl;
558 static inline memory_size_type calculate_fanout(memory_size_type availableMemory) {
559 memory_size_type fanout_lo = 2;
560 memory_size_type fanout_hi = maximumFanout + 1;
562 while (fanout_lo < fanout_hi - 1) {
563 memory_size_type mid = fanout_lo + (fanout_hi-fanout_lo)/2;
564 if (fanout_memory_usage(mid) <= availableMemory) {
576 static inline memory_size_type fanout_memory_usage(memory_size_type fanout) {
577 return merger<T, pred_t>::memory_usage(fanout)
579 + 2*
sizeof(temp_file);
592 if (!m_parametersSet)
593 throw exception(
"Wrong state in set_items: parameters not set");
594 if (m_state != stParameters)
595 throw exception(
"Wrong state in set_items: state is not stParameters");
599 <<
" to " << n << std::endl;
607 log_debug() <<
"New merge sort parameters\n";
619 inline memory_size_type run_file_index(memory_size_type mergeLevel, memory_size_type runNumber) {
623 return (mergeLevel % 2)*p.
fanout + (runNumber % p.
fanout);
629 inline void open_run_file_write(file_stream<T> & fs, memory_size_type mergeLevel, memory_size_type runNumber) {
632 memory_size_type idx = run_file_index(mergeLevel, runNumber);
633 if (runNumber < p.
fanout) m_runFiles[idx].free();
635 fs.seek(0, file_stream<T>::end);
641 inline void open_run_file_read(file_stream<T> & fs, memory_size_type mergeLevel, memory_size_type runNumber) {
644 memory_size_type idx = run_file_index(mergeLevel, runNumber);
646 fs.seek(calculate_run_length(p.
runLength, p.
fanout, mergeLevel) * (runNumber / p.
fanout), file_stream<T>::beginning);
659 bool m_parametersSet;
661 merger<T, pred_t> m_merger;
663 array<temp_file> m_runFiles;
666 stream_size_type m_finishedRuns;
669 array<T> m_currentRunItems;
673 memory_size_type m_currentRunItemCount;
675 bool m_reportInternal;
679 memory_size_type m_itemsPulled;
681 stream_size_type m_itemCount;
686 bool m_finalMergeInitialized;
687 memory_size_type m_finalMergeLevel;
688 memory_size_type m_finalRunCount;
689 memory_size_type m_finalMergeSpecialRunNumber;
694 #endif // __TPIE_PIPELINING_MERGE_SORTER_H__
Encapsulation of two pointers from any random access container.
stream_size_type internalReportThreshold
Maximum item count for internal reporting, subject to memory restrictions in all phases.
The base class for indicating the progress of some task.
void parallel_sort(iterator_type a, iterator_type b, typename tpie::progress_types< Progress >::base &pi, comp_type comp=std::less< typename boost::iterator_value< iterator_type >::type >())
Sort items in the range [a,b) using a parallel quick sort.
A generic array with a fixed size.
Merge sorting consists of three phases.
void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3)
Calculate parameters from given memory amount.
virtual void done()
Advance the indicator to the end.
void set_items(stream_size_type n)
Set upper bound on number of items pushed.
void set_available_memory(memory_size_type m)
Calculate parameters from given memory amount.
memory_size_type memoryPhase3
Memory available during output phase.
void set_parameters(stream_size_type runLength, memory_size_type fanout)
Enable setting run length and fanout manually (for testing purposes).
Encapsulation of two pointers from any random access container.
memory_manager & get_memory_manager()
Return a reference to the memory manager.
memory_size_type finalFanout
Fanout of the final merge during phase 3 (the output phase).
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
logstream & log_debug()
Return logstream for writing debug log messages.
void calc(typename Progress::base &pi)
Perform phase 2: perform all merges in the merge tree except the last one.
memory_size_type memoryPhase2
Memory available while merging runs.
void resize(size_t size, const T &elm)
Change the size of the array.
static memory_size_type memory_usage(float blockFactor=1.0, bool includeDefaultFileAccessor=true)
Calculate the amount of memory used by a single file_stream.
Simple class acting both as file and a file::stream.
void swap(array &other)
Swap two arrays.
iterator begin()
Return an iterator to the beginning of the array.
size_type size() const
Return the size of the array.
void begin()
Initiate phase 1: Formation of input runs.
bool can_pull()
In phase 3, return true if there are more items in the final merge phase.
stream_size_type runLength
Run length, subject to memory restrictions during phase 1 (run formation).
#define tp_assert(condition, message)
For applications where you wish to disable progress indicators via a template parameter, refer to the progress_types member names sub, fp and base.
T pull()
In phase 3, fetch the next item in the final merge phase.
logstream & log_warning()
Return logstream for writing warning log messages.
memory_size_type fanout
Fanout of merge tree during phase 2 (merging runs).
Progress indicator concept in an efficient non-inheritance way.
void push(const T &item)
Push item to merge sorter during phase 1.
Open a file for reading or writing.
virtual void init(stream_size_type range=0)
Initialize progress indicator.
memory_size_type memoryPhase1
Memory available while forming sorted runs.