TPIE

2362a60
pipes.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_PARALLEL_PIPES_H__
21 #define __TPIE_PIPELINING_PARALLEL_PIPES_H__
22 
23 #include <tpie/job.h>
24 #include <tpie/pipelining/pipe_base.h>
25 #include <tpie/pipelining/parallel/factory.h>
27 
28 namespace tpie {
29 
30 namespace pipelining {
31 
40 template <typename fact_t>
41 pipe_middle<parallel_bits::factory<fact_t> >
42 parallel(pipe_middle<fact_t> && fact, maintain_order_type maintainOrder, size_t numJobs, size_t bufSize = 2048) {
44  switch (maintainOrder) {
45  case arbitrary_order:
46  opts.maintainOrder = false;
47  break;
48  case maintain_order:
49  opts.maintainOrder = true;
50  break;
51  }
52  opts.numJobs = numJobs;
53  opts.bufSize = bufSize;
56  (std::move(fact.factory), std::move(opts)));
57 }
58 
65 template <typename fact_t>
66 pipe_middle<parallel_bits::factory<fact_t> >
68  return parallel(std::move(fact), maintainOrder, default_worker_count());
69 }
70 
71 template <typename fact_t>
72 pipe_middle<parallel_bits::factory<fact_t> >
73 parallel(pipe_middle<fact_t> && fact, bool maintainOrder, size_t numJobs, size_t bufSize = 2048) {
74  log_fatal() << "The second argument to tpie::pipelining::parallel has changed.\n"
75  << "Use maintain_order instead of true and arbitrary_order instead of false."
76  << std::endl;
77  return parallel(std::move(fact), maintainOrder ? maintain_order : arbitrary_order, numJobs, bufSize);
78 }
79 
80 template <typename fact_t>
81 pipe_middle<parallel_bits::factory<fact_t> >
82 parallel(pipe_middle<fact_t> && fact, bool maintainOrder) {
83  log_fatal() << "The second argument to tpie::pipelining::parallel has changed.\n"
84  << "Use maintain_order instead of true and arbitrary_order instead of false."
85  << std::endl;
86  return parallel(std::move(fact), maintainOrder ? maintain_order : arbitrary_order);
87 }
88 
89 } // namespace pipelining
90 
91 } // namespace tpie
92 
93 #endif // __TPIE_PIPELINING_PARALLEL_PIPES_H__
pipe_middle< parallel_bits::factory< fact_t > > parallel(pipe_middle< fact_t > &&fact, maintain_order_type maintainOrder, size_t numJobs, size_t bufSize=2048)
Runs a pipeline in multiple threads.
Definition: pipes.h:42
Factory instantiating a parallel multithreaded pipeline.
Definition: factory.h:38
Do not maintain order; push items as soon as a worker has processed them.
logstream & log_fatal()
Return logstream for writing fatal log messages.
Definition: tpie_log.h:142
memory_size_type default_worker_count()
Return the number of job threads initialized by the job framework in init_job().
Job class for job manager.
Maintain order; push items in the same order that a single thread would have.
User-supplied options to the parallelism framework.
Definition: options.h:34
A pipe_middle class pushes input down the pipeline.
Definition: pipe_base.h:241
Whether to maintain order in parallel or not.
maintain_order_type
Type describing whether to maintain the order of items in parallel.