20 #ifndef _TPIE_AMI_MERGE_H
21 #define _TPIE_AMI_MERGE_H
46 MERGE_OUTPUT_OVERWRITE = 1,
79 template<
class T,
class M>
130 template<
class T,
class M>
138 template<
class T,
class M>
148 template<
class T,
class M>
234 CONST T * CONST * in,
236 int &taken_index) = 0;
237 virtual err operate(CONST T * CONST *in,
241 virtual err main_mem_operate(T* mm_stream, TPIE_OS_SIZE_T len) = 0;
242 virtual TPIE_OS_SIZE_T space_usage_overhead(
void) = 0;
243 virtual TPIE_OS_SIZE_T space_usage_per_stream(
void) = 0;
244 #endif // VIRTUAL_BASE
251 template<
class T,
class M>
256 TPIE_OS_SIZE_T sz_avail;
257 TPIE_OS_OFFSET sz_stream, sz_needed = 0;
269 for (
unsigned int ii = 0; ii < arity + 1; ii++) {
271 sz_needed += sz_stream;
273 sz_needed -= sz_stream;
278 sz_needed += m_obj->space_usage_overhead() +
279 arity * m_obj->space_usage_per_stream();
282 if (sz_needed >= static_cast<TPIE_OS_OFFSET>(sz_avail)) {
283 TP_LOG_WARNING(
"Insufficient main memory to perform a merge.\n");
286 assert(sz_needed < sz_avail);
289 return single_merge(instreams, arity, outstream, m_obj);
296 template<
class T,
class M>
305 T* *in_objects = tpie_new_array<T*>(arity);
309 merge_flag* taken_flags = tpie_new_array<merge_flag>(arity);
318 #if DEBUG_PERFECT_MERGE
319 unsigned int input_count = 0, output_count = 0;
324 for (ii = arity; ii--; ) {
325 if ((ami_err = instreams[ii]->seek(0)) !=
NO_ERROR) {
330 if ((ami_err = instreams[ii]->read_item(&(in_objects[ii]))) !=
334 in_objects[ii] = NULL;
344 #if DEBUG_PERFECT_MERGE
351 if (((ami_err = m_obj->initialize(arity, in_objects, taken_flags,
362 for (ii = arity; ii--; ) {
363 if (taken_flags[ii]) {
364 ami_err = instreams[ii]->
read_item(&(in_objects[ii]));
367 in_objects[ii] = NULL;
374 #if DEBUG_PERFECT_MERGE
384 if (taken_index >= 0) {
385 ami_err = instreams[taken_index]->
386 read_item(&(in_objects[taken_index]));
389 in_objects[taken_index] = NULL;
396 #if DEBUG_PERFECT_MERGE
400 taken_flags[taken_index] = 0;
403 ami_err = m_obj->operate(in_objects, taken_flags, taken_index,
408 #if DEBUG_PERFECT_MERGE
411 if ((ami_err = outstream->
write_item(merge_out)) !=
425 #if DEBUG_PERFECT_MERGE
427 "Merge done, input_count = " << input_count <<
428 ", output_count = " << output_count <<
'.');
441 template<
class T,
class M>
447 TPIE_OS_SIZE_T sz_avail;
453 if ((len * static_cast<TPIE_OS_OFFSET>(
sizeof(T))) <= static_cast<TPIE_OS_OFFSET>(sz_avail)) {
458 ae = instream->
seek(0);
467 mm_stream = tpie_new_array<T>(
static_cast<TPIE_OS_SIZE_T
>(len));
470 if ((ae = instream->
read_array(mm_stream, &len1)) !=
474 tp_assert(len1 == len,
"Did not read the right amount; "
475 "Allocated space for " << len <<
", read " << len1 <<
'.');
479 if ((ae = m_obj->main_mem_operate(mm_stream,
480 static_cast<TPIE_OS_SIZE_T>(len))) !=
482 TP_LOG_WARNING_ID(
"main_mem_operate failed");
488 static_cast<TPIE_OS_SIZE_T>(len))) !=
490 TP_LOG_WARNING_ID(
"write array failed");
501 TP_LOG_WARNING_ID(
"out of memory");
507 template<
class T,
class M>
513 TPIE_OS_SIZE_T sz_avail, sz_stream;
524 if ((len * static_cast<TPIE_OS_OFFSET>(
sizeof(T))) <= static_cast<TPIE_OS_OFFSET>(sz_avail)) {
540 TPIE_OS_OFFSET sz_orig_substr;
562 TPIE_OS_OFFSET current_substream_len;
568 TPIE_OS_OFFSET sub_start, sub_end;
579 if (sz_avail <= 2 * sz_stream +
sizeof(T)) {
582 sz_avail -= 2 * sz_stream;
586 sz_orig_substr = sz_avail /
sizeof(T);
593 TPIE_OS_OFFSET sz_chunk_size = instream->
chunk_size();
595 sz_orig_substr = sz_chunk_size *
596 ((sz_orig_substr + sz_chunk_size - 1) /sz_chunk_size);
601 nb_orig_substr =
static_cast<arity_t>((len + sz_orig_substr - 1) /
606 TPIE_OS_SIZE_T sz_avail_during_merge = sz_avail - m_obj->space_usage_overhead();
607 TPIE_OS_SIZE_T sz_stream_during_merge = sz_stream +m_obj->space_usage_per_stream();
609 merge_arity =
static_cast<arity_t>((sz_avail_during_merge +
610 sz_stream_during_merge - 1) /
611 sz_stream_during_merge);
621 if (ami_available_streams != -1) {
622 if (ami_available_streams <= 4) {
626 if (merge_arity > static_cast<arity_t>(ami_available_streams) - 2) {
627 merge_arity = ami_available_streams - 2;
628 TP_LOG_DEBUG_ID(
"Reduced merge arity due to AMI restrictions.");
632 TP_LOG_DEBUG_ID(
"partition_and_merge(): merge arity = "
633 << static_cast<TPIE_OS_OUTPUT_SIZE_T>(merge_arity));
634 if (merge_arity < 2) {
640 #ifdef MINIMIZE_INITIAL_SUBSTREAM_LENGTH
648 double tree_height = log((
double)nb_orig_substr)/ log((
double)merge_arity);
649 tp_assert(tree_height > 0,
"Negative or zero tree height!");
651 tree_height = ceil(tree_height);
655 double max_original_substreams = pow((
double)merge_arity, tree_height);
656 tp_assert(max_original_substreams >= nb_orig_substr,
657 "Number of permitted substreams was reduced.");
660 double new_sz_original_substream = ceil((
double)len /
661 max_original_substreams);
662 tp_assert(new_sz_original_substream <= sz_orig_substr,
663 "Size of original streams increased.");
665 sz_orig_substr = (size_t)new_sz_original_substream;
666 TP_LOG_DEBUG_ID(
"Memory constraints set original substreams = " << nb_orig_substr);
668 nb_orig_substr = (len + sz_orig_substr - 1) / sz_orig_substr;
669 TP_LOG_DEBUG_ID(
"Tree height constraints set original substreams = " << nb_orig_substr);
671 #endif // MINIMIZE_INITIAL_SUBSTREAM_LENGTH
677 initial_tmp_stream = tpie_new<stream<T> >();
678 mm_stream = tpie_new_array<T>(
static_cast<TPIE_OS_SIZE_T
>(sz_orig_substr));
683 tp_assert(static_cast<TPIE_OS_OFFSET>(nb_orig_substr * sz_orig_substr - len) < sz_orig_substr,
684 "Total substream length too long or too many.");
685 tp_assert(len - static_cast<TPIE_OS_OFFSET>(nb_orig_substr - 1) * sz_orig_substr <= sz_orig_substr,
686 "Total substream length too short or too few.");
688 for (ii = 0; ii++ < nb_orig_substr; ) {
690 TPIE_OS_OFFSET mm_len;
691 if (ii == nb_orig_substr) {
692 mm_len = len % sz_orig_substr;
696 mm_len = sz_orig_substr;
699 mm_len = sz_orig_substr;
702 TPIE_OS_OFFSET mm_len_bak = mm_len;
706 ae = instream->
read_array(mm_stream, &mm_len);
711 "Did not read the requested number of objects." <<
712 "\n\tmm_len = " << mm_len <<
713 "\n\tmm_len_bak = " << mm_len_bak <<
'.');
716 m_obj->main_mem_operate(mm_stream, static_cast<TPIE_OS_SIZE_T>(mm_len));
719 ae = initial_tmp_stream->
write_array(mm_stream, static_cast<TPIE_OS_SIZE_T>(mm_len));
729 "Stream lengths do not match:" <<
730 "\n\tinstream->stream_len() = " << instream->
stream_len() <<
731 "\n\tinitial_tmp_stream->stream_len() = " <<
736 current_input = initial_tmp_stream;
737 current_substream_len = sz_orig_substr;
740 stream<T>* *the_substreams = tpie_new_array<stream<T>* >(merge_arity);
743 TP_LOG_DEBUG_ID(
"Number of runs from run formation is "
744 << static_cast<TPIE_OS_OUTPUT_SIZE_T>(nb_orig_substr));
745 TP_LOG_DEBUG_ID(
"Merge arity is "
746 << static_cast<TPIE_OS_OUTPUT_SIZE_T>(merge_arity));
753 for( ; current_substream_len < len;
754 current_substream_len *= merge_arity) {
760 tp_assert(len == current_input->stream_len(),
761 "Current level stream not same length as input." <<
762 "\n\tlen = " << len <<
763 "\n\tcurrent_input->stream_len() = " <<
764 current_input->stream_len() <<
".\n");
770 substream_count =
static_cast<arity_t>((len + current_substream_len - 1) /
771 current_substream_len);
773 if (substream_count <= merge_arity) {
775 TP_LOG_DEBUG_ID(
"Merging substreams directly to the output stream.");
778 for (sub_start = 0, ii = 0 ;
779 ii < substream_count;
780 sub_start += current_substream_len, ii++) {
782 sub_end = sub_start + current_substream_len - 1;
783 if (sub_end >= len) {
786 current_input->new_substream(READ_STREAM, sub_start, sub_end, the_substreams + ii);
788 the_substreams[ii]->
persist(PERSIST_READ_ONCE);
792 (sub_start < len + current_substream_len),
793 "Loop ended in wrong location.");
800 current_input->seek(0);
804 ae =
single_merge(the_substreams, substream_count, outstream, m_obj);
809 for (ii = 0; ii < substream_count; ii++) {
818 TP_LOG_DEBUG_ID(
"Merging substreams to an intermediate stream.");
828 current_input->
seek(0);
833 for (sub_start = 0, ii = 0, jj = 0;
834 ii < substream_count;
835 sub_start += current_substream_len, ii++, jj++) {
837 sub_end = sub_start + current_substream_len - 1;
838 if (sub_end >= len) {
841 current_input->new_substream(READ_STREAM, sub_start, sub_end, the_substreams + jj);
843 the_substreams[jj]->
persist(PERSIST_READ_ONCE);
847 if ((jj >= static_cast<int>(merge_arity) - 1) ||
848 (ii == substream_count - 1)) {
850 tp_assert(jj <= static_cast<int>(merge_arity) - 1,
851 "Index got too large.");
854 TPIE_OS_OFFSET sz_output, sz_output_after_merge;
855 TPIE_OS_OFFSET sz_substream_total;
860 sz_output = intermediate_tmp_stream->
stream_len();
861 sz_substream_total = 0;
863 for (kk = jj+1; kk--; ) {
864 sz_substream_total += the_substreams[kk]->
stream_len();
874 intermediate_tmp_stream, m_obj);
881 sz_output_after_merge = intermediate_tmp_stream->
stream_len();
882 tp_assert(sz_output_after_merge - sz_output ==
884 "Stream lengths do not add up: " <<
885 sz_output_after_merge - sz_output <<
887 sz_substream_total <<
888 " were to have been read.");
900 tp_assert((jj == -1),
"Index not reduced to -1.");
907 current_input = intermediate_tmp_stream;
915 TP_LOG_DEBUG_ID(
"Number of passes incl run formation is " << k+1);
Defines the tp_assert macro.
err read_array(T *mm_space, stream_offset_type *len)
Reads *len items from the current position of the stream into the array mm_space. ...
err write_item(const T &elt)
Writes elt to the stream in the current position.
size_t consecutive_memory_available(size_t granularity=5 *1024 *1024)
Find the largest amount of memory that can be allocated as a single chunk.
A TPIE entry point was not able to properly initialize the operation management object that was pass...
Value returned by a merge_management_object, telling merge() to continue to call the operate() member...
Memory management subsystem.
Max amount that will ever be used.
TPIE_OS_SIZE_T arity_t
Intended to signal the number of input streams in a merge.
int merge_flag
Intended to signal in a merge which of the input streams are non-empty.
err partition_and_merge(stream< T > *instream, stream< T > *outstream, M *m_obj)
Partitions a stream into substreams small enough to fit.
This file contains a few deprecated definitions for legacy code.
err single_merge(stream< T > **instreams, arity_t arity, stream< T > *outstream, M *m_obj)
Merges arity streams in memory using a merge management object and writes the result into outstream...
err read_item(T **elt)
Reads the current item from the stream and advances the "current item" pointer to the next item...
void persist(persistence p)
Set the stream's persistence flag to p, which can have one of two values: PERSIST_DELETE or PERSIST_P...
Value returned by a merge_management_object, signaling that the merge() completed.
err seek(stream_offset_type offset)
Moves the current position to offset (measured in terms of items).
stream_offset_type stream_len(void) const
Returns the number of items in the stream.
TPIE_OS_SIZE_T arity_t
Intended to signal the number of input streams in a merge.
size_t available_streams(void)
Returns the number of globally available streams.
err main_mem_merge(stream< T > *instream, stream< T > *outstream, M *m_obj)
Reads instream into memory and merges it using m_obj->main_mem_operate(); if instream does not fit in m...
err main_memory_usage(size_type *usage, stream_usage usage_type) const
This function is used for obtaining the amount of main memory used by a Stream object (in bytes)...
A Stream object stores an ordered collection of objects of type T on external memory.
Superclass for merge management objects.
err merge(stream< T > **instreams, arity_t arity, stream< T > *outstream, M *m_obj)
Merges arity streams using a merge management object and writes result into outstream.
TPIE could not allocate enough intermediate streams to perform the requested operation.
err
Legacy TPIE error codes.
void tpie_delete(T *p)
Delete an object allocated with tpie_new.
Value returned by a merge_management_object, telling merge() that more than one input object was con...
#define tp_assert(condition, message)
The memory manager could not make adequate main memory available to complete the requested operation...
memory_size_type chunk_size(void) const
Returns the maximum number of items (of type T) that can be stored in one block.
An attempt was made to read past the end of a stream or write past the end of a substream.
Value returned by a merge_management_object, signaling that the last merge() call generated output fo...
void tpie_delete_array(T *a, size_t size)
Delete an array allocated with tpie_new_array.
err write_array(const T *mm_space, memory_size_type len)
Writes len items from array mm_space to the stream, starting in the current position.