TPIE

2362a60
request.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_COMPRESSED_REQUEST_H
21 #define TPIE_COMPRESSED_REQUEST_H
22 
26 
27 #include <memory>
28 #include <thread>
29 #include <condition_variable>
30 #include <tpie/tpie_assert.h>
31 #include <tpie/tempname.h>
33 #include <tpie/file_accessor/byte_stream_accessor.h>
36 
37 namespace tpie {
38 
52 public:
54  : m_done(false)
55  , m_blockNumber(std::numeric_limits<stream_size_type>::max())
56  , m_readOffset(0)
57  , m_blockSize(0)
58  , m_endOfStream(false)
59  , m_nextReadOffset(0)
60  , m_nextBlockSize(0)
61  {
62  }
63 
64  // any, stream
65  // Waits on the condition variable m_changed.
66  void wait(compressor_thread_lock & lock);
67 
68  // any, stream
69  void initiate_request() {
70  m_done = m_endOfStream = false;
71  m_nextReadOffset = m_nextBlockSize = 0;
72  }
73 
74  // write, stream
75  void clear_block_info() {
76  m_blockNumber = std::numeric_limits<stream_size_type>::max();
77  }
78 
79  // write, thread -- must have lock!
80  void set_block_info(stream_size_type blockNumber,
81  stream_size_type readOffset,
82  memory_size_type blockSize)
83  {
84  if (m_blockNumber != std::numeric_limits<stream_size_type>::max()
85  && blockNumber < m_blockNumber)
86  {
87  //log_debug() << "set_block_info(blockNumber=" << blockNumber
88  // << ", readOffset=" << readOffset << ", blockSize=" << blockSize << "): "
89  // << "We already know the size of block " << m_blockNumber << std::endl;
90  } else {
91  //log_debug() << "set_block_info(blockNumber=" << blockNumber
92  // << ", readOffset=" << readOffset << ", blockSize=" << blockSize << "): "
93  // << "Previous was " << m_blockNumber << std::endl;
94  m_blockNumber = blockNumber;
95  m_readOffset = readOffset;
96  m_blockSize = blockSize;
97  m_changed.notify_all();
98  }
99  }
100 
101  // write, stream
102  bool has_block_info(stream_size_type blockNumber)
103  {
104  if (m_blockNumber == std::numeric_limits<stream_size_type>::max())
105  return false;
106 
107  if (blockNumber < m_blockNumber) {
108  std::stringstream ss;
109  ss << "Wanted block number " << blockNumber << ", but recalled was " << m_blockNumber;
110  throw exception(ss.str());
111  }
112 
113  if (blockNumber == m_blockNumber)
114  return true;
115  else // blockNumber > m_blockNumber
116  return false;
117  }
118 
119  // write, stream
120  memory_size_type get_block_size(stream_size_type blockNumber)
121  {
122  tp_assert(has_block_info(blockNumber), "get_block_size: !has_block_info");
123  unused(blockNumber);
124  return m_blockSize;
125  }
126 
127  // write, stream
128  stream_size_type get_read_offset(stream_size_type blockNumber) {
129  tp_assert(has_block_info(blockNumber), "get_read_offset: !has_block_info");
130  unused(blockNumber);
131  return m_readOffset;
132  }
133 
134  // any, thread
135  void set_done() {
136  m_done = true;
137  }
138 
139  // any, stream
140  bool done() {
141  return m_done;
142  }
143 
144  // read, stream
145  stream_size_type next_read_offset() {
146  return m_nextReadOffset;
147  }
148 
149  // read, thread
150  void set_next_block_offset(stream_size_type offset) {
151  m_done = true;
152  m_nextReadOffset = offset;
153  m_changed.notify_all();
154  }
155 
156 private:
157  std::condition_variable m_changed;
158 
159  // Information about either read or write
160  bool m_done;
161 
162  // Information about the write
163  stream_size_type m_blockNumber;
164  stream_size_type m_readOffset;
165  memory_size_type m_blockSize;
166 
167  // Information about the read
168  bool m_endOfStream;
169  stream_size_type m_nextReadOffset;
170  memory_size_type m_nextBlockSize;
171 };
172 
173 #ifdef __GNUC__
174 class __attribute__((__may_alias__)) request_base;
175 class __attribute__((__may_alias__)) read_request;
176 class __attribute__((__may_alias__)) write_request;
177 #endif // __GNUC__
178 
186 protected:
188  : m_response(response)
189  {
190  }
191 
192 public:
193  void initiate_request() {
194  m_response->initiate_request();
195  }
196 
197 protected:
198  compressor_response * m_response;
199 };
200 
201 class read_request : public request_base {
202 public:
203  typedef std::shared_ptr<compressor_buffer> buffer_t;
205  typedef std::condition_variable condition_t;
206 
207  read_request(buffer_t buffer,
208  file_accessor_t * fileAccessor,
209  stream_size_type readOffset,
210  read_direction::type readDirection,
211  compressor_response * response)
212  : request_base(response)
213  , m_buffer(buffer)
214  , m_fileAccessor(fileAccessor)
215  , m_readOffset(readOffset)
216  , m_readDirection(readDirection)
217  {
218  }
219 
220  buffer_t buffer() {
221  return m_buffer;
222  }
223 
224  file_accessor_t & file_accessor() {
225  return *m_fileAccessor;
226  }
227 
228  stream_size_type read_offset() {
229  return m_readOffset;
230  }
231 
232  read_direction::type get_read_direction() {
233  return m_readDirection;
234  }
235 
236  void set_next_block_offset(stream_size_type offset) {
237  m_response->set_next_block_offset(offset);
238  }
239 
240 private:
241  buffer_t m_buffer;
242  file_accessor_t * m_fileAccessor;
243  const stream_size_type m_readOffset;
244  const read_direction::type m_readDirection;
245 };
246 
247 class write_request : public request_base {
248 public:
249  typedef std::shared_ptr<compressor_buffer> buffer_t;
251 
252  write_request(const buffer_t & buffer,
253  file_accessor_t * fileAccessor,
254  temp_file * tempFile,
255  stream_size_type writeOffset,
256  memory_size_type blockItems,
257  stream_size_type blockNumber,
258  compressor_response * response)
259  : request_base(response)
260  , m_buffer(buffer)
261  , m_fileAccessor(fileAccessor)
262  , m_tempFile(tempFile)
263  , m_writeOffset(writeOffset)
264  , m_blockItems(blockItems)
265  , m_blockNumber(blockNumber)
266  {
267  }
268 
269  file_accessor_t & file_accessor() {
270  return *m_fileAccessor;
271  }
272 
273  buffer_t buffer() {
274  return m_buffer;
275  }
276 
277  temp_file * get_temp_file() {
278  return m_tempFile;
279  }
280 
281  memory_size_type block_items() {
282  return m_blockItems;
283  }
284 
285  bool should_append() {
286  return m_writeOffset == std::numeric_limits<stream_size_type>::max();
287  }
288 
289  stream_size_type write_offset() {
290  return m_writeOffset;
291  }
292 
293  // must have lock!
294  void set_block_info(stream_size_type readOffset,
295  memory_size_type blockSize)
296  {
297  m_response->set_block_info(m_blockNumber, readOffset, blockSize);
298  }
299 
300  // must have lock!
301  void update_recorded_size() {
302  m_response->set_done();
303  if (m_tempFile != NULL) m_tempFile->update_recorded_size(m_fileAccessor->file_size());
304  }
305 
306  // must have lock!
307  void update_recorded_size(stream_size_type fileSize) {
308  m_response->set_done();
309  if (m_tempFile != NULL) m_tempFile->update_recorded_size(fileSize);
310  }
311 
312 private:
313  buffer_t m_buffer;
314  file_accessor_t * m_fileAccessor;
315  temp_file * m_tempFile;
316  const stream_size_type m_writeOffset;
317  const memory_size_type m_blockItems;
318  const stream_size_type m_blockNumber;
319 };
320 
322 public:
323  enum type {
324  NONE,
325  READ,
326  WRITE
327  };
328 
329 private:
330  compressor_request_kind() /*= delete*/;
331  compressor_request_kind(const compressor_request_kind &) /*= delete*/;
332  ~compressor_request_kind() /*= delete*/;
333 };
334 
347 public:
349  : m_kind(compressor_request_kind::NONE)
350  {
351  }
352 
353  ~compressor_request() {
354  destruct();
355  }
356 
358  : m_kind(compressor_request_kind::NONE)
359  {
360  switch (other.kind()) {
361  case compressor_request_kind::NONE:
362  break;
363  case compressor_request_kind::READ:
364  set_read_request(other.get_read_request());
365  break;
366  case compressor_request_kind::WRITE:
367  set_write_request(other.get_write_request());
368  break;
369  }
370  }
371 
372  read_request & set_read_request(const read_request::buffer_t & buffer,
373  read_request::file_accessor_t * fileAccessor,
374  stream_size_type readOffset,
375  read_direction::type readDirection,
376  compressor_response * response)
377  {
378  destruct();
379  m_kind = compressor_request_kind::READ;
380  return *new (m_payload) read_request(buffer, fileAccessor, readOffset,
381  readDirection, response);
382  }
383 
384  read_request & set_read_request(const read_request & other) {
385  destruct();
386  m_kind = compressor_request_kind::READ;
387  return *new (m_payload) read_request(other);
388  }
389 
390  write_request & set_write_request(const write_request::buffer_t & buffer,
391  write_request::file_accessor_t * fileAccessor,
392  temp_file * tempFile,
393  stream_size_type writeOffset,
394  memory_size_type blockItems,
395  stream_size_type blockNumber,
396  compressor_response * response)
397  {
398  destruct();
399  m_kind = compressor_request_kind::WRITE;
400  return *new (m_payload) write_request(buffer, fileAccessor, tempFile,
401  writeOffset, blockItems,
402  blockNumber, response);
403  }
404 
405  write_request & set_write_request(const write_request & other) {
406  destruct();
407  m_kind = compressor_request_kind::WRITE;
408  return *new (m_payload) write_request(other);
409  }
410 
411  // Precondition: kind() == READ
412  read_request & get_read_request() {
413  return *reinterpret_cast<read_request *>(m_payload);
414  }
415 
416  // Precondition: kind() == READ
417  const read_request & get_read_request() const {
418  return *reinterpret_cast<const read_request *>(m_payload);
419  }
420 
421  // Precondition: kind() == WRITE
422  write_request & get_write_request() {
423  return *reinterpret_cast<write_request *>(m_payload);
424  }
425 
426  // Precondition: kind() == WRITE
427  const write_request & get_write_request() const {
428  return *reinterpret_cast<const write_request *>(m_payload);
429  }
430 
431  // Precondition: kind() != NONE
432  request_base & get_request_base() {
433  return *reinterpret_cast<request_base *>(m_payload);
434  }
435 
436  // Precondition: kind() != NONE
437  const request_base & get_request_base() const {
438  return *reinterpret_cast<const request_base *>(m_payload);
439  }
440 
441  compressor_request_kind::type kind() const {
442  return m_kind;
443  }
444 
445 private:
446  void destruct() {
447  switch (m_kind) {
448  case compressor_request_kind::NONE:
449  break;
450  case compressor_request_kind::READ:
451  get_read_request().~read_request();
452  break;
453  case compressor_request_kind::WRITE:
454  get_write_request().~write_request();
455  break;
456  }
457  m_kind = compressor_request_kind::NONE;
458  }
459 
460  compressor_request_kind::type m_kind;
461 
462  enum {
463  payload_bytes = sizeof(read_request) < sizeof(write_request) ? sizeof(write_request) : sizeof(read_request),
464  payload_items = (payload_bytes + sizeof(size_t) - 1) / sizeof(size_t)
465  };
466  size_t m_payload[payload_items];
467 };
468 
469 } // namespace tpie
470 
471 #endif // TPIE_COMPRESSED_REQUEST_H
Defines the tp_assert macro.
void unused(const T &x)
Declare that a variable is unused on purpose.
Definition: util.h:42
Base class for read_request and write_request.
Definition: request.h:185
Declare default file accessor.
Useful compressed stream predeclarations.
Class representing a reference to a temporary file.
Definition: tempname.h:202
Read/write direction enumeration.
Temporary file names.
Tagged union containing either a read_request or a write_request.
Definition: request.h:346
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
pipe_middle< split_factory< bits::buffer_input_t, node, bits::buffer_output_t > > buffer
The buffer node inserts a phase boundary into the pipeline by writing items to disk.
Definition: buffer.h:207
Response to an I/O request.
Definition: request.h:51