20 #ifndef TPIE_SERIALIZATION_SORT_H
21 #define TPIE_SERIALIZATION_SORT_H
24 #include <boost/filesystem.hpp>
38 namespace serialization_bits {
52 void dump(std::ostream & out)
const {
53 out <<
"Serialization merge sort parameters\n"
58 <<
"Temporary directory: " <<
tempDir <<
'\n';
62 template <
typename T,
typename pred_t>
65 memory_size_type m_items;
66 memory_size_type m_serializedSize;
67 memory_size_type m_memAvail;
69 memory_size_type m_largestItem;
79 , m_largestItem(
sizeof(T))
85 void begin(memory_size_type memAvail) {
86 m_buffer.
resize(memAvail /
sizeof(T) / 2);
87 m_items = m_serializedSize = 0;
88 m_largestItem =
sizeof(T);
90 m_memAvail = memAvail;
99 bool push(
const T & item) {
100 if (m_full)
return false;
102 if (m_items == m_buffer.
size()) {
109 if (serSize >
sizeof(T)) {
111 memory_size_type serializedExtra = serSize -
sizeof(T);
114 memory_size_type memRemainingExtra = m_memAvail -
memory_usage();
116 if (serializedExtra > memRemainingExtra) {
121 if (serSize > m_largestItem)
122 m_largestItem = serSize;
125 m_serializedSize += serSize;
127 m_buffer[m_items++] = item;
132 memory_size_type get_largest_item_size() {
133 return m_largestItem;
143 return m_serializedSize;
157 return m_buffer.
size() *
sizeof(T)
158 + (m_serializedSize - m_items *
sizeof(T));
161 bool can_shrink_buffer() {
165 void shrink_buffer() {
167 m_buffer.
swap(newBuffer);
174 const T * begin()
const {
175 return m_buffer.
get();
178 const T * end()
const {
179 return m_buffer.
get() + m_items;
195 m_items = m_serializedSize = 0;
239 template <
typename T>
244 size_t m_nextLevelFileOffset;
246 size_t m_nextFileOffset;
249 size_t m_readersOpen;
252 stream_size_type m_currentWriterByteSize;
256 std::string m_tempDir;
258 std::string run_file(
size_t physicalIndex) {
259 if (m_tempDir.size() == 0)
throw exception(
"run_file: temp dir is the empty string");
260 std::stringstream ss;
261 ss << m_tempDir <<
'/' << physicalIndex <<
".tpie";
268 , m_nextLevelFileOffset(0)
269 , m_nextFileOffset(0)
271 , m_writerOpen(
false)
275 , m_currentWriterByteSize(0)
283 void set_temp_dir(
const std::string & tempDir) {
284 if (m_nextFileOffset != 0)
285 throw exception(
"set_temp_dir: trying to change path after files already open");
289 void open_new_writer() {
290 if (m_writerOpen)
throw exception(
"open_new_writer: Writer already open");
291 m_writer.open(run_file(m_nextFileOffset++));
292 m_currentWriterByteSize = m_writer.file_size();
296 void write(
const T & v) {
297 if (!m_writerOpen)
throw exception(
"write: No writer open");
301 void close_writer() {
302 if (!m_writerOpen)
throw exception(
"close_writer: No writer open");
304 stream_size_type sz = m_writer.file_size();
305 increase_usage(m_nextFileOffset-1, static_cast<stream_offset_type>(sz));
306 m_writerOpen =
false;
309 size_t remaining_runs() {
310 return m_nextLevelFileOffset - m_fileOffset;
313 size_t next_level_runs() {
314 return m_nextFileOffset - m_nextLevelFileOffset;
317 bool readers_open() {
318 return m_readersOpen > 0;
321 void open_readers(
size_t fanout) {
322 if (m_readersOpen != 0)
throw exception(
"open_readers: readers already open");
323 if (fanout == 0)
throw exception(
"open_readers: fanout == 0");
324 if (remaining_runs() == 0) {
325 if (m_writerOpen)
throw exception(
"Writer open while moving to next merge level");
326 m_nextLevelFileOffset = m_nextFileOffset;
328 if (fanout > remaining_runs())
throw exception(
"open_readers: fanout out of bounds");
330 if (m_readers.size() < fanout) m_readers.resize(fanout);
331 for (
size_t i = 0; i < fanout; ++i) {
332 m_readers[i].open(run_file(m_fileOffset + i));
334 m_readersOpen = fanout;
337 bool can_read(
size_t idx) {
338 if (m_readersOpen == 0)
throw exception(
"can_read: no readers open");
339 if (m_readersOpen < idx)
throw exception(
"can_read: index out of bounds");
340 return m_readers[idx].can_read();
344 if (m_readersOpen == 0)
throw exception(
"read: no readers open");
345 if (m_readersOpen < idx)
throw exception(
"read: index out of bounds");
347 m_readers[idx].unserialize(res);
351 void close_readers_and_delete() {
352 if (m_readersOpen == 0)
throw exception(
"close_readers_and_delete: no readers open");
354 for (
size_t i = 0; i < m_readersOpen; ++i) {
355 decrease_usage(m_fileOffset + i, m_readers[i].file_size());
356 m_readers[i].close();
357 boost::filesystem::remove(run_file(m_fileOffset + i));
359 m_fileOffset += m_readersOpen;
363 void move_last_reader_to_next_level() {
364 if (remaining_runs() != 1)
365 throw exception(
"move_last_reader_to_next_level: remaining_runs != 1");
366 m_nextLevelFileOffset = m_fileOffset;
370 if (m_readersOpen > 0) {
371 log_debug() <<
"reset: Close readers" << std::endl;
372 close_readers_and_delete();
376 log_debug() <<
"reset: Close writer" << std::endl;
379 log_debug() <<
"Remove " << m_fileOffset <<
" through " << m_nextFileOffset << std::endl;
380 for (
size_t i = m_fileOffset; i < m_nextFileOffset; ++i) {
381 std::string runFile = run_file(i);
386 boost::filesystem::remove(runFile);
388 m_fileOffset = m_nextLevelFileOffset = m_nextFileOffset = 0;
392 void increase_usage(
size_t idx, stream_size_type sz) {
393 log_debug() <<
"+ " << idx <<
' ' << sz << std::endl;
397 void decrease_usage(
size_t idx, stream_size_type sz) {
398 log_debug() <<
"- " << idx <<
' ' << sz << std::endl;
403 template <
typename T,
typename pred_t>
409 typedef std::pair<T, size_t> item_type;
411 mergepred_t(
const pred_t & pred) : m_pred(pred) {}
414 bool operator()(
const item_type & a,
const item_type & b)
const {
415 return m_pred(b.first, a.first);
419 typedef typename mergepred_t::item_type item_type;
423 std::vector<serialization_reader> rd;
424 typedef std::priority_queue<item_type, std::vector<item_type>, mergepred_t> priority_queue_type;
425 priority_queue_type pq;
431 , pq(mergepred_t(pred))
436 void init(
size_t fanout) {
438 for (
size_t i = 0; i < fanout; ++i)
446 const T & top()
const {
447 return pq.top().first;
451 size_t idx = pq.top().second;
459 priority_queue_type tmp(pred);
466 void push_from(
size_t idx) {
467 if (files.can_read(idx)) {
468 pq.push(std::make_pair(files.read(idx), idx));
475 template <
typename T,
typename pred_t = std::less<T> >
478 typedef boost::shared_ptr<serialization_sort> ptr;
481 enum sorter_state { state_initial, state_1, state_2, state_3 };
483 sorter_state m_state;
486 bool m_parametersSet;
490 stream_size_type m_items;
491 bool m_reportInternal;
492 const T * m_nextInternalItem;
495 serialization_sort(memory_size_type minimumItemSize =
sizeof(T), pred_t pred = pred_t())
496 : m_state(state_initial)
498 , m_parametersSet(
false)
500 , m_merger(m_files, pred)
502 , m_reportInternal(
false)
503 , m_nextInternalItem(0)
513 inline void maybe_calculate_parameters() {
514 if (m_state != state_initial)
520 calculate_parameters();
524 void set_phase_1_memory(memory_size_type m1) {
526 maybe_calculate_parameters();
529 void set_phase_2_memory(memory_size_type m2) {
531 maybe_calculate_parameters();
534 void set_phase_3_memory(memory_size_type m3) {
536 maybe_calculate_parameters();
539 void set_available_memory(memory_size_type m) {
540 set_phase_1_memory(m);
541 set_phase_2_memory(m);
542 set_phase_3_memory(m);
545 void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3) {
546 set_phase_1_memory(m1);
547 set_phase_2_memory(m2);
548 set_phase_3_memory(m3);
551 static memory_size_type minimum_memory_phase_1() {
552 return serialization_writer::memory_usage()*2;
555 static memory_size_type minimum_memory_phase_2() {
556 return serialization_writer::memory_usage()
557 + 2*serialization_reader::memory_usage();
560 static memory_size_type minimum_memory_phase_3() {
561 return 2*serialization_reader::memory_usage();
565 void calculate_parameters() {
566 if (m_state != state_initial)
570 if (memAvail1 <= serialization_writer::memory_usage()) {
571 log_error() <<
"Not enough memory for run formation; have " << memAvail1
572 <<
" bytes but " << serialization_writer::memory_usage()
573 <<
" is required for writing a run." << std::endl;
574 throw exception(
"Not enough memory for run formation");
580 if (memAvail2 <= serialization_writer::memory_usage()) {
581 log_error() <<
"Not enough memory for merging. "
582 <<
"mem avail = " << memAvail2
583 <<
", writer usage = " << serialization_writer::memory_usage()
585 throw exception(
"Not enough memory for merging.");
591 if (memAvail2 <= serialization_writer::memory_usage()) {
592 log_error() <<
"Not enough memory for outputting. "
593 <<
"mem avail = " << memAvail3
594 <<
", writer usage = " << serialization_writer::memory_usage()
596 throw exception(
"Not enough memory for outputting.");
599 memory_size_type memForMerge = std::min(memAvail2, memAvail3);
606 memory_size_type fanoutMemory = memForMerge - serialization_writer::memory_usage();
609 memory_size_type perFanout = m_params.
minimumItemSize + serialization_reader::memory_usage();
612 memory_size_type fanout = fanoutMemory / perFanout;
614 log_error() <<
"Not enough memory for merging, even when minimum item size is assumed. "
615 <<
"mem avail = " << memForMerge
616 <<
", fanout memory = " << fanoutMemory
617 <<
", per fanout >= " << perFanout
619 throw exception(
"Not enough memory for merging.");
623 m_files.set_temp_dir(m_params.
tempDir);
625 log_info() <<
"Calculated serialization_sort parameters.\n";
629 m_parametersSet =
true;
634 if (!m_parametersSet)
636 if (m_state != state_initial)
640 log_info() <<
"Before begin; mem usage = "
642 m_sorter.begin(m_params.
memoryPhase1 - serialization_writer::memory_usage());
643 log_info() <<
"After internal sorter begin; mem usage = "
645 boost::filesystem::create_directory(m_params.
tempDir);
648 void push(
const T & item) {
649 if (m_state != state_1)
654 if (m_sorter.push(item))
return;
656 if (!m_sorter.push(item)) {
657 throw exception(
"Couldn't fit a single item in buffer");
662 if (m_state != state_1)
665 memory_size_type internalThreshold =
668 log_debug() <<
"m_sorter.memory_usage == " << m_sorter.memory_usage() <<
'\n'
669 <<
"internalThreshold == " << internalThreshold << std::endl;
672 m_reportInternal =
true;
673 m_nextInternalItem = 0;
675 log_debug() <<
"Got no items. Internal reporting mode." << std::endl;
676 }
else if (m_files.next_level_runs() == 0
677 && m_sorter.memory_usage()
678 <= internalThreshold) {
681 m_reportInternal =
true;
682 m_nextInternalItem = m_sorter.begin();
683 log_debug() <<
"Got " << m_sorter.current_serialized_size()
684 <<
" bytes of items. Internal reporting mode." << std::endl;
685 }
else if (m_files.next_level_runs() == 0
686 && m_sorter.current_serialized_size() <= internalThreshold
687 && m_sorter.can_shrink_buffer()) {
690 m_sorter.shrink_buffer();
691 m_reportInternal =
true;
692 m_nextInternalItem = m_sorter.begin();
693 log_debug() <<
"Got " << m_sorter.current_serialized_size()
694 <<
" bytes of items. Internal reporting mode after shrinking buffer." << std::endl;
699 log_debug() <<
"Got " << m_files.next_level_runs() <<
" runs. "
700 <<
"External reporting mode." << std::endl;
702 m_reportInternal =
false;
705 log_info() <<
"After internal sorter end; mem usage = "
711 stream_size_type item_count() {
723 if (m_reportInternal) {
726 m_reportInternal =
false;
727 log_debug() <<
"Evacuate out of internal reporting mode." << std::endl;
729 log_debug() <<
"Evacuate in external reporting mode - noop." << std::endl;
735 memory_size_type evacuated_memory_usage()
const {
740 if (m_state != state_2)
743 if (m_reportInternal) {
744 log_debug() <<
"merge_runs: internal reporting; doing nothing." << std::endl;
749 memory_size_type largestItem = m_sorter.get_largest_item_size();
750 if (largestItem == 0) {
751 log_warning() <<
"Largest item is 0 bytes; doing nothing." << std::endl;
756 if (m_params.
memoryPhase2 <= serialization_writer::memory_usage())
757 throw exception(
"Not enough memory for merging.");
761 memory_size_type fanoutMemory = m_params.
memoryPhase2 - serialization_writer::memory_usage();
762 memory_size_type perFanout = largestItem + serialization_reader::memory_usage();
763 memory_size_type fanout = fanoutMemory / perFanout;
766 log_error() <<
"Not enough memory for merging. "
768 <<
", fanout memory = " << fanoutMemory
769 <<
", per fanout = " << perFanout
771 throw exception(
"Not enough memory for merging.");
774 memory_size_type finalFanoutMemory = m_params.
memoryPhase3;
775 memory_size_type finalFanout =
777 finalFanoutMemory / perFanout);
779 if (finalFanout < 2) {
780 log_error() <<
"Not enough memory for merging (final fanout < 2). "
782 <<
", final fanout memory = " << finalFanoutMemory
783 <<
", per fanout = " << perFanout
785 throw exception(
"Not enough memory for merging.");
788 log_debug() <<
"Calculated merge phase parameters for serialization sort.\n"
789 <<
"Fanout: " << fanout <<
'\n'
790 <<
"Final fanout: " << finalFanout <<
'\n'
793 while (m_files.next_level_runs() > finalFanout) {
794 if (m_files.remaining_runs() != 0)
795 throw exception(
"m_files.remaining_runs() != 0");
796 log_debug() <<
"Runs in current level: " << m_files.next_level_runs() <<
'\n';
797 for (
size_t remainingRuns = m_files.next_level_runs(); remainingRuns > 0;) {
798 size_t f = std::min(fanout, remainingRuns);
801 if (remainingRuns != m_files.remaining_runs())
802 throw exception(
"remainingRuns != m_files.remaining_runs()");
812 if (m_sorter.begin() == m_sorter.end())
return;
813 m_files.open_new_writer();
814 for (
const T * item = m_sorter.begin(); item != m_sorter.end(); ++item) {
815 m_files.write(*item);
817 m_files.close_writer();
821 void initialize_merger(
size_t fanout) {
822 if (fanout == 0)
throw exception(
"initialize_merger: fanout == 0");
823 m_files.open_readers(fanout);
824 m_merger.init(fanout);
827 void free_merger_and_files() {
829 m_files.close_readers_and_delete();
832 void merge_runs(
size_t fanout) {
833 if (fanout == 0)
throw exception(
"merge_runs: fanout == 0");
835 if (fanout == 1 && m_files.remaining_runs() == 1) {
836 m_files.move_last_reader_to_next_level();
840 initialize_merger(fanout);
841 m_files.open_new_writer();
842 while (!m_merger.empty()) {
843 m_files.write(m_merger.top());
846 free_merger_and_files();
847 m_files.close_writer();
855 if (m_reportInternal) {
856 T item = *m_nextInternalItem++;
857 if (m_nextInternalItem == m_sorter.end()) {
859 m_nextInternalItem = 0;
864 if (!m_files.readers_open()) {
865 if (m_files.next_level_runs() == 0)
866 throw exception(
"pull: next_level_runs == 0");
867 initialize_merger(m_files.next_level_runs());
870 T item = m_merger.top();
873 if (m_merger.empty()) {
874 free_merger_and_files();
882 if (m_reportInternal)
return m_nextInternalItem != 0;
883 if (!m_files.readers_open())
return m_files.next_level_runs() > 0;
884 return !m_merger.empty();
890 #endif // TPIE_SERIALIZATION_SORT_H
memory_size_type memoryPhase2
Memory available while merging runs.
size_t serialized_size(const T &v)
Given a serializable, serialize it and measure its serialized size.
Encapsulation of two pointers from any random access container.
T * get()
Return a raw pointer to the array content.
Binary serialization and unserialization.
void parallel_sort(iterator_type a, iterator_type b, typename tpie::progress_types< Progress >::base &pi, comp_type comp=std::less< typename boost::iterator_value< iterator_type >::type >())
Sort items in the range [a,b) using a parallel quick sort.
memory_size_type memoryPhase1
Memory available while forming sorted runs.
void reset()
Reset sorter, but keep the remembered largest item size and buffer size.
logstream & log_info()
Return logstream for writing info log messages.
size_t available() const
Return the amount of memory still available to allocation.
Stream of serializable items.
memory_size_type memoryPhase3
Memory available during output phase.
static std::string tpie_dir_name(const std::string &post_base="", const std::string &dir="")
Generate path for a new temporary directory.
Logging functionality and log_level codes for different priorities of log messages.
Generic internal array with known memory requirements.
void serialize(const T &v)
Serialize a serializable item and write it to the stream.
void free()
Deallocate buffer and call reset().
Simple parallel quick sort implementation with progress tracking.
stream_size_type file_size()
Size of file in bytes, including the header.
memory_manager & get_memory_manager()
Return a reference to the memory manager.
logstream & log_debug()
Return logstream for writing debug log messages.
File handling for merge sort.
void resize(size_t size, const T &elm)
Change the size of the array.
size_t used() const
Return the current amount of memory used.
std::string tempDir
Directory in which temporary files are stored.
void swap(array &other)
Swap two arrays.
void increment_temp_file_usage(stream_offset_type delta)
Increment (possibly by a negative amount) the number of bytes being used by temporary files...
size_type size() const
Return the size of the array.
memory_size_type minimumItemSize
Minimum size of serialized items.
bool push(const T &item)
True if all items up to and including this one fit in the buffer.
memory_size_type current_serialized_size()
Get the serialized size of the items written.
logstream & log_error()
Return logstream for writing error log messages.
logstream & log_warning()
Return logstream for writing warning log messages.
memory_size_type memory_usage()
Compute current memory usage.