20 #ifndef TPIE_SERIALIZATION_SORTER_H
21 #define TPIE_SERIALIZATION_SORTER_H
24 #include <boost/filesystem.hpp>
36 #include <tpie/pipelining/node.h>
40 namespace serialization_bits {
60 void dump(std::ostream & out)
const {
61 out <<
"Serialization merge sort parameters\n"
69 <<
"Temporary directory: " <<
tempDir <<
'\n';
77 if (serSize >
sizeof(T)) {
86 void unset_owner(memory_bucket_ref , T & ) {}
88 template <
typename T,
typename pred_t>
91 memory_size_type m_items;
92 memory_size_type m_memForItems;
94 memory_size_type m_largestItem;
106 pred_t pred = pred_t())
107 : m_buffer(buffer_bucket)
109 , m_largestItem(
sizeof(T))
112 , m_buffer_bucket(buffer_bucket)
113 , m_item_bucket(item_bucket)
117 void begin(memory_size_type memAvail) {
118 m_buffer.
resize(memAvail /
sizeof(T) / 2);
120 m_largestItem =
sizeof(T);
122 m_memForItems = memAvail - m_buffer_bucket->count;
132 if (m_full)
return false;
134 if (m_items == m_buffer.
size()) {
139 size_t oldSize = m_item_bucket->count;
140 set_owner(m_item_bucket, item);
142 if (m_item_bucket->count > m_memForItems) {
143 unset_owner(m_item_bucket, item);
148 m_largestItem = std::max(m_largestItem, m_item_bucket->count - oldSize);
150 m_buffer[m_items++] = item;
155 memory_size_type get_largest_item_size() {
156 return m_largestItem;
166 return m_item_bucket->count;
180 return m_buffer_bucket->count + m_item_bucket->count;
183 bool can_shrink_buffer() {
187 void shrink_buffer() {
189 m_buffer.
swap(newBuffer);
196 const T * begin()
const {
197 return m_buffer.
get();
200 const T * end()
const {
201 return m_buffer.
get() + m_items;
217 for (
size_t i = 0 ; i < m_items ; ++i)
218 unset_owner(m_item_bucket, m_buffer[i]);
219 m_item_bucket->count = 0;
264 template <
typename T>
269 size_t m_nextLevelFileOffset;
271 size_t m_nextFileOffset;
274 size_t m_readersOpen;
277 stream_size_type m_currentWriterByteSize;
281 std::string m_tempDir;
283 std::string run_file(
size_t physicalIndex) {
284 if (m_tempDir.size() == 0)
throw exception(
"run_file: temp dir is the empty string");
285 std::stringstream ss;
286 ss << m_tempDir <<
'/' << physicalIndex <<
".tpie";
293 , m_nextLevelFileOffset(0)
294 , m_nextFileOffset(0)
296 , m_writerOpen(
false)
300 , m_currentWriterByteSize(0)
308 void set_temp_dir(
const std::string & tempDir) {
309 if (m_nextFileOffset != 0)
310 throw exception(
"set_temp_dir: trying to change path after files already open");
314 void open_new_writer() {
315 if (m_writerOpen)
throw exception(
"open_new_writer: Writer already open");
316 m_writer.open(run_file(m_nextFileOffset++));
317 m_currentWriterByteSize = m_writer.file_size();
321 void write(
const T & v) {
322 if (!m_writerOpen)
throw exception(
"write: No writer open");
326 void close_writer() {
327 if (!m_writerOpen)
throw exception(
"close_writer: No writer open");
329 stream_size_type sz = m_writer.file_size();
330 increase_usage(m_nextFileOffset-1, static_cast<stream_offset_type>(sz));
331 m_writerOpen =
false;
334 size_t remaining_runs() {
335 return m_nextLevelFileOffset - m_fileOffset;
338 size_t next_level_runs() {
339 return m_nextFileOffset - m_nextLevelFileOffset;
342 bool readers_open() {
343 return m_readersOpen > 0;
346 void open_readers(
size_t fanout) {
347 if (m_readersOpen != 0)
throw exception(
"open_readers: readers already open");
348 if (fanout == 0)
throw exception(
"open_readers: fanout == 0");
349 if (remaining_runs() == 0) {
350 if (m_writerOpen)
throw exception(
"Writer open while moving to next merge level");
351 m_nextLevelFileOffset = m_nextFileOffset;
353 if (fanout > remaining_runs())
throw exception(
"open_readers: fanout out of bounds");
355 if (m_readers.size() < fanout) m_readers.resize(fanout);
356 for (
size_t i = 0; i < fanout; ++i) {
357 m_readers[i].open(run_file(m_fileOffset + i));
359 m_readersOpen = fanout;
362 bool can_read(
size_t idx) {
363 if (m_readersOpen == 0)
throw exception(
"can_read: no readers open");
364 if (m_readersOpen < idx)
throw exception(
"can_read: index out of bounds");
365 return m_readers[idx].can_read();
369 if (m_readersOpen == 0)
throw exception(
"read: no readers open");
370 if (m_readersOpen < idx)
throw exception(
"read: index out of bounds");
372 m_readers[idx].unserialize(res);
376 void close_readers_and_delete() {
377 if (m_readersOpen == 0)
throw exception(
"close_readers_and_delete: no readers open");
379 for (
size_t i = 0; i < m_readersOpen; ++i) {
380 decrease_usage(m_fileOffset + i, m_readers[i].file_size());
381 m_readers[i].close();
382 boost::filesystem::remove(run_file(m_fileOffset + i));
384 m_fileOffset += m_readersOpen;
388 void move_last_reader_to_next_level() {
389 if (remaining_runs() != 1)
390 throw exception(
"move_last_reader_to_next_level: remaining_runs != 1");
391 m_nextLevelFileOffset = m_fileOffset;
395 if (m_readersOpen > 0) {
396 log_debug() <<
"reset: Close readers" << std::endl;
397 close_readers_and_delete();
401 log_debug() <<
"reset: Close writer" << std::endl;
404 log_debug() <<
"Remove " << m_fileOffset <<
" through " << m_nextFileOffset << std::endl;
405 for (
size_t i = m_fileOffset; i < m_nextFileOffset; ++i) {
406 std::string runFile = run_file(i);
411 boost::filesystem::remove(runFile);
413 m_fileOffset = m_nextLevelFileOffset = m_nextFileOffset = 0;
417 void increase_usage(
size_t idx, stream_size_type sz) {
418 log_debug() <<
"+ " << idx <<
' ' << sz << std::endl;
422 void decrease_usage(
size_t idx, stream_size_type sz) {
423 log_debug() <<
"- " << idx <<
' ' << sz << std::endl;
428 template <
typename T,
typename pred_t>
434 typedef std::pair<T, size_t> item_type;
436 mergepred_t(
const pred_t & pred) : m_pred(pred) {}
439 bool operator()(
const item_type & a,
const item_type & b)
const {
440 return m_pred(b.first, a.first);
444 typedef typename mergepred_t::item_type item_type;
448 std::vector<serialization_reader> rd;
449 typedef std::priority_queue<item_type, std::vector<item_type>, mergepred_t> priority_queue_type;
450 priority_queue_type pq;
456 , pq(mergepred_t(pred))
461 void init(
size_t fanout) {
463 for (
size_t i = 0; i < fanout; ++i)
471 const T & top()
const {
472 return pq.top().first;
476 size_t idx = pq.top().second;
484 priority_queue_type tmp(pred);
491 void push_from(
size_t idx) {
492 if (files.can_read(idx)) {
493 pq.push(std::make_pair(files.read(idx), idx));
500 template <
typename T,
typename pred_t = std::less<T> >
503 typedef std::shared_ptr<serialization_sorter> ptr;
506 enum sorter_state { state_initial, state_1, state_2, state_3 };
508 std::unique_ptr<memory_bucket> m_buffer_bucket_ptr;
510 std::unique_ptr<memory_bucket> m_item_bucket_ptr;
514 sorter_state m_state;
517 bool m_parametersSet;
521 stream_size_type m_items;
522 bool m_reportInternal;
523 const T * m_nextInternalItem;
525 static const memory_size_type defaultFiles = 253;
526 static const memory_size_type minimumFilesPhase1 = 1;
527 static const memory_size_type maximumFilesPhase1 = 1;
528 static const memory_size_type minimumFilesPhase2 = 3;
529 static const memory_size_type maximumFilesPhase2 = std::numeric_limits<memory_size_type>::max();
530 static const memory_size_type minimumFilesPhase3 = 3;
531 static const memory_size_type maximumFilesPhase3 = std::numeric_limits<memory_size_type>::max();
533 const int defaultMaxFiles = 253;
541 , m_owning_node(
nullptr)
542 , m_state(state_initial)
543 , m_sorter(m_buffer_bucket, m_item_bucket, pred)
544 , m_parametersSet(
false)
546 , m_merger(m_files, pred)
548 , m_reportInternal(
false)
549 , m_nextInternalItem(0)
562 inline void check_not_started() {
563 if (m_state != state_initial) {
564 throw tpie::exception(
"Can't change parameters after sorting has started");
569 inline void set_phase_1_files(memory_size_type f1) {
574 inline void set_phase_2_files(memory_size_type f2) {
579 inline void set_phase_3_files(memory_size_type f3) {
606 void set_phase_1_memory(memory_size_type m1) {
611 void set_phase_2_memory(memory_size_type m2) {
616 void set_phase_3_memory(memory_size_type m3) {
621 void set_available_memory(memory_size_type m) {
622 set_phase_1_memory(m);
623 set_phase_2_memory(m);
624 set_phase_3_memory(m);
627 void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3) {
628 set_phase_1_memory(m1);
629 set_phase_2_memory(m2);
630 set_phase_3_memory(m3);
633 static memory_size_type minimum_memory_phase_1() {
634 return serialization_writer::memory_usage()*2;
637 static memory_size_type minimum_memory_phase_2() {
638 return serialization_writer::memory_usage()
639 + 2*serialization_reader::memory_usage();
642 static memory_size_type minimum_memory_phase_3() {
643 return 2*serialization_reader::memory_usage();
646 memory_size_type actual_memory_phase_3() {
647 if (m_state != state_3)
649 if (m_reportInternal)
650 return m_sorter.memory_usage();
652 return m_files.next_level_runs() * (m_sorter.get_largest_item_size() + serialization_reader::memory_usage());
655 void set_owner(pipelining::node * n) {
656 if (m_owning_node !=
nullptr) {
657 m_buffer_bucket_ptr = std::move(m_owning_node->
bucket(0));
658 m_item_bucket_ptr = std::move(m_owning_node->
bucket(1));
662 n->bucket(0) = std::move(m_buffer_bucket_ptr);
663 n->bucket(1) = std::move(m_item_bucket_ptr);
669 static memory_size_type clamp(memory_size_type lo, memory_size_type val, memory_size_type hi) {
670 return std::max(lo, std::min(val, hi));
673 void calculate_parameters() {
674 if (m_state != state_initial)
678 m_params.
filesPhase1 = clamp(minimumFilesPhase1, defaultFiles, maximumFilesPhase1);
680 m_params.
filesPhase2 = clamp(minimumFilesPhase2, defaultFiles, maximumFilesPhase2);
682 m_params.
filesPhase3 = clamp(minimumFilesPhase3, defaultFiles, maximumFilesPhase3);
685 throw tpie::exception(
"file limit for phase 1 too small (" + std::to_string(m_params.
filesPhase1) +
" < " + std::to_string(minimumFilesPhase1) +
")");
687 throw tpie::exception(
"file limit for phase 2 too small (" + std::to_string(m_params.
filesPhase2) +
" < " + std::to_string(minimumFilesPhase2) +
")");
689 throw tpie::exception(
"file limit for phase 3 too small (" + std::to_string(m_params.
filesPhase3) +
" < " + std::to_string(minimumFilesPhase3) +
")");
692 if (memAvail1 <= serialization_writer::memory_usage()) {
693 log_error() <<
"Not enough memory for run formation; have " << memAvail1
694 <<
" bytes but " << serialization_writer::memory_usage()
695 <<
" is required for writing a run." << std::endl;
696 throw exception(
"Not enough memory for run formation");
702 if (memAvail2 <= serialization_writer::memory_usage()) {
703 log_error() <<
"Not enough memory for merging. "
704 <<
"mem avail = " << memAvail2
705 <<
", writer usage = " << serialization_writer::memory_usage()
707 throw exception(
"Not enough memory for merging.");
713 if (memAvail2 <= serialization_writer::memory_usage()) {
714 log_error() <<
"Not enough memory for outputting. "
715 <<
"mem avail = " << memAvail3
716 <<
", writer usage = " << serialization_writer::memory_usage()
718 throw exception(
"Not enough memory for outputting.");
721 memory_size_type memForMerge = std::min(memAvail2, memAvail3);
728 memory_size_type fanoutMemory = memForMerge - serialization_writer::memory_usage();
731 memory_size_type perFanout = m_params.
minimumItemSize + serialization_reader::memory_usage();
734 memory_size_type fanout = std::min(fanoutMemory / perFanout, m_params.
filesPhase2 - 1);
736 log_error() <<
"Not enough memory for merging, even when minimum item size is assumed. "
737 <<
"mem avail = " << memForMerge
738 <<
", fanout memory = " << fanoutMemory
739 <<
", per fanout >= " << perFanout
741 throw exception(
"Not enough memory for merging.");
745 m_files.set_temp_dir(m_params.
tempDir);
747 log_debug() <<
"Calculated serialization_sorter parameters.\n";
751 m_parametersSet =
true;
756 if (!m_parametersSet)
757 calculate_parameters();
758 if (m_state != state_initial)
762 log_debug() <<
"Before begin; mem usage = "
764 m_sorter.begin(m_params.
memoryPhase1 - serialization_writer::memory_usage());
765 log_debug() <<
"After internal sorter begin; mem usage = "
767 boost::filesystem::create_directory(m_params.
tempDir);
770 void push(
const T & item) {
771 if (m_state != state_1)
776 if (m_sorter.push(item))
return;
778 if (!m_sorter.push(item)) {
779 throw exception(
"Couldn't fit a single item in buffer");
784 if (m_state != state_1)
787 memory_size_type internalThreshold =
790 log_debug() <<
"m_sorter.memory_usage == " << m_sorter.memory_usage() <<
'\n'
791 <<
"internalThreshold == " << internalThreshold << std::endl;
794 m_reportInternal =
true;
795 m_nextInternalItem = 0;
797 log_debug() <<
"Got no items. Internal reporting mode." << std::endl;
798 }
else if (m_files.next_level_runs() == 0
799 && m_sorter.memory_usage()
800 <= internalThreshold) {
803 m_reportInternal =
true;
804 m_nextInternalItem = m_sorter.begin();
805 log_debug() <<
"Got " << m_sorter.current_serialized_size()
806 <<
" bytes of items. Internal reporting mode." << std::endl;
807 }
else if (m_files.next_level_runs() == 0
808 && m_sorter.current_serialized_size() <= internalThreshold
809 && m_sorter.can_shrink_buffer()) {
812 m_sorter.shrink_buffer();
813 m_reportInternal =
true;
814 m_nextInternalItem = m_sorter.begin();
815 log_debug() <<
"Got " << m_sorter.current_serialized_size()
816 <<
" bytes of items. Internal reporting mode after shrinking buffer." << std::endl;
821 log_debug() <<
"Got " << m_files.next_level_runs() <<
" runs. "
822 <<
"External reporting mode." << std::endl;
824 m_reportInternal =
false;
827 log_debug() <<
"After internal sorter end; mem usage = "
833 stream_size_type item_count() {
845 if (m_reportInternal) {
848 m_reportInternal =
false;
849 log_debug() <<
"Evacuate out of internal reporting mode." << std::endl;
851 log_debug() <<
"Evacuate in external reporting mode - noop." << std::endl;
857 memory_size_type evacuated_memory_usage()
const {
862 bool is_merge_runs_free() {
863 if (m_state != state_2)
865 if (m_reportInternal)
return true;
867 memory_size_type largestItem = m_sorter.get_largest_item_size();
868 memory_size_type fanoutMemory = m_params.
memoryPhase2 - serialization_writer::memory_usage();
869 memory_size_type perFanout = largestItem + serialization_reader::memory_usage();
870 memory_size_type fanout = std::min(m_params.
filesPhase2 - 1, fanoutMemory / perFanout);
872 memory_size_type finalFanoutMemory = m_params.
memoryPhase3;
873 memory_size_type finalFanout = std::min(
874 {m_params.
filesPhase3 - 1, fanout, finalFanoutMemory / perFanout});
876 return m_files.next_level_runs() <= finalFanout;
880 if (m_state != state_2)
883 if (m_reportInternal) {
884 log_debug() <<
"merge_runs: internal reporting; doing nothing." << std::endl;
889 memory_size_type largestItem = m_sorter.get_largest_item_size();
890 if (largestItem == 0) {
891 log_warning() <<
"Largest item is 0 bytes; doing nothing." << std::endl;
896 if (m_params.
memoryPhase2 <= serialization_writer::memory_usage())
897 throw exception(
"Not enough memory for merging.");
901 memory_size_type fanoutMemory = m_params.
memoryPhase2 - serialization_writer::memory_usage();
902 memory_size_type perFanout = largestItem + serialization_reader::memory_usage();
903 memory_size_type fanout = std::min(fanoutMemory / perFanout, m_params.
filesPhase2 - 1);
906 log_error() <<
"Not enough memory for merging. "
908 <<
", fanout memory = " << fanoutMemory
909 <<
", per fanout = " << perFanout
911 throw exception(
"Not enough memory for merging.");
914 memory_size_type finalFanoutMemory = m_params.
memoryPhase3;
915 memory_size_type finalFanout = std::min(
916 {m_params.
filesPhase3 - 1, fanout, finalFanoutMemory / perFanout});
918 if (finalFanout < 2) {
919 log_error() <<
"Not enough memory for merging (final fanout < 2). "
921 <<
", final fanout memory = " << finalFanoutMemory
922 <<
", per fanout = " << perFanout
924 throw exception(
"Not enough memory for merging.");
927 log_debug() <<
"Calculated merge phase parameters for serialization sort.\n"
928 <<
"Fanout: " << fanout <<
'\n'
929 <<
"Final fanout: " << finalFanout <<
'\n'
932 while (m_files.next_level_runs() > finalFanout) {
933 if (m_files.remaining_runs() != 0)
934 throw exception(
"m_files.remaining_runs() != 0");
935 log_debug() <<
"Runs in current level: " << m_files.next_level_runs() <<
'\n';
936 for (
size_t remainingRuns = m_files.next_level_runs(); remainingRuns > 0;) {
937 size_t f = std::min(fanout, remainingRuns);
940 if (remainingRuns != m_files.remaining_runs())
941 throw exception(
"remainingRuns != m_files.remaining_runs()");
951 if (m_sorter.begin() == m_sorter.end())
return;
952 m_files.open_new_writer();
953 for (
const T * item = m_sorter.begin(); item != m_sorter.end(); ++item) {
954 m_files.write(*item);
956 m_files.close_writer();
960 void initialize_merger(
size_t fanout) {
961 if (fanout == 0)
throw exception(
"initialize_merger: fanout == 0");
962 m_files.open_readers(fanout);
963 m_merger.init(fanout);
966 void free_merger_and_files() {
968 m_files.close_readers_and_delete();
971 void merge_runs(
size_t fanout) {
972 if (fanout == 0)
throw exception(
"merge_runs: fanout == 0");
974 if (fanout == 1 && m_files.remaining_runs() == 1) {
975 m_files.move_last_reader_to_next_level();
979 initialize_merger(fanout);
980 m_files.open_new_writer();
981 while (!m_merger.empty()) {
982 m_files.write(m_merger.top());
985 free_merger_and_files();
986 m_files.close_writer();
992 throw exception(
"pull: !can_pull");
994 if (m_reportInternal) {
995 T item = *m_nextInternalItem++;
996 if (m_nextInternalItem == m_sorter.end()) {
998 m_nextInternalItem = 0;
1003 if (!m_files.readers_open()) {
1004 if (m_files.next_level_runs() == 0)
1005 throw exception(
"pull: next_level_runs == 0");
1006 initialize_merger(m_files.next_level_runs());
1009 T item = m_merger.top();
1012 if (m_merger.empty()) {
1013 free_merger_and_files();
1021 if (m_reportInternal)
return m_nextInternalItem != 0;
1022 if (!m_files.readers_open())
return m_files.next_level_runs() > 0;
1023 return !m_merger.empty();
1029 #endif // TPIE_SERIALIZATION_SORTER_H
memory_size_type memoryPhase2
Memory available while merging runs.
size_t serialized_size(const T &v)
Given a serializable, serialize it and measure its serialized size.
Encapsulation of two pointers from any random access container.
T * get()
Return a raw pointer to the array content.
memory_size_type filesPhase3
files available during output phase.
Binary serialization and unserialization.
void parallel_sort(iterator_type a, iterator_type b, typename tpie::progress_types< Progress >::base &pi, comp_type comp=std::less< typename boost::iterator_value< iterator_type >::type >())
Sort items in the range [a,b) using a parallel quick sort.
memory_size_type memoryPhase1
memory available while forming sorted runs.
void set_available_files(memory_size_type f1, memory_size_type f2, memory_size_type f3)
Calculate parameters from given amount of files.
void reset()
Reset sorter, but keep the remembered largest item size and buffer size.
Stream of serializable items.
memory_size_type memoryPhase3
Memory available during output phase.
static std::string tpie_dir_name(const std::string &post_base="", const std::string &dir="")
Generate path for a new temporary directory.
Logging functionality and log_level codes for different priorities of log messages.
Generic internal array with known memory requirements.
void serialize(const T &v)
Serialize a serializable item and write it to the stream.
void free()
Deallocate buffer and call reset().
Simple parallel quick sort implementation with progress tracking.
memory_size_type filesPhase2
files available while merging runs.
Class storring a reference to a memory bucket.
stream_size_type file_size()
Size of file in bytes, including the header.
memory_manager & get_memory_manager()
Return a reference to the memory manager.
Bucket used for memory counting.
logstream & log_debug()
Return logstream for writing debug log messages.
size_t available() const noexcept
Return the amount of the resource still available to be assigned.
File handling for merge sort.
size_t used() const noexcept
Return the current amount of the resource used.
void resize(size_t size, const T &elm)
Change the size of the array.
memory_size_type filesPhase1
files available while forming sorted runs.
std::unique_ptr< memory_bucket > & bucket(size_t i)
Access a memory bucket.
std::string tempDir
Directory in which temporary files are stored.
void swap(array &other)
Swap two arrays.
void increment_temp_file_usage(stream_offset_type delta)
Increment (possibly by a negative amount) the number of bytes being used by temporary files...
size_type size() const
Return the size of the array.
memory_size_type minimumItemSize
Minimum size of serialized items.
bool push(const T &item)
True if all items up to and including this one fits in buffer.
memory_size_type current_serialized_size()
Get the serialized size of the items written.
logstream & log_error()
Return logstream for writing error log messages.
void set_available_files(memory_size_type f)
Calculate parameters from given amount of files.
logstream & log_warning()
Return logstream for writing warning log messages.
memory_size_type memory_usage()
Compute current memory usage.