20 #ifndef TPIE_COMPRESSED_REQUEST_H
21 #define TPIE_COMPRESSED_REQUEST_H
29 #include <condition_variable>
33 #include <tpie/file_accessor/byte_stream_accessor.h>
// Member-initializer fragment (the constructor header is not part of this excerpt).
// The block number starts at the maximum stream_size_type value, the same value
// that clear_block_info() assigns and has_block_info() tests for first.
55 , m_blockNumber(std::numeric_limits<stream_size_type>::max())
// The end-of-stream flag starts out false.
58 , m_endOfStream(
false)
// Reset the per-request state: clear the done and end-of-stream flags and
// zero the next-block read offset and size. The recorded block info
// (m_blockNumber / m_readOffset / m_blockSize) is not touched here.
69 void initiate_request() {
70 m_done = m_endOfStream =
false;
71 m_nextReadOffset = m_nextBlockSize = 0;
// Forget the recorded block by resetting m_blockNumber to the maximum
// stream_size_type value, which has_block_info() checks for before
// comparing block numbers.
75 void clear_block_info() {
76 m_blockNumber = std::numeric_limits<stream_size_type>::max();
// Store the read offset and size recorded for block `blockNumber`, then wake
// every thread waiting on m_changed.
//   blockNumber  block the info belongs to
//   readOffset   value stored in m_readOffset
//   blockSize    value stored in m_blockSize
80 void set_block_info(stream_size_type blockNumber,
81 stream_size_type readOffset,
82 memory_size_type blockSize)
// Guard: a block number is already recorded (m_blockNumber is not max())
// and the incoming block number is smaller than it.
// NOTE(review): the body of this branch and the else/brace structure are not
// visible in this excerpt; presumably the assignments below are skipped in
// that case. Confirm against the full source.
84 if (m_blockNumber != std::numeric_limits<stream_size_type>::max()
85 && blockNumber < m_blockNumber)
94 m_blockNumber = blockNumber;
95 m_readOffset = readOffset;
96 m_blockSize = blockSize;
// Signal waiters that the recorded block info has changed.
97 m_changed.notify_all();
// Query whether block info for `blockNumber` is currently recorded.
// NOTE(review): the return statements and whatever consumes `ss` are missing
// from this excerpt, so only the three comparisons below can be described.
102 bool has_block_info(stream_size_type blockNumber)
// Case 1: nothing recorded, since m_blockNumber still holds the max() value
// assigned by clear_block_info().
104 if (m_blockNumber == std::numeric_limits<stream_size_type>::max())
// Case 2: the caller asks for a block before the recorded one; a diagnostic
// message is assembled (presumably thrown or logged; verify).
107 if (blockNumber < m_blockNumber) {
108 std::stringstream ss;
109 ss <<
"Wanted block number " << blockNumber <<
", but recalled was " << m_blockNumber;
// Case 3: the requested block is exactly the recorded one.
113 if (blockNumber == m_blockNumber)
// Accessor for the recorded block size of `blockNumber`.
// tp_assert checks has_block_info(blockNumber) first. The return statement
// is not visible in this excerpt (presumably m_blockSize; verify).
120 memory_size_type get_block_size(stream_size_type blockNumber)
122 tp_assert(has_block_info(blockNumber),
"get_block_size: !has_block_info");
// Accessor for the recorded read offset of `blockNumber`.
// tp_assert checks has_block_info(blockNumber) first. The return statement
// is not visible in this excerpt (presumably m_readOffset; verify).
128 stream_size_type get_read_offset(stream_size_type blockNumber) {
129 tp_assert(has_block_info(blockNumber),
"get_read_offset: !has_block_info");
// Return the stored next-block read offset, i.e. the value last written by
// set_next_block_offset() or zeroed by initiate_request().
145 stream_size_type next_read_offset() {
146 return m_nextReadOffset;
// Store `offset` as the next-block read offset and wake every thread waiting
// on m_changed.
// NOTE(review): original line 151 is missing from this excerpt; it may set
// further state before the assignment.
150 void set_next_block_offset(stream_size_type offset) {
152 m_nextReadOffset = offset;
153 m_changed.notify_all();
// Condition variable notified (notify_all) whenever set_block_info() or
// set_next_block_offset() updates the fields below.
157 std::condition_variable m_changed;
// Recorded block: its number (max() when nothing is recorded), the read
// offset stored for it, and its size.
163 stream_size_type m_blockNumber;
164 stream_size_type m_readOffset;
165 memory_size_type m_blockSize;
// Next-block read offset and size; both are zeroed by initiate_request().
169 stream_size_type m_nextReadOffset;
170 memory_size_type m_nextBlockSize;
// Constructor initializer fragment (header not visible): keeps the response
// object handed in by the caller.
188 : m_response(response)
// Forward to the response object so it resets its per-request state.
193 void initiate_request() {
194 m_response->initiate_request();
// Shared-ownership handle to the buffer the request operates on.
203 typedef std::shared_ptr<compressor_buffer> buffer_t;
205 typedef std::condition_variable condition_t;
// Constructor fragment (header and remaining parameters are not visible):
// takes the offset to read from and the read direction ...
209 stream_size_type readOffset,
210 read_direction::type readDirection,
// ... and stores the file accessor, offset and direction in the members.
214 , m_fileAccessor(fileAccessor)
215 , m_readOffset(readOffset)
216 , m_readDirection(readDirection)
// Body of a file accessor getter (header not visible): dereferences the
// stored m_fileAccessor pointer and returns the result.
225 return *m_fileAccessor;
// Accessor for the read offset. The body is not visible in this excerpt
// (presumably returns m_readOffset; verify).
228 stream_size_type read_offset() {
// Return the read direction this request was constructed with.
232 read_direction::type get_read_direction() {
233 return m_readDirection;
// Forward the next-block offset to the response object.
236 void set_next_block_offset(stream_size_type offset) {
237 m_response->set_next_block_offset(offset);
// Read offset and direction; both are const and set once in the
// constructor's initializer list.
243 const stream_size_type m_readOffset;
244 const read_direction::type m_readDirection;
// Shared-ownership handle to the buffer the request operates on.
249 typedef std::shared_ptr<compressor_buffer> buffer_t;
// Constructor fragment (header and remaining parameters are not visible):
// takes the write offset, the block's item count and its block number ...
255 stream_size_type writeOffset,
256 memory_size_type blockItems,
257 stream_size_type blockNumber,
// ... and stores them, together with the file accessor and temp file, in the
// members.
261 , m_fileAccessor(fileAccessor)
262 , m_tempFile(tempFile)
263 , m_writeOffset(writeOffset)
264 , m_blockItems(blockItems)
265 , m_blockNumber(blockNumber)
// Body of a file accessor getter (header not visible): dereferences the
// stored m_fileAccessor pointer and returns the result.
270 return *m_fileAccessor;
// Accessor for the block's item count. The body is not visible in this
// excerpt (presumably returns m_blockItems; verify).
281 memory_size_type block_items() {
// True when the write offset equals the maximum stream_size_type value,
// i.e. the request was constructed with max() as its writeOffset.
285 bool should_append() {
286 return m_writeOffset == std::numeric_limits<stream_size_type>::max();
// Return the write offset this request was constructed with. When
// should_append() is true, this is the max() value rather than a real offset.
289 stream_size_type write_offset() {
290 return m_writeOffset;
// Pass the read offset and size on to the response object, tagged with this
// request's own block number.
294 void set_block_info(stream_size_type readOffset,
295 memory_size_type blockSize)
297 m_response->set_block_info(m_blockNumber, readOffset, blockSize);
// Call set_done() on the response and, when a temp file is attached, update
// its recorded size from the file accessor's current file_size().
301 void update_recorded_size() {
302 m_response->set_done();
// m_tempFile may be NULL, in which case no size is recorded.
303 if (m_tempFile != NULL) m_tempFile->update_recorded_size(m_fileAccessor->file_size());
// Overload taking an explicit size: call set_done() on the response and, when
// a temp file is attached, record `fileSize` on it instead of querying the
// file accessor.
307 void update_recorded_size(stream_size_type fileSize) {
308 m_response->set_done();
// m_tempFile may be NULL, in which case no size is recorded.
309 if (m_tempFile != NULL) m_tempFile->update_recorded_size(fileSize);
// Write offset (max() makes should_append() true), item count and block
// number; all are const and set once in the constructor's initializer list.
316 const stream_size_type m_writeOffset;
317 const memory_size_type m_blockItems;
318 const stream_size_type m_blockNumber;
// Constructor initializer fragment (header not visible): the request starts
// out holding no payload.
349 : m_kind(compressor_request_kind::NONE)
// Second constructor fragment (header not visible) that builds from `other`:
// start as NONE, then switch on other.kind() and pass its read or write
// request to the matching setter.
358 : m_kind(compressor_request_kind::NONE)
360 switch (other.kind()) {
// Nothing to copy when `other` holds no request.
361 case compressor_request_kind::NONE:
363 case compressor_request_kind::READ:
364 set_read_request(other.get_read_request());
366 case compressor_request_kind::WRITE:
367 set_write_request(other.get_write_request());
// Fragment of set_read_request (header and leading parameters not visible).
374 stream_size_type readOffset,
375 read_direction::type readDirection,
// Tag the payload as READ, then construct the read_request with placement
// new directly in the m_payload storage and return a reference to it.
379 m_kind = compressor_request_kind::READ;
380 return *
new (m_payload)
read_request(buffer, fileAccessor, readOffset,
381 readDirection, response);
// Fragment of another read-request setter whose header is not visible
// (presumably the overload taking an existing read_request; verify).
386 m_kind = compressor_request_kind::READ;
// Turn this request into a write request built from the given arguments.
// Some parameters (the file accessor, temp file and response) are on lines
// missing from this excerpt.
390 write_request & set_write_request(
const write_request::buffer_t & buffer,
393 stream_size_type writeOffset,
394 memory_size_type blockItems,
395 stream_size_type blockNumber,
// Tag the payload as WRITE, then construct the write_request with placement
// new directly in the m_payload storage and return a reference to it.
399 m_kind = compressor_request_kind::WRITE;
400 return *
new (m_payload)
write_request(buffer, fileAccessor, tempFile,
401 writeOffset, blockItems,
402 blockNumber, response);
// Fragment of another write-request setter whose header is not visible
// (presumably the overload taking an existing write_request; verify).
407 m_kind = compressor_request_kind::WRITE;
// Body of a const payload accessor (header not visible): reinterpret the raw
// m_payload storage as the read_request that was placement-constructed in it.
418 return *
reinterpret_cast<const read_request *
>(m_payload);
// Body of a const payload accessor (header not visible): view the payload
// through request_base, the base class of read_request and write_request.
438 return *
reinterpret_cast<const request_base *
>(m_payload);
// Const accessor reporting the payload kind as a compressor_request_kind
// value. The body is not visible in this excerpt (presumably returns m_kind;
// verify).
441 compressor_request_kind::type kind()
const {
// Fragment of the payload teardown routine (its header and the switch
// statement are not visible). The payload was created with placement new,
// so its destructor is called explicitly for whichever kind is active.
448 case compressor_request_kind::NONE:
450 case compressor_request_kind::READ:
451 get_read_request().~read_request();
453 case compressor_request_kind::WRITE:
454 get_write_request().~write_request();
// Mark the storage as empty again.
457 m_kind = compressor_request_kind::NONE;
// Tag recording which request type, if any, currently lives in m_payload.
460 compressor_request_kind::type m_kind;
// payload_bytes (defined on a line not visible here) rounded up to a whole
// number of size_t elements by ceiling division.
464 payload_items = (payload_bytes +
sizeof(size_t) - 1) /
sizeof(size_t)
// Raw storage in which a read_request or write_request is
// placement-constructed.
466 size_t m_payload[payload_items];
471 #endif // TPIE_COMPRESSED_REQUEST_H
Defines the tp_assert macro.
void unused(const T &x)
Declare that a variable is unused on purpose.
Base class for read_request and write_request.
Declare default file accessor.
Useful compressed stream predeclarations.
Class representing a reference to a temporary file.
Read/write direction enumeration.
Tagged union containing either a read_request or a write_request.
#define tp_assert(condition, message)
pipe_middle< split_factory< bits::buffer_input_t, node, bits::buffer_output_t > > buffer
The buffer node inserts a phase boundary into the pipeline by writing items to disk.
Response to an I/O request.