20 #ifndef __TPIE_PIPELINING_PARALLEL_BASE_H__
21 #define __TPIE_PIPELINING_PARALLEL_BASE_H__
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_base.h>
26 #include <boost/shared_ptr.hpp>
28 #include <tpie/pipelining/parallel/options.h>
32 namespace pipelining {
34 namespace parallel_bits {
39 template <
typename dest_t>
43 template <
typename T1,
typename T2>
54 template <
typename Input,
typename Output>
59 static const size_t alignment = 64;
79 virtual void init_node(
node & r)
override {
88 std::vector<before_t *> m_dests;
95 stream_size_type sum_steps() {
96 stream_size_type res = 0;
97 for (
size_t i = 0; i < m_progressIndicators.size(); ++i) {
98 res += m_progressIndicators.get(i)->get_current();
103 virtual ~threads() {}
109 template <
typename Input,
typename Output,
typename fact_t>
118 typedef typename fact_t::template constructed<after_t>::type worker_t;
119 typedef typename worker_t::item_type T1;
122 static const size_t alignment = p_t::alignment;
134 : numJobs(st.opts.numJobs)
137 fact.hook_initialization(&hook);
139 m_data.realloc(numJobs);
140 this->m_progressIndicators.realloc(numJobs);
141 this->m_dests.resize(numJobs);
144 for (
size_t i = 0; i < numJobs; ++i) {
146 if (((
size_t) m_data.get(i)) % alignment != 0) {
147 log_warning() <<
"Thread " << i <<
" is not aligned: Address "
148 << m_data.get(i) <<
" is off by " <<
149 (((size_t) m_data.get(i)) % alignment) <<
" bytes"
154 new (this->m_progressIndicators.get(i))
pi_t();
163 for (
size_t i = 0; i < numJobs; ++i) {
164 m_data.get(i)->~before_t();
165 this->m_progressIndicators.get(i)->~pi_t();
168 this->m_progressIndicators.realloc(0);
205 typedef boost::mutex mutex_t;
206 typedef boost::condition_variable cond_t;
207 typedef boost::unique_lock<boost::mutex> lock_t;
269 return m_states[idx];
274 if (m_states[idx] != from) {
275 std::stringstream ss;
276 ss << idx <<
" Invalid state transition " << from <<
" -> " << to <<
"; current state is " << m_states[idx];
284 std::vector<node *> m_inputs;
285 std::vector<after_base *> m_outputs;
286 std::vector<worker_state> m_states;
291 , m_inputs(opts.numJobs, 0)
292 , m_outputs(opts.numJobs, 0)
293 , m_states(opts.numJobs, INITIALIZING)
298 virtual ~state_base() {
306 template <
typename T>
308 memory_size_type m_inputSize;
317 if (input.
size() > m_inputBuffer.
size())
318 throw tpie::exception(m_inputBuffer.
size() ?
"Input too large" :
"Input buffer not initialized");
320 memory_size_type items =
322 -m_inputBuffer.
begin();
329 , m_inputBuffer(opts.bufSize)
337 template <
typename T>
339 memory_size_type m_outputSize;
341 friend class after<T>;
350 , m_outputBuffer(opts.bufSize)
362 template <
typename T>
375 template <
typename T1,
typename T2>
378 typedef boost::shared_ptr<state> ptr;
379 typedef state_base::mutex_t mutex_t;
380 typedef state_base::cond_t cond_t;
381 typedef state_base::lock_t lock_t;
388 std::auto_ptr<threads<T1, T2> > pipes;
390 template <
typename fact_t>
393 , m_inputBuffers(opts.numJobs)
394 , m_outputBuffers(opts.numJobs)
398 pipes.reset(
new pipes_impl_t(fact, *
this));
401 void set_consumer_ptr(consumer<T2> * cons) {
405 consumer<T2> *
const * get_consumer_ptr_ptr()
const {
413 template <
typename T>
414 class after :
public after_base {
418 std::auto_ptr<parallel_output_buffer<T> > m_buffer;
419 array<parallel_output_buffer<T> *> & m_outputBuffers;
420 typedef state_base::lock_t lock_t;
421 consumer<T> *
const * m_cons;
426 template <
typename Input>
427 after(state<Input, T> & state,
431 , m_outputBuffers(state.m_outputBuffers)
432 , m_cons(state.get_consumer_ptr_ptr())
434 state.set_output_ptr(parId,
this);
435 set_name(
"Parallel after", PRIORITY_INSIGNIFICANT);
448 , m_outputBuffers(other.m_outputBuffers)
449 , m_cons(other.m_cons)
460 if (m_buffer->m_outputSize >= m_buffer->m_outputBuffer.size())
461 flush_buffer_impl(
false);
463 m_buffer->m_outputBuffer[m_buffer->m_outputSize++] = item;
466 virtual void end()
override {
467 flush_buffer_impl(
true);
475 m_outputBuffers[parId] = m_buffer.get();
483 flush_buffer_impl(
true);
487 bool is_done()
const {
523 void flush_buffer_impl(
bool complete) {
530 lock_t lock(st.
mutex);
532 if (*m_cons == 0)
throw tpie::exception(
"Unexpected nullptr in flush_buffer");
534 (*m_cons)->consume(out);
536 st.
transition_state(parId, PROCESSING, complete ? OUTPUTTING : PARTIAL_OUTPUT);
543 m_buffer->m_outputSize = 0;
552 template <
typename T>
553 class before :
public node {
557 std::auto_ptr<parallel_input_buffer<T> > m_buffer;
558 array<parallel_input_buffer<T> *> & m_inputBuffers;
559 boost::thread m_worker;
564 virtual void push_all(array_view<T> items) = 0;
566 template <
typename Output>
567 before(state<T, Output> & st,
size_t parId)
570 , m_inputBuffers(st.m_inputBuffers)
572 set_name(
"Parallel before", PRIORITY_INSIGNIFICANT);
576 before(
const before & other)
579 , m_inputBuffers(other.m_inputBuffers)
588 boost::thread t(run_worker,
this);
605 throw tpie::exception(
"State 'partial_output' was not expected in before::ready");
607 throw tpie::exception(
"State 'outputting' was not expected in before::ready");
617 class running_signal {
618 typedef state_base::cond_t cond_t;
619 memory_size_type & sig;
620 cond_t & producerCond;
622 running_signal(memory_size_type & sig, cond_t & producerCond)
624 , producerCond(producerCond)
627 producerCond.notify_one();
632 producerCond.notify_one();
636 static void run_worker(before *
self) {
644 state_base::lock_t lock(st.
mutex);
646 m_buffer.reset(
new parallel_input_buffer<T>(st.opts));
647 m_inputBuffers[parId] = m_buffer.get();
675 template <
typename dest_t>
676 class before_impl :
public before<typename dest_t::item_type> {
677 typedef typename dest_t::item_type item_type;
682 template <
typename Output>
683 before_impl(state<item_type, Output> & st,
686 : before<item_type>(st, parId)
690 st.set_input_ptr(parId,
this);
700 for (
size_t i = 0; i < items.
size(); ++i) {
705 this->st.output(this->parId).flush_buffer();
712 template <
typename Input,
typename Output,
typename dest_t>
715 typedef typename state_t::ptr stateptr;
719 typedef typename dest_t::item_type item_type;
726 this->
set_name(
"Parallel output", PRIORITY_INSIGNIFICANT);
727 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
728 st->output(i).set_consumer(
this);
736 for (
size_t i = 0; i < a.
size(); ++i) {
747 template <
typename T1,
typename T2>
750 typedef T1 item_type;
754 typedef typename state_t::ptr stateptr;
759 boost::shared_ptr<consumer<T2> > cons;
761 stream_size_type m_steps;
770 bool has_ready_pipe() {
771 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
772 switch (st->get_state(i)) {
781 if (st->opts.maintainOrder && m_outputOrder.
front() != i)
804 bool has_outputting_pipe() {
805 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
806 switch (st->get_state(i)) {
813 if (st->opts.maintainOrder && m_outputOrder.
front() != i)
818 throw tpie::exception(
"State DONE not expected in has_outputting_pipe().");
834 bool has_processing_pipe() {
835 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
836 switch (st->get_state(i)) {
845 throw tpie::exception(
"State DONE not expected in has_processing_pipe().");
866 stream_size_type steps = st->pipes->sum_steps();
867 if (steps != m_steps) {
874 template <
typename consumer_t>
875 producer(stateptr st,
const consumer_t & cons)
878 , cons(
new consumer_t(cons))
881 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
884 this->
set_name(
"Parallel input", PRIORITY_INSIGNIFICANT);
885 memory_size_type usage =
886 st->opts.numJobs * st->opts.bufSize * (
sizeof(T1) +
sizeof(T2))
887 + st->opts.bufSize *
sizeof(item_type)
891 if (st->opts.maintainOrder) {
892 m_outputOrder.
resize(st->opts.numJobs);
897 inputBuffer.
resize(st->opts.bufSize);
899 state_base::lock_t lock(st->mutex);
900 while (st->runningWorkers != st->opts.numJobs) {
901 st->producerCond.wait(lock);
915 inputBuffer[written++] = item;
916 if (written < st->opts.bufSize) {
921 state_base::lock_t lock(st->mutex);
925 empty_input_buffer(lock);
929 void empty_input_buffer(state_base::lock_t & lock) {
930 while (written > 0) {
931 while (!has_ready_pipe()) {
932 st->producerCond.wait(lock);
934 switch (st->get_state(readyIdx)) {
936 throw tpie::exception(
"State 'INITIALIZING' not expected at this point");
940 item_type * first = &inputBuffer[0];
941 item_type * last = first + written;
942 parallel_input_buffer<T1> & dest = *st->m_inputBuffers[readyIdx];
944 st->transition_state(readyIdx, IDLE, PROCESSING);
945 st->workerCond[readyIdx].notify_one();
947 if (st->opts.maintainOrder)
948 m_outputOrder.
push(readyIdx);
952 throw tpie::exception(
"State 'processing' not expected at this point");
955 cons->consume(st->m_outputBuffers[readyIdx]->get_output());
956 st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
957 st->workerCond[readyIdx].notify_one();
961 cons->consume(st->m_outputBuffers[readyIdx]->get_output());
963 st->transition_state(readyIdx, OUTPUTTING, IDLE);
964 st->workerCond[readyIdx].notify_one();
965 if (st->opts.maintainOrder) {
966 if (m_outputOrder.
front() != readyIdx) {
967 log_error() <<
"Producer: Expected " << readyIdx <<
" in front; got "
968 << m_outputOrder.
front() << std::endl;
981 virtual void end()
override {
982 state_base::lock_t lock(st->mutex);
986 empty_input_buffer(lock);
990 st->set_consumer_ptr(cons.get());
994 while (!has_outputting_pipe()) {
995 if (!has_processing_pipe()) {
1000 st->producerCond.wait(lock);
1005 cons->consume(st->m_outputBuffers[readyIdx]->get_output());
1007 if (st->get_state(readyIdx) == PARTIAL_OUTPUT) {
1008 st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
1009 st->workerCond[readyIdx].notify_one();
1012 st->transition_state(readyIdx, OUTPUTTING, IDLE);
1013 if (st->opts.maintainOrder) {
1014 if (m_outputOrder.
front() != readyIdx) {
1015 log_error() <<
"Producer: Expected " << readyIdx <<
" in front; got "
1016 << m_outputOrder.
front() << std::endl;
1017 throw tpie::exception(
"Producer got wrong entry from has_ready_pipe");
1019 m_outputOrder.
pop();
1023 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
1024 st->transition_state(i, IDLE, DONE);
1025 st->workerCond[i].notify_one();
1027 while (st->runningWorkers > 0) {
1028 st->producerCond.wait(lock);
after_base & output(size_t idx)
Get the specified after instance.
void set_input_ptr(size_t idx, node *v)
Must not be used concurrently.
Encapsulation of two pointers from any random access container.
virtual void begin()
Begin pipeline processing phase.
virtual void flush_buffer()=0
Called by before::worker after a batch of items has been pushed.
virtual void worker_initialize()=0
Called by before::worker to initialize buffers.
virtual void set_consumer(node *)=0
For internal use in order to construct the pipeline graph.
virtual void flush_buffer() override
Invoked by before::push_all when all input items have been pushed.
void set_progress_indicator(progress_indicator_base *pi)
Used internally. Set the progress indicator to use.
Accepts input items from the main thread and sends them down the pipeline.
Factory hook that sets the progress indicator of the nodes run in parallel to the null progress indicator.
cond_t producerCond
Condition variable.
Class containing an array of node instances.
void set_output_ptr(size_t idx, after_base *v)
Must not be used concurrently.
progress_indicator_null pi_t
Progress indicator type.
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
Accepts output items and sends them to the main thread.
cond_t * workerCond
Condition variable, one per worker.
void push(T val)
Add an element to the back of the queue.
a dummy progress indicator that produces no output
Aligned, uninitialized storage.
void push(item_type item)
Accumulate input buffer and send off to workers.
virtual void push_all(array_view< T > items)=0
Overridden in subclass to push a buffer of items.
progress_indicator_base * get_progress_indicator()
Used internally. Get the progress indicator used.
size_t runningWorkers
Shared state, must have mutex to write.
Encapsulation of two pointers from any random access container.
void pop()
Remove an element from the front of the queue (the oldest item).
T front()
Return the item that has been in the queue for the longest time.
Common state in parallel pipelining library.
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
virtual void push_all(array_view< item_type > items)
Push all items from buffer and flush output buffer afterwards.
virtual void set_consumer(node *cons) override
For internal use in order to construct the pipeline graph.
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
Subclass of threads instantiating and managing the pipelines.
Node running in the main thread, accepting an output buffer from the managing producer and forwarding its items to the rest of the pipeline.
Non-templated virtual base class of after.
size_t size() const
Get number of elements in the array.
void resize(size_t size, const T &elm)
Change the size of the array.
mutex_t mutex
Single mutex.
iterator begin() const
Return an iterator to the beginning of the array.
virtual void begin() override
Begin pipeline processing phase.
void push(const T &item)
Push to thread-local buffer; flush it when full.
virtual void end() override
End pipeline processing phase.
User-supplied options to the parallelism framework.
State subclass containing the item type specific state, i.e.
iterator begin()
Return an iterator to the beginning of the array.
size_type size() const
Return the size of the array.
worker_state get_state(size_t idx)
Shared state, must have mutex to use.
Producer, running in main thread, managing the parallel execution.
virtual void end() override
End pipeline processing phase.
void transition_state(size_t idx, worker_state from, worker_state to)
Shared state, must have mutex to use.
virtual void consume(array_view< item_type > a) override
Push all items from output buffer to the rest of the pipeline.
iterator end() const
Return an iterator to the end of the array.
Whether to maintain order in parallel or not.
Instantiated in each thread.
logstream & log_error()
Return logstream for writing error log messages.
logstream & log_warning()
Return logstream for writing warning log messages.
virtual void begin() override
Begin pipeline processing phase.
void resize(size_t size=0)
Resize the queue; all data is lost.
Concrete consumer implementation.
virtual void worker_initialize()
Invoked by before::worker (in worker thread context).
node & input(size_t idx)
Get the specified before instance.