TPIE

2362a60
merger.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_MERGER_H__
21 #define __TPIE_PIPELINING_MERGER_H__
22 
24 #include <tpie/compressed/stream.h>
25 #include <tpie/file_stream.h>
26 #include <tpie/tpie_assert.h>
27 #include <tpie/pipelining/store.h>
28 namespace tpie {
29 
30 template <typename specific_store_t, typename pred_t>
31 class merger {
32 private:
33  typedef typename specific_store_t::store_type store_type;
34  typedef typename specific_store_t::element_type element_type;
35 
37 public:
// Construct a merger from a comparison predicate and a store object.
// The predicate is wrapped as predwrap(store_pred_t(pred)) and passed to the
// priority queue pq together with 0 as its first argument; in and itemsRead
// are constructed from bucket, and store is copied into m_store.
// NOTE(review): the line declaring the `bucket` parameter (and closing the
// parameter list) is not present in this listing -- confirm against the
// original header before editing this constructor.
38  inline merger(pred_t pred, specific_store_t store,
40  : pq(0, predwrap(store_pred_t(pred)), bucket)
41  , in(bucket)
42  , itemsRead(bucket)
43  , m_store(store) {
44  }
45 
46  inline bool can_pull() {
47  return !pq.empty();
48  }
49 
50  inline store_type pull() {
51  tp_assert(can_pull(), "pull() while !can_pull()");
52  store_type el = std::move(pq.top().first);
53  size_t i = pq.top().second;
54  if (in[i].can_read() && itemsRead[i] < runLength) {
55  pq.pop_and_push(
56  std::make_pair(m_store.element_to_store(in[i].read()), i));
57  ++itemsRead[i];
58  } else {
59  pq.pop();
60  }
61  if (!can_pull()) {
62  reset();
63  }
64  return std::move(el);
65  }
66 
67  inline void reset() {
68  in.resize(0);
69  pq.resize(0);
70  itemsRead.resize(0);
71  }
72 
73  // Initialize merger with given sorted input runs. Each file stream is
74  // assumed to have a stream offset pointing to the first item in the run,
75  // and runLength items are read from each stream (unless end of stream
76  // occurs earlier).
77  // Precondition: !can_pull()
78  void reset(array<file_stream<element_type> > & inputs, stream_size_type runLength) {
79  this->runLength = runLength;
80  tp_assert(pq.empty(), "Reset before we are done");
81  in.swap(inputs);
82  pq.resize(in.size());
83  for (size_t i = 0; i < in.size(); ++i) {
84  pq.unsafe_push(
85  std::make_pair(
86  m_store.element_to_store(in[i].read()), i));
87  }
88  pq.make_safe();
89  itemsRead.resize(in.size(), 1);
90  }
91 
92  inline static memory_size_type memory_usage(memory_size_type fanout) {
93  return sizeof(merger)
94  - sizeof(internal_priority_queue<std::pair<store_type, size_t>, predwrap>) // pq
95  + static_cast<memory_size_type>(internal_priority_queue<std::pair<store_type, size_t>, predwrap>::memory_usage(fanout)) // pq
96  - sizeof(array<file_stream<element_type> >) // in
97  + static_cast<memory_size_type>(array<file_stream<element_type> >::memory_usage(fanout)) // in
98  - fanout*sizeof(file_stream<element_type>) // in file_streams
99  + fanout*file_stream<element_type>::memory_usage() // in file_streams
100  - sizeof(array<size_t>) // itemsRead
101  + static_cast<memory_size_type>(array<size_t>::memory_usage(fanout)) // itemsRead
102  ;
103  }
104 
105  class predwrap {
106  public:
107  typedef std::pair<store_type, size_t> item_type;
108  typedef item_type first_argument_type;
109  typedef item_type second_argument_type;
110  typedef bool result_type;
111 
112  predwrap(store_pred_t pred)
113  : pred(pred)
114  {
115  }
116 
117  inline bool operator()(const item_type & lhs, const item_type & rhs) {
118  return pred(lhs.first, rhs.first);
119  }
120 
121  private:
122  store_pred_t pred;
123  };
124 
125 private:
// NOTE(review): the declarations of `pq` and `in`, which the methods above
// use, are not present in this listing -- confirm against the original header.
// Per-input counter, indexed like `in`: reset(inputs, runLength) sets each
// entry to 1 and pull() increments it for every further item read.
128  array<stream_size_type> itemsRead;
// Limit on the number of items taken from each input stream, as passed to
// reset(inputs, runLength) and checked in pull().
129  stream_size_type runLength;
// Store object given to the constructor; its element_to_store() is applied
// to every item read from an input stream before it enters the queue.
130  specific_store_t m_store;
131 };
132 
133 } // namespace tpie
134 
135 #endif // __TPIE_PIPELINING_MERGER_H__
Defines the tp_assert macro.
A generic array with a fixed size.
Definition: array.h:144
Standard binary internal heap.
Compressed stream public API.
Simple heap based priority queue implementation.
Class storing a reference to a memory bucket.
Definition: memory.h:366
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:485
#define tp_assert(condition, message)
Definition: tpie_assert.h:48