20 #ifndef TPIE_PIPELINING_SERIALIZATION_SORT_H
21 #define TPIE_PIPELINING_SERIALIZATION_SORT_H
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/pipe_base.h>
25 #include <tpie/pipelining/factory_base.h>
26 #include <tpie/serialization_sort.h>
31 namespace pipelining {
33 namespace serialization_bits {
35 template <
typename T,
typename pred_t>
39 typedef pred_t pred_type;
41 typedef boost::shared_ptr<sorter_t> sorterptr;
44 template <
typename Traits>
47 template <
typename Traits>
50 template <
typename Traits>
52 typedef typename Traits::pred_type pred_type;
65 void set_calc_node(
node & calc) {
70 sort_output_base(pred_type pred)
81 template <
typename Traits>
84 typedef typename Traits::item_type
item_type;
85 typedef typename Traits::pred_type pred_type;
86 typedef typename Traits::sorter_t
sorter_t;
87 typedef typename Traits::sorterptr
sorterptr;
93 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
98 this->
set_steps(this->m_sorter->item_count());
99 this->
forward(
"items", static_cast<stream_size_type>(this->m_sorter->item_count()));
102 inline bool can_pull()
const {
103 return this->m_sorter->can_pull();
106 inline item_type pull() {
108 return this->m_sorter->pull();
116 virtual void go()
override {
117 log_warning() <<
"Passive sorter used without an initiator in the final merge and output phase.\n"
118 <<
"Define an initiator and pair it up with the pipe from passive_sorter::output()." << std::endl;
125 this->m_sorter->set_phase_3_memory(availableMemory);
132 template <
typename Traits,
typename dest_t>
134 typedef typename Traits::pred_type pred_type;
136 typedef typename Traits::item_type
item_type;
138 typedef typename Traits::sorter_t
sorter_t;
139 typedef typename Traits::sorterptr
sorterptr;
147 this->
set_name(
"Write sorted output", PRIORITY_INSIGNIFICANT);
152 this->
set_steps(this->m_sorter->item_count());
153 this->
forward(
"items", static_cast<stream_size_type>(this->m_sorter->item_count()));
156 virtual void go()
override {
157 while (this->m_sorter->can_pull()) {
158 dest.push(this->m_sorter->pull());
166 this->m_sorter->set_phase_3_memory(availableMemory);
178 template <
typename Traits>
179 class sort_calc_t :
public node {
181 typedef typename Traits::item_type item_type;
182 typedef typename Traits::sorter_t sorter_t;
183 typedef typename Traits::sorterptr sorterptr;
185 typedef sort_output_base<Traits> Output;
187 sort_calc_t(
const sort_calc_t & other)
189 , m_sorter(other.m_sorter)
194 template <
typename dest_t>
195 sort_calc_t(dest_t dest)
196 : dest(new dest_t(dest))
198 m_sorter = this->dest->get_sorter();
199 this->dest->set_calc_node(*
this);
203 sort_calc_t(sorterptr sorter)
211 set_name(
"Perform merge heap", PRIORITY_SIGNIFICANT);
219 virtual void go()
override {
221 log_debug() <<
"TODO: Progress information during merging." << std::endl;
222 m_sorter->merge_runs();
233 m_sorter->evacuate();
236 sorterptr get_sorter()
const {
240 void set_input_node(
node & input) {
247 m_sorter->set_phase_2_memory(availableMemory);
252 boost::shared_ptr<Output> dest;
260 template <
typename Traits>
261 class sort_input_t :
public node {
262 typedef typename Traits::pred_type pred_type;
264 typedef typename Traits::item_type item_type;
265 typedef typename Traits::sorter_t sorter_t;
266 typedef typename Traits::sorterptr sorterptr;
268 sort_input_t(sort_calc_t<Traits> dest)
269 : m_sorter(dest.get_sorter())
272 this->dest.set_input_node(*
this);
274 set_name(
"Form input runs", PRIORITY_SIGNIFICANT);
283 void push(
const item_type & item) {
284 m_sorter->push(item);
287 virtual void end()
override {
297 m_sorter->evacuate();
303 m_sorter->set_phase_1_memory(availableMemory);
311 template <
typename child_t>
313 const child_t &
self()
const {
return *
static_cast<const child_t *
>(
this); }
315 template <
typename dest_t>
319 typedef typename dest_t::item_type item_type;
321 typedef typename child_t::template predicate<item_type>::type pred_type;
326 template <
typename dest_t>
328 typedef typename dest_t::item_type item_type;
329 typedef typename constructed<dest_t>::pred_type pred_type;
333 this->init_sub_node(output);
335 this->init_sub_node(calc);
337 this->init_sub_node(input);
348 template <
typename item_type>
351 typedef std::less<item_type> type;
354 template <
typename T>
355 std::less<T> get_pred()
const {
356 return std::less<T>();
363 template <
typename pred_t>
366 template <
typename Dummy>
377 template <
typename T>
378 pred_t get_pred()
const {
391 inline pipe_middle<serialization_bits::default_pred_sort_factory>
400 template <
typename pred_t>
401 pipe_middle<serialization_bits::sort_factory<pred_t> >
407 template <
typename T,
typename pred_t=std::less<T> >
410 namespace serialization_bits {
415 template <
typename Traits>
422 typedef typename Traits::sorter_t sorter_t;
423 typedef typename Traits::sorterptr sorterptr;
431 calc_t calc(output->get_sorter());
432 output->set_calc_node(calc);
433 this->init_node(calc);
435 this->init_node(input);
446 template <
typename Traits>
473 template <
typename T,
typename pred_t>
517 namespace serialization_bits {
519 template <
typename Traits>
520 typename passive_sorter_factory_2<Traits>::constructed_type
521 passive_sorter_factory_2<Traits>::construct()
const {
522 constructed_type res = m_sorter.m_output;
533 #endif // TPIE_PIPELINING_SERIALIZATION_SORT_H
virtual void begin()
Begin pipeline processing phase.
The base class for indicating the progress of some task.
Factory for the passive sorter output node.
Traits::sorterptr sorterptr
Smart pointer to sorter_t.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
Pipe sorter push output node.
virtual void done()
Advance the indicator to the end.
Factory for the passive sorter input node.
Pipe sorter pull output node.
virtual void set_available_memory(memory_size_type availableMemory)
Called by the memory manager to set the amount of memory assigned to this node.
T item_type
Type of items sorted.
serialization_bits::sort_pull_output_t< Traits > output_t
Type of pipe sorter output.
virtual void propagate() override
Propagate stream metadata.
Traits::sorter_t sorter_t
Type of the merge sort implementation used.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
progress_indicator_base * proxy_progress_indicator()
Get a non-initialized progress indicator for use with external implementations.
Pipelined sorter with push input and pull output.
Binary serialization and unserialization.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
virtual void propagate() override
Propagate stream metadata.
virtual void evacuate() override
Overridden by nodes that have data to evacuate.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
virtual bool can_evacuate() override
Overridden by nodes that have data to evacuate.
Traits::sorterptr sorterptr
Smart pointer to sorter_t.
logstream & log_debug()
Return logstream for writing debug log messages.
Traits::sorter_t sorter_t
Type of the merge sort implementation used.
void add_dependency(const node_token &dest)
Called by implementers to declare a node dependency, that is, a requirement that another node has end() called before the begin() of this node.
virtual void end()
End pipeline processing phase.
void step(stream_size_type steps=1)
Step the progress indicator.
pipe_middle< serialization_bits::default_pred_sort_factory > serialization_pipesort()
Pipelining sorter using std::less.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
virtual void set_available_memory(memory_size_type availableMemory) override
Called by the memory manager to set the amount of memory assigned to this node.
void set_memory_fraction(double f)
Set the memory priority of this node.
virtual void go() override
For initiator nodes, execute this phase by pushing all items to be pushed.
A pipe_middle class pushes input down the pipeline.
virtual void propagate() override
Propagate stream metadata.
pipe_end< serialization_bits::passive_sorter_factory< Traits > > input()
Get the input push node.
Sort factory using std::less as comparator.
pullpipe_begin< serialization_bits::passive_sorter_factory_2< Traits > > output()
Get the output pull node.
void set_steps(stream_size_type steps)
Called by implementers that intend to call step().
void forward(std::string key, T value, bool explicitForward=true)
Called by implementers to forward auxiliary data to successors.
logstream & log_warning()
Return logstream for writing warning log messages.
Sort factory using the given predicate as comparator.
Traits::item_type item_type
Type of items sorted.
virtual void init(stream_size_type range=0)
Initialize progress indicator.