20 #ifndef __TPIE_PIPELINING_SORT_H__
21 #define __TPIE_PIPELINING_SORT_H__
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/pipe_base.h>
25 #include <tpie/pipelining/factory_base.h>
26 #include <tpie/pipelining/merge_sorter.h>
32 #include <boost/shared_ptr.hpp>
36 namespace pipelining {
40 template <
typename T,
typename pred_t>
43 template <
typename T,
typename pred_t>
46 template <
typename T,
typename pred_t>
61 void set_calc_node(
node & calc) {
66 sort_output_base(pred_t pred)
79 template <
typename T,
typename pred_t>
94 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
99 this->
set_steps(this->m_sorter->item_count());
100 this->
forward(
"items", static_cast<stream_size_type>(this->m_sorter->item_count()));
103 inline bool can_pull()
const {
104 return this->m_sorter->can_pull();
109 return this->m_sorter->pull();
117 virtual void go()
override {
118 log_warning() <<
"Passive sorter used without an initiator in the final merge and output phase.\n"
119 <<
"Define an initiator and pair it up with the pipe from passive_sorter::output()." << std::endl;
126 this->m_sorter->set_phase_3_memory(availableMemory);
135 template <
typename pred_t,
typename dest_t>
154 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
159 this->
set_steps(this->m_sorter->item_count());
160 this->
forward(
"items", static_cast<stream_size_type>(this->m_sorter->item_count()));
163 virtual void go()
override {
164 while (this->m_sorter->can_pull()) {
165 dest.push(this->m_sorter->pull());
173 this->m_sorter->set_phase_3_memory(availableMemory);
185 template <
typename T,
typename pred_t>
186 class sort_calc_t :
public node {
199 , m_sorter(other.m_sorter)
204 template <
typename dest_t>
206 : dest(new dest_t(dest))
208 m_sorter = this->dest->get_sorter();
209 this->dest->set_calc_node(*
this);
221 set_name(
"Perform merge heap", PRIORITY_SIGNIFICANT);
229 virtual void go()
override {
239 m_sorter->evacuate_before_reporting();
246 void set_input_node(
node & input) {
253 m_sorter->set_phase_2_memory(availableMemory);
258 boost::shared_ptr<Output> dest;
266 template <
typename T,
typename pred_t>
267 class sort_input_t :
public node {
277 : m_sorter(dest.get_sorter())
280 this->dest.set_input_node(*
this);
282 set_name(
"Form input runs", PRIORITY_SIGNIFICANT);
288 m_sorter->set_items(this->fetch<stream_size_type>(
"items"));
292 inline void push(
const item_type & item) {
293 m_sorter->push(item);
296 virtual void end()
override {
306 m_sorter->evacuate_before_merging();
312 m_sorter->set_phase_1_memory(availableMemory);
320 template <
typename child_t>
322 const child_t &
self()
const {
return *
static_cast<const child_t *
>(
this); }
324 template <
typename dest_t>
328 typedef typename dest_t::item_type item_type;
330 typedef typename child_t::template predicate<item_type>::type pred_type;
334 template <
typename dest_t>
336 typedef typename dest_t::item_type item_type;
337 typedef typename constructed<dest_t>::pred_type pred_type;
340 this->init_sub_node(output);
342 this->init_sub_node(calc);
344 this->init_sub_node(input);
355 template <
typename item_type>
358 typedef std::less<item_type> type;
361 template <
typename T>
362 std::less<T> get_pred()
const {
363 return std::less<T>();
370 template <
typename pred_t>
373 template <
typename Dummy>
384 template <
typename T>
385 pred_t get_pred()
const {
398 inline pipe_middle<bits::default_pred_sort_factory>
407 template <
typename pred_t>
408 inline pipe_middle<bits::sort_factory<pred_t> >
414 template <
typename T,
typename pred_t=std::less<T> >
422 template <
typename T,
typename pred_t>
430 typedef typename sorter_t::ptr sorterptr;
438 calc_t calc(output->get_sorter());
439 output->set_calc_node(calc);
440 this->init_node(calc);
442 this->init_node(input);
453 template <
typename T,
typename pred_t>
480 template <
typename T,
typename pred_t>
525 template <
typename T,
typename pred_t>
526 typename passive_sorter_factory_2<T, pred_t>::constructed_type
527 passive_sorter_factory_2<T, pred_t>::construct()
const {
528 constructed_type res = m_sorter.m_output;
virtual void propagate() override
Propagate stream metadata.
virtual void evacuate() override
Overridden by nodes that have data to evacuate.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
Memory management subsystem.
T item_type
Type of items sorted.
T item_type
Type of items sorted.
Sort factory using the given predicate as comparator.
The base class for indicating the progress of some task.
virtual bool can_evacuate() override
Overridden by nodes that have data to evacuate.
pullpipe_begin< bits::passive_sorter_factory_2< item_type, pred_t > > output()
Get the output pull node.
T item_type
Type of items sorted.
merge_sorter< item_type, true, pred_t > sorter_t
Type of the merge sort implementation used.
Factory for the passive sorter output node.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
Merge sorting consists of three phases.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
pipe_middle< bits::default_pred_sort_factory > pipesort()
Pipelining sorter using std::less.
T item_type
Type of items sorted.
Pipelined sorter with push input and pull output.
virtual void propagate() override
Propagate stream metadata.
virtual void set_available_memory(memory_size_type availableMemory)
Called by the memory manager to set the amount of memory assigned to this node.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
merge_sorter< item_type, true, pred_t > sorter_t
Type of the merge sort implementation used.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
merge_sorter< item_type, true, pred_t > sorter_t
Type of the merge sort implementation used.
progress_indicator_base * proxy_progress_indicator()
Get a non-initialized progress indicator for use with external implementations.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
Simple parallel quick sort implementation with progress tracking.
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
Sort factory using std::less as comparator.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
Pipe sorter pull output node.
Pipe sorter push output node.
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end...
virtual void end()
End pipeline processing phase.
pipe_end< bits::passive_sorter_factory< item_type, pred_t > > input()
Get the input push node.
dest_t::item_type item_type
Type of items sorted.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
void step(stream_size_type steps=1)
Step the progress indicator.
void set_memory_fraction(double f)
Set the memory priority of this node.
merge_sorter< item_type, true, pred_t > sorter_t
Type of the merge sort implementation used.
A pipe_middle class pushes input down the pipeline.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
bits::sort_pull_output_t< item_type, pred_t > output_t
Type of pipe sorter output.
sort_output_base< item_type, pred_t > p_t
Base class.
virtual void propagate() override
Propagate stream metadata.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
merge_sorter< item_type, true, pred_t > sorter_t
Type of the merge sort implementation used.
sorter_t::ptr sorterptr
Smart pointer to sorter_t.
void forward(std::string key, T value, bool explicitForward=true)
Called by implementers to forward auxiliary data to successors.
bool can_fetch(std::string key)
Find out if there is a piece of auxiliary data forwarded with a given name.
Factory for the passive sorter input node.
logstream & log_warning()
Return logstream for writing warning log messages.
Simple class acting both as a tpie::file and a tpie::file::stream.