20 #ifndef TPIE_COMPRESSED_STREAM_H
21 #define TPIE_COMPRESSED_STREAM_H
32 #include <tpie/file_accessor/byte_stream_accessor.h>
38 #include <tpie/stream_writable.h>
113 else if (!compressionFlags)
127 typedef std::shared_ptr<compressor_buffer> buffer_t;
// Pure-virtual hook that the typed derived stream (file_stream) overrides.
// NOTE(review): presumably called once the underlying file has been opened
// (e.g. from open_inner) -- confirm against the out-of-view definition.
147 virtual void post_open() = 0;
149 void open_inner(
const std::string & path,
151 memory_size_type userDataSize);
156 bool is_readable()
const throw() {
return m_canRead; }
158 bool is_writable()
const throw() {
return m_canWrite; }
160 static memory_size_type block_size(
double blockFactor)
throw ();
162 static double calculate_block_factor(memory_size_type blockSize)
throw ();
// Memory usage associated with one block for the given block factor.
// NOTE(review): unit (presumably bytes) and exactly what is counted are not
// visible here -- confirm against the definition.
164 static memory_size_type block_memory_usage(
double blockFactor);
// Number of items in one block buffer; get_buffer() sets
// m_bufferEnd = m_bufferBegin + block_items().
// NOTE(review): presumably returns m_blockItems -- confirm.
166 memory_size_type block_items()
const;
// Block size of this open stream.
// NOTE(review): presumably the byte size held in m_blockSize -- confirm.
168 memory_size_type block_size()
const;
170 template <
typename TT>
171 void read_user_data(TT & data) {
172 if (
sizeof(TT) != user_data_size())
173 throw stream_exception(
"Wrong user data size");
174 read_user_data(reinterpret_cast<void *>(&data),
sizeof(TT));
// Untyped user-data read: copies up to `count` bytes into `data`.
// The typed overload calls this with sizeof(TT) after checking that
// sizeof(TT) == user_data_size().
// NOTE(review): the return value is presumably the number of bytes read -- confirm.
177 memory_size_type read_user_data(
void * data, memory_size_type count);
179 template <
typename TT>
180 void write_user_data(
const TT & data) {
181 if (
sizeof(TT) > max_user_data_size())
182 throw stream_exception(
"Wrong user data size");
183 write_user_data(reinterpret_cast<const void *>(&data),
sizeof(TT));
// Untyped user-data write of `count` bytes from `data`.
// The typed overload calls this with sizeof(TT) after rejecting types larger
// than max_user_data_size().
186 void write_user_data(
const void * data, memory_size_type count);
// Size of the user data currently stored with the stream. The typed
// read_user_data<TT> throws unless sizeof(TT) equals this value.
188 memory_size_type user_data_size()
const;
// Capacity reserved for user data. The typed write_user_data<TT> throws
// if sizeof(TT) exceeds this value.
190 memory_size_type max_user_data_size()
const;
// Path of the file backing the stream.
// NOTE(review): behavior for unnamed temporary streams and for closed streams
// is not visible here -- confirm against the definition.
192 const std::string & path()
const;
204 void open(
const std::string & path,
206 memory_size_type userDataSize = 0,
210 open(path, open::translate(accessType, cacheHint, compressionFlags), userDataSize);
216 void open(memory_size_type userDataSize,
227 memory_size_type userDataSize = 0,
230 open(file, open::translate(accessType, cacheHint, compressionFlags), userDataSize);
237 const memory_size_type userDataSize = 0;
245 const memory_size_type userDataSize = 0;
253 const memory_size_type userDataSize = 0;
// Open the stream on the named file using open::type flags.
// The deprecated (access_type, cache_hint, compression_flags) overload forwards
// here through open::translate(accessType, cacheHint, compressionFlags).
297 void open(
const std::string & path,
open::type openFlags=open::defaults, memory_size_type userDataSize=0);
// Open overload that takes no path or temp_file.
// NOTE(review): presumably opens an anonymous temporary file (cf. m_ownedTempFile)
// -- confirm against the definition.
305 void open(
open::type openFlags=open::defaults, memory_size_type userDataSize=0);
350 bool is_open()
const {
return m_open; }
352 stream_size_type size()
const {
return m_size; }
354 stream_size_type file_size()
const {
return size(); }
356 stream_size_type offset()
const {
357 switch (m_seekState) {
358 case seek_state::none:
360 case seek_state::beginning:
362 case seek_state::end:
364 case seek_state::position:
367 tp_assert(
false,
"offset: Unreachable statement; m_seekState invalid");
417 stream_size_type m_currentFileSize;
427 stream_size_type m_lastWriteBlockNumber;
429 seek_state::type m_seekState;
458 stream_size_type m_nextReadOffset;
479 if (!m_committed) m_stream->close();
512 template <
typename T>
536 virtual ~file_stream() {
539 }
catch (std::exception & e) {
540 log_error() <<
"Someone threw an error in file_stream::~file_stream: " << e.what() << std::endl;
545 static memory_size_type memory_usage(
double blockFactor=1.0) {
547 return sizeof(file_stream)
549 + stream_buffers::memory_usage(block_size(blockFactor))
557 if (!this->is_open()) {
558 out <<
"[Closed stream]";
564 out <<
" (block " << block_number()
566 <<
", item " << block_item_index()
569 if (use_compression()) {
570 out <<
", compressed";
572 out <<
", uncompressed";
575 switch (m_seekState) {
576 case seek_state::none:
578 case seek_state::beginning:
579 out <<
", seeking to beginning";
581 case seek_state::end:
582 out <<
", seeking to end";
584 case seek_state::position:
596 if (m_seekState == seek_state::none) {
597 if (
can_read()) out <<
", can read";
598 else out <<
", cannot read";
604 if (m_currentFileSize != std::numeric_limits<stream_size_type>::max())
605 out <<
", current file size " << m_currentFileSize;
614 std::stringstream ss;
619 virtual void post_open()
override {
631 if (!use_compression()) {
640 offset += this->offset();
650 if (
m_buffer.get() != 0 && buffer_block_number() == 0) {
652 m_nextItem = m_bufferBegin;
654 m_seekState = seek_state::none;
657 m_seekState = seek_state::beginning;
662 m_seekState = seek_state::end;
665 m_seekState = seek_state::none;
674 stream_size_type blockItemIndex =
676 memory_size_type cast =
static_cast<memory_size_type
>(blockItemIndex);
677 tp_assert(blockItemIndex == cast,
"seek: blockItemIndex out of bounds");
678 m_nextItem = m_bufferBegin + cast;
681 m_seekState = seek_state::none;
683 m_seekState = seek_state::end;
689 tp_assert(
false,
"seek: Unknown whence");
699 tp_assert(is_open(),
"truncate: !is_open");
701 if (offset == size())
703 else if (offset == 0)
705 else if (!use_compression())
706 truncate_uncompressed(offset);
717 tp_assert(is_open(),
"truncate: !is_open");
719 if (pos.
offset() == size())
721 else if (pos.
offset() == 0)
723 else if (!use_compression())
724 truncate_uncompressed(pos.
offset());
726 truncate_compressed(pos);
735 void truncate_zero() {
749 m_nextItem = m_bufferBegin;
750 m_seekState = seek_state::none;
754 void truncate_uncompressed(stream_size_type offset) {
755 tp_assert(!use_compression(),
"truncate_uncompressed called on compressed stream");
757 stream_size_type currentOffset = this->offset();
759 && block_number(offset) == buffer_block_number()
766 memory_size_type blockItemIndex =
768 m_nextItem = m_bufferBegin + blockItemIndex;
771 m_seekState = seek_state::none;
776 compressor_thread_lock l(compressor());
793 seek(std::min(currentOffset, offset));
796 void truncate_compressed(
const stream_position & pos) {
797 tp_assert(use_compression(),
"truncate_compressed called on uncompressed stream");
799 stream_size_type offset = pos.offset();
800 stream_position finalDestination = (offset < this->offset()) ? pos :
get_position();
802 if (
m_buffer.get() == 0 || block_number(offset) != buffer_block_number()) {
811 m_currentFileSize = std::numeric_limits<stream_size_type>::max();
812 compressor_thread_lock l(compressor());
819 memory_size_type blockItemIndex =
821 m_nextItem = m_bufferBegin + blockItemIndex;
842 tp_assert(is_open(),
"get_position: !is_open");
844 switch (m_seekState) {
845 case seek_state::position:
848 case seek_state::beginning:
850 case seek_state::none:
852 if (m_nextItem == m_bufferEnd)
858 if (m_nextItem == m_bufferEnd) {
865 m_nextItem = m_bufferBegin;
868 case seek_state::end:
873 stream_size_type readOffset;
874 stream_size_type blockNumber = block_number(offset());
883 tp_assert(
false,
"get_position: Invalid block_number");
884 readOffset = 1111111111111111111ull;
910 if (!use_compression() && pos.read_offset() != 0)
913 if (pos.
offset() > size())
917 && block_number(pos.
offset()) == buffer_block_number())
927 m_nextItem = m_bufferBegin + block_item_index();
928 m_seekState = seek_state::none;
933 m_seekState = seek_state::position;
951 return *m_nextItem++;
953 const T & res =
peek();
974 if (m_seekState != seek_state::none) perform_seek();
976 if (m_nextItem == m_bufferEnd) {
983 read_next_block(l, block_number());
1002 template <
typename IT>
1003 void read(IT
const a, IT
const b) {
1004 for (IT i = a; i != b; ++i) *i =
read();
1017 return offset() < size();
1027 return offset() > 0;
1030 const T & read_back() {
1031 if (m_seekState != seek_state::none) {
1033 perform_seek(read_direction::backward);
1035 if (m_nextItem == m_bufferBegin) {
1036 if (
m_offset == 0)
throw end_of_stream_exception();
1038 compressor_thread_lock l(compressor());
1043 if (use_compression()) {
1044 read_previous_block(l, block_number() - 1);
1046 read_next_block(l, block_number() - 1);
1047 m_nextItem = m_bufferEnd;
1052 return *--m_nextItem;
1055 void write(
const T & item) {
1057 *m_nextItem++ = item;
1064 if (m_seekState != seek_state::none) perform_seek();
1066 if (!use_compression()) {
1067 if (m_nextItem == m_bufferEnd) {
1068 compressor_thread_lock lock(compressor());
1073 if (offset() == size()) {
1075 m_nextItem = m_bufferBegin;
1077 read_next_block(lock, block_number());
1081 *m_nextItem++ = item;
1084 cache_read_writes();
1089 throw stream_exception(
"Non-appending write attempted");
1091 if (m_nextItem == m_bufferEnd) {
1092 compressor_thread_lock l(compressor());
1098 m_nextItem = m_bufferBegin;
1101 *m_nextItem++ = item;
1106 cache_read_writes();
1109 template <
typename IT>
1110 void write(IT
const a, IT
const b) {
1111 for (IT i = a; i != b; ++i) write(*i);
1124 void perform_seek(read_direction::type dir=read_direction::forward) {
1125 if (!is_open())
throw stream_exception(
"Stream is not open");
1128 close_on_fail_guard closeOnFail(
this);
1130 tp_assert(!(m_seekState == seek_state::none),
"perform_seek when seekState is none");
1134 compressor_thread_lock l(compressor());
1145 if (m_seekState == seek_state::beginning && size() == 0) {
1146 m_seekState = seek_state::end;
1150 if (m_seekState == seek_state::position
1153 m_seekState = seek_state::end;
1156 if (m_seekState == seek_state::beginning) {
1161 tp_assert(!(size() == 0),
"Seek beginning when size is zero");
1162 if (use_compression()) {
1163 m_nextReadOffset = 0;
1165 read_next_block(l, 0);
1168 }
else if (m_seekState == seek_state::position) {
1175 tp_assert(!(blockItemIndex >=
m_blockItems),
"perform_seek: Computed block item index >= blockItems");
1177 if (dir == read_direction::backward && blockItemIndex == 0 && blockNumber > 0) {
1178 if (use_compression()) {
1180 read_previous_block(l, blockNumber - 1);
1183 read_next_block(l, blockNumber - 1);
1184 m_nextItem = m_bufferEnd;
1187 if (use_compression()) {
1190 read_next_block(l, blockNumber);
1191 m_nextItem = m_bufferBegin + blockItemIndex;
1195 }
else if (m_seekState == seek_state::end) {
1200 m_nextItem = m_bufferBegin;
1201 if (use_compression()) {
1212 throw exception(
"Attempted seek to end when no blocks have been written");
1214 memory_size_type blockItemIndex =
1216 if (use_compression()) {
1220 m_nextItem = m_bufferBegin + blockItemIndex;
1224 log_debug() <<
"Unknown seek state " << m_seekState << std::endl;
1225 tp_assert(
false,
"perform_seek: Unknown seek state");
1228 m_seekState = seek_state::none;
1230 closeOnFail.commit();
1237 void get_buffer(compressor_thread_lock & l, stream_size_type blockNumber) {
1241 while (
m_buffer->is_busy()) compressor().wait_for_request_done(l);
1242 m_bufferBegin =
reinterpret_cast<T *
>(
m_buffer->get());
1243 m_bufferEnd = m_bufferBegin + block_items();
1255 virtual void flush_block(compressor_thread_lock & lock)
override {
1257 stream_size_type blockNumber = buffer_block_number();
1258 stream_size_type writeOffset;
1259 if (!use_compression()) {
1266 writeOffset = std::numeric_limits<stream_size_type>::max();
1272 throw exception(
"flush_block: blockNumber not at end of stream");
1277 m_currentFileSize = std::numeric_limits<stream_size_type>::max();
1279 if (m_nextItem == NULL)
throw exception(
"m_nextItem is NULL");
1280 if (m_bufferBegin == NULL)
throw exception(
"m_bufferBegin is NULL");
1282 if (blockItems + blockNumber *
m_blockItems > size()) {
1284 static_cast<memory_size_type
>(size() - blockNumber *
m_blockItems);
1286 m_buffer->set_size(blockItems *
sizeof(T));
1288 compressor_request r;
1296 compressor().request(r);
1300 m_lastWriteBlockNumber = blockNumber;
1308 void maybe_update_read_offset(compressor_thread_lock & lock) {
1313 if (
m_response.has_block_info(m_lastWriteBlockNumber)) {
1326 void read_next_block(compressor_thread_lock & lock, stream_size_type blockNumber) {
1328 get_buffer(lock, blockNumber);
1330 maybe_update_read_offset(lock);
1332 stream_size_type readOffset;
1335 if (use_compression()) {
1337 "read_next_block: Buffer has wrong read offset");
1341 if (use_compression()) {
1342 readOffset = m_nextReadOffset;
1344 stream_size_type itemOffset = blockNumber *
m_blockItems;
1346 memory_size_type blockSize =
1347 std::min(m_blockSize,
1348 static_cast<memory_size_type>((size() - itemOffset) *
m_itemSize));
1352 read_block(lock, readOffset, read_direction::forward);
1354 if (size() - blockNumber * m_blockItems < blockItems) {
1355 blockItems =
static_cast<size_t>(size() - blockNumber *
m_blockItems);
1357 size_t usableBlockSize =
m_buffer->size() /
sizeof(T) *
sizeof(T);
1358 size_t expectedBlockSize = blockItems *
sizeof(T);
1359 if (usableBlockSize != expectedBlockSize) {
1360 log_error() <<
"Expected " << expectedBlockSize <<
" (" << blockItems
1361 <<
" items), got " <<
m_buffer->size() <<
" (rounded to "
1362 << usableBlockSize <<
')' << std::endl;
1363 throw exception(
"read_next_block: Bad buffer->get_size");
1367 if (use_compression()) {
1369 m_nextReadOffset =
m_response.next_read_offset();
1371 throw exception(
"read_next_block: bad get_read_offset");
1373 throw exception(
"read_next_block: bad get_block_size");
1381 m_nextItem = m_bufferBegin;
1384 void read_previous_block(compressor_thread_lock & lock, stream_size_type blockNumber) {
1386 tp_assert(use_compression(),
"read_previous_block: !use_compression");
1387 get_buffer(lock, blockNumber);
1389 maybe_update_read_offset(lock);
1395 read_block(lock,
m_readOffset, read_direction::backward);
1403 throw exception(
"Bad buffer get_read_offset");
1405 throw exception(
"Bad buffer get_block_size");
1408 m_nextItem = m_bufferEnd;
1411 void read_block(compressor_thread_lock & lock,
1412 stream_size_type readOffset,
1413 read_direction::type readDirection)
1415 compressor_request r;
1423 compressor().request(r);
1429 stream_size_type block_number(stream_size_type offset) {
1440 stream_size_type block_number() {
1450 stream_size_type buffer_block_number() {
1451 stream_size_type blockNumber = block_number();
1452 if (m_nextItem == m_bufferEnd)
1453 return blockNumber - 1;
1458 memory_size_type block_item_index(stream_size_type offset) {
1460 memory_size_type cast =
static_cast<memory_size_type
>(i);
1461 tp_assert(!(i != cast),
"Block item index out of bounds");
1465 memory_size_type block_item_index() {
1473 void cache_read_writes() {
1474 if (
m_buffer.get() == 0 || m_seekState != seek_state::none) {
1477 }
else if (offset() == size()) {
1485 static_cast<memory_size_type
>(size() - offset());
1502 #endif // TPIE_COMPRESSED_STREAM_H
Defines the tp_assert macro.
Sequential access is intended.
void set_position(const stream_position &pos)
Seek to a position that was previously recalled with get_position.
stream_position m_nextPosition
If seekState is position, seek to this position before reading/writing.
Open a file for reading only.
memory_size_type m_cachedWrites
Number of cheap, unchecked writes we can do next.
stream_position get_position()
Store the current stream position such that it may be found later on.
bool m_bufferDirty
Whether the current block must be written out to disk before being ejected.
stream_size_type m_readOffset
Position relating to the currently loaded buffer.
The buffer is different from the contents on the disk.
stream_size_type m_size
Number of logical items in the stream.
void open(const std::string &path, compression_flags compressionFlags)
Deprecated interface for opening a named stream.
void open(temp_file &file, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a temporary stream.
Random access is intended.
bool can_read()
Check if the next call to read() will succeed or not.
No written blocks should be compressed.
void truncate(stream_size_type offset)
Truncate to given size.
std::string describe()
For debugging: Describe the internal stream state in a string.
Central file abstraction.
const T & read()
Reads next item from stream if can_read() == true.
Base class containing the implementation details that are independent of the item type...
offset_type
Type describing how we should interpret the offset supplied to seek.
buffer_t m_buffer
Buffer holding the items of the block currently being read/written.
void open(const std::string &path, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a named stream.
void uncache_read_writes()
Reset cheap read/write counts to zero so that the next read/write operation will check stream state p...
Buffer manager for a single stream.
Compress some blocks according to available resources (time, memory).
void seek(stream_offset_type offset, offset_type whence=beginning)
Precondition: is_open() Precondition: offset == 0.
file_accessor::byte_stream_accessor< default_raw_file_accessor > m_byteStreamAccessor
File accessor.
temp_file * m_tempFile
The temporary file we have opened (when appropriate).
Generic internal array with known memory requirements.
POD object indicating the position of an item in a stream.
Interface to the compressor thread.
void read(IT const a, IT const b)
Precondition: is_open().
memory_size_type m_itemSize
Size of a single item.
bool can_read_back()
Check if the next call to read_back() will succeed or not.
void open(memory_size_type userDataSize, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening an unnamed temporary stream.
stream_size_type offset() const
The stream offset of the item pointed to.
compressor_response m_response
Response from compressor thread; protected by compressor thread mutex.
stream_size_type m_lastBlockReadOffset
When use_compression() is true: Read offset of the last block in the stream.
void open(temp_file &file, compression_flags compressionFlags)
Deprecated interface for opening a temporary stream.
The buffer will soon change to reflect the contents on the disk.
void open(compression_flags compressionFlags)
Deprecated interface for opening an unnamed temporary stream.
stream_size_type m_offset
Offset of next item to read/write, relative to beginning of stream.
void truncate(const stream_position &pos)
Truncate to given stream position.
Compress all blocks according to the preferred compression scheme which can be set using tpie::the_co...
Stream position indicator.
memory_size_type m_blockItems
Number of items in a logical block.
Class representing a reference to a temporary file.
stream_buffers m_buffers
Buffer manager for this entire stream.
Implementation helper that closes the stream if a method exits by throwing an exception.
Buffers for compressed streams.
void describe(std::ostream &out)
For debugging: Describe the internal stream state in a string.
Random access is intended.
Open a file for writing only, content is truncated.
logstream & log_debug()
Return logstream for writing debug log messages.
The buffer is equal to the contents on the disk.
memory_size_type m_blockSize
Size (in bytes) of a logical (uncompressed) block.
static stream_position end()
Special-value constructor returning a pointer to the end.
Read/write direction enumeration.
bool m_canRead
Whether we are open for reading.
std::unique_ptr< T, tpie_deleter > unique_ptr
like std::unique_ptr, but delete the object with tpie_delete.
compression_flags
Possible values for the compressionFlags parameter to stream::open.
const T & peek()
Peeks next item from stream if can_read() == true.
Compress some blocks according to available resources (time, memory).
Item type-agnostic file_stream operations.
Neither sequential access nor random access is intended.
tpie::unique_ptr< temp_file > m_ownedTempFile
The anonymous temporary file we have opened (when appropriate).
bool m_canWrite
Whether we are open for writing.
stream_size_type current_file_size(compressor_thread_lock &l)
Blocks to take the compressor lock.
memory_size_type m_cachedReads
Number of cheap, unchecked reads we can do next.
Compressor thread requests and responses.
access_type
Type describing how we wish to access a file.
bool m_open
Whether we are open.
#define tp_assert(condition, message)
Neither sequential access nor random access is intended.
Open a file for writing only.
logstream & log_error()
Return logstream for writing error log messages.
stream_size_type last_block_read_offset(compressor_thread_lock &l)
Blocks to take the compressor lock.
bool m_updateReadOffsetFromWrite
When use_compression() is true: Indicates whether m_response is the response to a write request...
Response to an I/O request.
The buffer will soon be written to disk.
stream_size_type m_streamBlocks
The number of blocks written to the file.
Compress all blocks according to the preferred compression scheme which can be set using tpie::the_co...
Open a file for reading or writing.