TPIE

2362a60
buffer.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_COMPRESSED_BUFFER_H
21 #define TPIE_COMPRESSED_BUFFER_H
22 
26 
27 #include <tpie/array.h>
28 #include <tpie/tpie_assert.h>
29 #include <tpie/compressed/thread.h>
30 #include <map>
31 #include <memory>
32 
33 namespace tpie {
34 
39 
44 
66  enum type {
79  };
80 };
81 
86 private:
87  typedef array<char> storage_t;
88 
89  storage_t m_storage;
90  memory_size_type m_size;
92  stream_size_type m_readOffset;
93  memory_size_type m_blockSize;
94 
95 public:
96  compressor_buffer(memory_size_type capacity)
97  : m_storage(capacity)
98  , m_size(0)
100  , m_readOffset(1111111111111111111ull)
101  , m_blockSize(std::numeric_limits<memory_size_type>::max())
102  {
103  }
104 
105  compressor_buffer_state::type get_state() const {
106  return m_state;
107  }
108 
109  void set_state(compressor_buffer_state::type to) {
110  m_state = to;
111  }
112 
113  void transition_state(compressor_buffer_state::type from,
115  {
116  tp_assert(!(m_state != from), "compressor_buffer: invalid state transition");
117  unused(from);
118  set_state(to);
119  }
120 
121  bool is_busy() {
122  switch (m_state) {
123  case compressor_buffer_state::dirty: return false;
124  case compressor_buffer_state::writing: return true;
125  case compressor_buffer_state::reading: return true;
126  case compressor_buffer_state::clean: return false;
127  }
128  tp_assert(false, "is_busy: compressor_buffer in invalid state");
129  return false; // suppress compiler warning
130  }
131 
135  char * get() {
136  return m_storage.get();
137  }
138 
142  const char * get() const {
143  return m_storage.get();
144  }
145 
149  memory_size_type size() const {
150  return m_size;
151  }
152 
156  memory_size_type capacity() const {
157  return m_storage.size();
158  }
159 
163  void set_size(memory_size_type size) {
164  m_size = size;
165  }
166 
170  void set_capacity(memory_size_type capacity) {
171  m_storage.resize(capacity);
172  m_size = 0;
173  }
174 
178  void reset() {
180  m_size = 0;
181  m_readOffset = 1111111111111111111ull;
182  m_blockSize = std::numeric_limits<memory_size_type>::max();
183  }
184 
185  memory_size_type get_block_size() { return m_blockSize; }
186  stream_size_type get_read_offset() { return m_readOffset; }
187  void set_block_size(memory_size_type s) { m_blockSize = s; }
188  void set_read_offset(stream_size_type s) { m_readOffset = s; }
189 };
190 
212 public:
213  typedef std::shared_ptr<compressor_buffer> buffer_t;
214 
217 
218  buffer_t allocate_own_buffer();
219  void release_own_buffer(buffer_t &);
220 
221  bool can_take_shared_buffer();
222  buffer_t take_shared_buffer();
223  void release_shared_buffer(buffer_t &);
224 
225 private:
226  class impl;
227  impl * pimpl;
228 };
229 
237 
245 public:
246  typedef std::shared_ptr<compressor_buffer> buffer_t;
247 
248  const static memory_size_type OWN_BUFFERS = 1;
249 
250  stream_buffers(memory_size_type blockSize)
251  : m_blockSize(blockSize)
252  , m_ownBuffers(0)
253  {
254  }
255 
256  ~stream_buffers() {
257  if (!empty()) {
258  log_debug() << "ERROR: ~stream_buffers: not empty!" << std::endl;
259  }
260  }
261 
262  static memory_size_type memory_usage(memory_size_type blockSize) {
263  return blockSize;
264  }
265 
266  buffer_t get_buffer(compressor_thread_lock & lock, stream_size_type blockNumber) {
267  if (!(m_ownBuffers < OWN_BUFFERS || can_take_shared_buffer())) {
268  // First, search for the buffer in the map.
269  buffermapit target = m_buffers.find(blockNumber);
270  if (target != m_buffers.end()) return target->second;
271 
272  // If not found, wait for a free buffer to become available.
273  buffer_t b;
274  while (true) {
275  buffermapit i = m_buffers.begin();
276  while (i != m_buffers.end() && !i->second.unique()) ++i;
277  if (i == m_buffers.end()) {
278  compressor().wait_for_request_done(lock);
279  continue;
280  } else {
281  b.swap(i->second);
282  m_buffers.erase(i);
283  break;
284  }
285  }
286 
287  b->reset();
288  m_buffers.insert(std::make_pair(blockNumber, b));
289  clean();
290  return b;
291  } else {
292  // First, search for the buffer in the map.
293  std::pair<buffermapit, bool> res
294  = m_buffers.insert(std::make_pair(blockNumber, buffer_t()));
295  buffermapit & target = res.first;
296  bool & inserted = res.second;
297  if (!inserted) return target->second;
298 
299  // If not found, find a free buffer and place it in target->second.
300 
301  // We have now placed an empty shared_ptr in m_buffers
302  // (an "insertion point"), and nobody is allowed to call clean()
303  // on us before we insert something in that point.
304 
305  // target->second is the only buffer in the map with use_count() == 0.
306  // If a buffer in the map has use_count() == 1 (that is, unique() == true),
307  // that means only our map (and nobody else) refers to the buffer,
308  // so it is free to be reused.
309  buffermapit i = m_buffers.begin();
310  while (i != m_buffers.end() && !i->second.unique()) ++i;
311 
312  if (i == m_buffers.end()) {
313  // No free found: allocate new buffer.
314  if (m_ownBuffers < OWN_BUFFERS) {
315  target->second = allocate_own_buffer();
316  } else if (can_take_shared_buffer()) {
317  target->second = take_shared_buffer();
318  } else {
319  // This is a contradition of the very first check
320  // in the beginning of the method.
321  tp_assert(false, "get_buffer: Could not get a new buffer "
322  "contrary to previous checks");
323  }
324  } else {
325  // Free found: reuse buffer.
326  target->second.swap(i->second);
327  m_buffers.erase(i);
328  }
329 
330  // Bump use count before cleaning.
331  buffer_t result = target->second;
332  clean();
333  result->reset();
334  return result;
335  }
336  }
337 
338  bool empty() const {
339  return m_buffers.empty();
340  }
341 
342  void clean() {
343  buffermapit i = m_buffers.begin();
344  while (i != m_buffers.end()) {
345  buffermapit j = i++;
346  if (j->second.get() == 0) {
347  // This item in the map represents an insertion point in get_buffer,
348  // but in that case, get_buffer has the compressor lock,
349  // and it shouldn't wait before inserting something
350  throw exception("stream_buffers: j->second.get() == 0");
351  } else if (j->second.unique()) {
352  if (shared_buffers() > 0) {
353  release_shared_buffer(j->second);
354  } else {
355  release_own_buffer(j->second);
356  }
357  m_buffers.erase(j);
358  }
359  }
360  }
361 
362 private:
363  memory_size_type own_buffers() {
364  return m_ownBuffers;
365  }
366 
367  memory_size_type shared_buffers() {
368  return m_buffers.size() - m_ownBuffers;
369  }
370 
371  void release_shared_buffer(buffer_t & b) {
372  the_stream_buffer_pool().release_shared_buffer(b);
373  }
374 
375  void release_own_buffer(buffer_t & b) {
376  --m_ownBuffers;
377  the_stream_buffer_pool().release_own_buffer(b);
378  }
379 
380  bool can_take_shared_buffer() {
381  return the_stream_buffer_pool().can_take_shared_buffer();
382  }
383 
384  buffer_t take_shared_buffer() {
385  return the_stream_buffer_pool().take_shared_buffer();
386  }
387 
388  buffer_t allocate_own_buffer() {
389  ++m_ownBuffers;
390  return the_stream_buffer_pool().allocate_own_buffer();
391  }
392 
393  compressor_thread & compressor() {
394  return the_compressor_thread();
395  }
396 
397  memory_size_type block_size() const {
398  return m_blockSize;
399  }
400 
401  memory_size_type m_blockSize;
402 
403  typedef std::map<stream_size_type, buffer_t> buffermap_t;
404  typedef buffermap_t::iterator buffermapit;
405  buffermap_t m_buffers;
406 
408  memory_size_type m_ownBuffers;
409 };
410 
411 } // namespace tpie
412 
413 #endif // TPIE_COMPRESSED_BUFFER_H
Defines the tp_assert macro.
stream_buffer_pool & the_stream_buffer_pool()
Get the stream buffer pool singleton.
T * get()
Return a raw pointer to the array content.
Definition: array.h:531
void finish_stream_buffer_pool()
Used by tpie::finish to free stream buffer pool.
The buffer is different from the contents on the disk.
Definition: buffer.h:69
void set_capacity(memory_size_type capacity)
Resize internal buffer, clearing all elements.
Definition: buffer.h:170
void unused(const T &x)
Declare that a variable is unused on purpose.
Definition: util.h:42
memory_size_type size() const
Get number of bytes used to store items.
Definition: buffer.h:149
Buffer manager for a single stream.
Definition: buffer.h:244
Generic internal array with known memory requirements.
Pool of shared buffers.
Definition: buffer.h:211
Interface to the compressor thread.
memory_size_type capacity() const
Get maximal byte size of buffer.
Definition: buffer.h:156
The buffer will soon change to reflect the contents on the disk.
Definition: buffer.h:75
A buffer for elements belonging to a specific stream block.
Definition: buffer.h:85
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:167
void set_size(memory_size_type size)
Set number of bytes used to store items.
Definition: buffer.h:163
The buffer is equal to the contents on the disk.
Definition: buffer.h:78
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:485
The different states of a compressor buffer.
Definition: buffer.h:65
size_type size() const
Return the size of the array.
Definition: array.h:526
void init_stream_buffer_pool()
Used by tpie::init to initialize stream buffer pool.
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
void reset()
Return buffer to a newly constructed state.
Definition: buffer.h:178
The buffer will soon be written to disk.
Definition: buffer.h:72