TPIE

2362a60
stream.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef TPIE_COMPRESSED_STREAM_H
21 #define TPIE_COMPRESSED_STREAM_H
22 
26 
27 #include <tpie/array.h>
28 #include <tpie/tpie_assert.h>
29 #include <tpie/tempname.h>
30 #include <tpie/file_base_crtp.h>
31 #include <tpie/file_stream_base.h>
32 #include <tpie/file_accessor/byte_stream_accessor.h>
33 #include <tpie/compressed/thread.h>
34 #include <tpie/compressed/buffer.h>
38 #include <tpie/stream_writable.h>
39 
40 namespace tpie {
41 
// Bit flags for opening a compressed stream (access mode, cache hint,
// compression), combinable with the bitwise operators below, plus helpers
// converting to and from the access_type / cache_hint / compression_flags enums.
42 struct open {
43  enum type {
// The values are octal literals (leading 0): 01, 02, 04, 010 (= 8),
// 020 (= 16) and 040 (= 32), so each flag occupies its own bit.
45  read_only = 00000001,
48  write_only = 00000002,
51  access_normal = 00000004,
54  access_random = 00000010,
57  compression_normal = 00000020,
61  compression_all = 00000040,
62 
// No flag set.
63  defaults = 0
64  };
65 
// Bitwise operators on open::type. They are in-class friend definitions, so
// they are found through argument-dependent lookup on open::type. Each one
// computes on int and casts back to open::type.
66  friend inline open::type operator|(open::type a, open::type b)
67  { return (open::type) ((int) a | (int) b); }
68  friend inline open::type operator&(open::type a, open::type b)
69  { return (open::type) ((int) a & (int) b); }
70  friend inline open::type operator^(open::type a, open::type b)
71  { return (open::type) ((int) a ^ (int) b); }
72  friend inline open::type operator~(open::type a)
73  { return (open::type) ~(int) a; }
74 
// Builds an open::type bitmask from the legacy enums. Each argument adds at
// most one flag. Values without a matching case (e.g. access_read_write,
// access_sequential, compression_none) add `defaults` (0).
75  static type translate(access_type accessType, cache_hint cacheHint, compression_flags compressionFlags) {
76  return (type) ((
77 
78  (accessType == access_read) ? read_only :
79  (accessType == access_write) ? write_only :
80  defaults) | (
81 
82  (cacheHint == tpie::access_normal) ? access_normal :
83  (cacheHint == tpie::access_random) ? access_random :
84  defaults) | (
85 
86  (compressionFlags == tpie::compression_normal) ? compression_normal :
87  (compressionFlags == tpie::compression_all) ? compression_all :
88  defaults));
89  }
90 
// Extracts the cache hint encoded in openFlags.
// Throws tpie::stream_exception for an unrecognised combination of cache flags.
// NOTE(review): the initializer of cacheFlags (source line 93) and the return
// statement of the `!cacheFlags` branch (source line 100) are absent from
// this listing.
91  static cache_hint translate_cache(open::type openFlags) {
92  const open::type cacheFlags =
94 
95  if (cacheFlags == open::access_normal)
96  return tpie::access_normal;
97  else if (cacheFlags == open::access_random)
98  return tpie::access_random;
99  else if (!cacheFlags)
101  else
102  throw tpie::stream_exception("Invalid cache flags supplied");
103  }
104 
// Extracts the compression setting encoded in openFlags. No compression flag
// yields tpie::compression_none; any unrecognised combination throws
// tpie::stream_exception.
// NOTE(review): the initializer of compressionFlags (source line 107) and the
// return statement of the compression_normal branch (source line 110) are
// absent from this listing.
105  static compression_flags translate_compression(open::type openFlags) {
106  const open::type compressionFlags =
108 
109  if (compressionFlags == open::compression_normal)
111  else if (compressionFlags == open::compression_all)
112  return tpie::compression_all;
113  else if (!compressionFlags)
114  return tpie::compression_none;
115  else
116  throw tpie::stream_exception("Invalid compression flags supplied");
117  }
118 };
119 
120 
// NOTE(review): the class header of compressed_stream_base (source lines
// 121-125) is absent from this listing; this is the start of its body.
126 public:
// Shared-ownership handle to a compressor_buffer holding one block.
127  typedef std::shared_ptr<compressor_buffer> buffer_t;
128 
129 protected:
// Kind of seek that is pending. With `none`, offset() reports m_offset
// directly; the other states are resolved lazily by the subclass
// (file_stream::perform_seek) on the next I/O.
130  struct seek_state {
131  enum type {
132  none,
133  beginning,
134  end,
135  position
136  };
137  };
138 
// itemSize is the size in bytes of one item; blockFactor scales the block
// size (see block_size(double)). Definition not in this view.
139  compressed_stream_base(memory_size_type itemSize,
140  double blockFactor);
141 
142  // Non-virtual, protected destructor
// NOTE(review): the destructor declaration itself (source line 143) is
// absent from this listing.
144 
// Writes the loaded block out; implemented by the subclass. Callers hold
// the compressor lock passed in.
145  virtual void flush_block(compressor_thread_lock &) = 0;
146 
// Hook implemented by the subclass (file_stream seeks to offset 0 here).
147  virtual void post_open() = 0;
148 
149  void open_inner(const std::string & path,
150  open::type openFlags,
151  memory_size_type userDataSize);
152 
// The compressor thread returned by the_compressor_thread().
153  compressor_thread & compressor() { return the_compressor_thread(); }
154 
155 public:
156  bool is_readable() const throw() { return m_canRead; }
157 
158  bool is_writable() const throw() { return m_canWrite; }
159 
// NOTE(review): dynamic exception specifications such as `throw()` are
// deprecated since C++11 and `throw()` was removed in C++20; `noexcept` is
// the replacement if this header must build as C++20.
160  static memory_size_type block_size(double blockFactor) throw ();
161 
162  static double calculate_block_factor(memory_size_type blockSize) throw ();
163 
164  static memory_size_type block_memory_usage(double blockFactor);
165 
166  memory_size_type block_items() const;
167 
168  memory_size_type block_size() const;
169 
170  template <typename TT>
171  void read_user_data(TT & data) {
172  if (sizeof(TT) != user_data_size())
173  throw stream_exception("Wrong user data size");
174  read_user_data(reinterpret_cast<void *>(&data), sizeof(TT));
175  }
176 
177  memory_size_type read_user_data(void * data, memory_size_type count);
178 
179  template <typename TT>
180  void write_user_data(const TT & data) {
181  if (sizeof(TT) > max_user_data_size())
182  throw stream_exception("Wrong user data size");
183  write_user_data(reinterpret_cast<const void *>(&data), sizeof(TT));
184  }
185 
186  void write_user_data(const void * data, memory_size_type count);
187 
188  memory_size_type user_data_size() const;
189 
190  memory_size_type max_user_data_size() const;
191 
192  const std::string & path() const;
193 
204  void open(const std::string & path,
205  access_type accessType,
206  memory_size_type userDataSize = 0,
207  cache_hint cacheHint=access_sequential,
208  compression_flags compressionFlags=compression_none)
209  {
210  open(path, open::translate(accessType, cacheHint, compressionFlags), userDataSize);
211  }
212 
216  void open(memory_size_type userDataSize,
217  cache_hint cacheHint=access_sequential,
218  compression_flags compressionFlags=compression_none) {
219  open(open::translate(access_read_write, cacheHint, compressionFlags), userDataSize);
220  }
221 
// NOTE(review): the opening line(s) of this overload (source lines 222-225)
// are absent from this listing. Judging by its body and by the
// open(temp_file &, compression_flags) overload below, it presumably begins
// `void open(temp_file & file,`.
226  access_type accessType,
227  memory_size_type userDataSize = 0,
228  cache_hint cacheHint=access_sequential,
229  compression_flags compressionFlags=compression_none) {
230  open(file, open::translate(accessType, cacheHint, compressionFlags), userDataSize);
231  }
232 
// Opens `path` read-write with a sequential cache hint, the given
// compression and no user data.
236  void open(const std::string & path, compression_flags compressionFlags) {
237  const memory_size_type userDataSize = 0;
238  open(path, open::translate(access_read_write, access_sequential, compressionFlags), userDataSize);
239  }
240 
// Same as above, but without an explicit path.
244  void open(compression_flags compressionFlags) {
245  const memory_size_type userDataSize = 0;
246  open(open::translate(access_read_write, access_sequential, compressionFlags), userDataSize);
247  }
248 
// Same as above, but on the given temp_file.
252  void open(temp_file & file, compression_flags compressionFlags) {
253  const memory_size_type userDataSize = 0;
254  open(file, open::translate(access_read_write, access_sequential, compressionFlags), userDataSize);
255  }
256 
// Primary overloads taking an open::type bitmask; all the convenience
// overloads above forward to one of these. Definitions are not in this view.
297  void open(const std::string & path, open::type openFlags=open::defaults, memory_size_type userDataSize=0);
298 
305  void open(open::type openFlags=open::defaults, memory_size_type userDataSize=0);
306 
314  void open(temp_file & file, open::type openFlags=open::defaults, memory_size_type userDataSize=0);
315 
316  void close();
317 
318 protected:
// Definitions of the following three members are not in this view; each
// takes the caller's compressor lock.
319  void finish_requests(compressor_thread_lock & l);
320 
328  stream_size_type last_block_read_offset(compressor_thread_lock & l);
329 
337  stream_size_type current_file_size(compressor_thread_lock & l);
338 
// True when the byte stream accessor reports that the file is compressed.
339  bool use_compression() { return m_byteStreamAccessor.get_compressed(); }
340 
// NOTE(review): source lines 341-346 are absent from this listing; the brace
// below closes a member whose beginning is not shown.
347  }
348 
349 public:
350  bool is_open() const { return m_open; }
351 
352  stream_size_type size() const { return m_size; }
353 
354  stream_size_type file_size() const { return size(); }
355 
356  stream_size_type offset() const {
357  switch (m_seekState) {
358  case seek_state::none:
359  return m_offset;
360  case seek_state::beginning:
361  return 0;
362  case seek_state::end:
363  return size();
364  case seek_state::position:
365  return m_nextPosition.offset();
366  }
367  tp_assert(false, "offset: Unreachable statement; m_seekState invalid");
368  return 0; // suppress compiler warning
369  }
370 
// NOTE(review): several member declarations used elsewhere (e.g. m_canWrite,
// m_nextPosition, m_bufferDirty, m_byteStreamAccessor) and the per-member
// documentation fall on source lines absent from this listing.
371 protected:
// Items per block (used as the divisor for block numbers, e.g. in
// truncate_uncompressed).
380  memory_size_type m_blockItems;
// Bytes per block (uncompressed write offset is blockNumber * m_blockSize).
382  memory_size_type m_blockSize;
// Returned by is_readable().
384  bool m_canRead;
// Returned by is_open().
388  bool m_open;
// Bytes per item.
390  memory_size_type m_itemSize;
// Items the read() fast path may return straight from the buffer.
392  memory_size_type m_cachedReads;
// Items the write() fast path may store straight into the buffer.
394  memory_size_type m_cachedWrites;
// Number of items in the stream (size()).
403  stream_size_type m_size;
// Currently loaded block buffer (may be empty).
407  buffer_t m_buffer;
408 
// Number of blocks already handed to flush_block(); incremented there when a
// new block is flushed.
411  stream_size_type m_streamBlocks;
412 
// Cached query results; std::numeric_limits<stream_size_type>::max() marks
// them as unknown (flush_block resets both to that value).
416  stream_size_type m_lastBlockReadOffset;
417  stream_size_type m_currentFileSize;
418 
421 
// Block number of the most recent flush_block() write request.
427  stream_size_type m_lastWriteBlockNumber;
428 
// Pending seek, if any.
429  seek_state::type m_seekState;
430 
// Read offset of the loaded block (kept at 0 for uncompressed streams).
438  stream_size_type m_readOffset;
439 
// Logical item offset when no seek is pending.
453  stream_size_type m_offset;
454 
457 
// Read offset of the block after the loaded one (compressed streams).
458  stream_size_type m_nextReadOffset;
459 };
460 
// NOTE(review): the class header (source lines 461-470), the constructor's
// declarator (line 472) and the destructor's declarator (line 478) are absent
// from this listing. From its use in perform_seek() this is
// close_on_fail_guard, constructed with a compressed_stream_base pointer.
// Scope guard: on destruction it closes the stream unless commit() was called.
471 public:
473  : m_committed(false)
474  , m_stream(s)
475  {
476  }
477 
// Destructor body: close the stream if the guarded operation never committed.
479  if (!m_committed) m_stream->close();
480  }
481 
// Marks the guarded operation as successful, so the stream stays open.
482  void commit() {
483  m_committed = true;
484  }
485 
486 private:
487  bool m_committed;
// Not owned.
488  compressed_stream_base * m_stream;
489 };
490 
512 template <typename T>
513 class file_stream : public compressed_stream_base {
// Compile-time check that T may be stored in a stream.
514  static_assert(is_stream_writable<T>::value, "file_stream item type must be trivially copyable");
515 
// NOTE(review): source line 516 is absent from this listing.
517 
518 public:
// Seek origins, re-exported from file_stream_base for use with seek().
519  static const file_stream_base::offset_type beginning = file_stream_base::beginning;
520  static const file_stream_base::offset_type end = file_stream_base::end;
521  static const file_stream_base::offset_type current = file_stream_base::current;
522 
523  typedef T item_type;
524  typedef file_stream_base::offset_type offset_type;
525 
526  file_stream(double blockFactor=1.0)
527  : compressed_stream_base(sizeof(T), blockFactor)
528  , m_bufferBegin(0)
529  , m_bufferEnd(0)
530  , m_nextItem(0)
531  {
532  }
533 
534  // This destructor would not need to be virtual if we could use
535  // final (non-subclassable) classes, but that is a C++11 feature.
536  virtual ~file_stream() {
537  try {
538  close();
539  } catch (std::exception & e) {
540  log_error() << "Someone threw an error in file_stream::~file_stream: " << e.what() << std::endl;
541  abort();
542  }
543  }
544 
545  static memory_size_type memory_usage(double blockFactor=1.0) {
546  // m_buffer is included in m_buffers memory usage
547  return sizeof(file_stream)
548  + sizeof(temp_file) // m_ownedTempFile
549  + stream_buffers::memory_usage(block_size(blockFactor)) // m_buffers
550  ;
551  }
552 
556  void describe(std::ostream & out) {
557  if (!this->is_open()) {
558  out << "[Closed stream]";
559  return;
560  }
561 
562  out << "[(" << m_byteStreamAccessor.path() << ") item " << offset()
563  << " of " << size();
564  out << " (block " << block_number()
565  << " @ byte " << m_readOffset
566  << ", item " << block_item_index()
567  << ")";
568 
569  if (use_compression()) {
570  out << ", compressed";
571  } else {
572  out << ", uncompressed";
573  }
574 
575  switch (m_seekState) {
576  case seek_state::none:
577  break;
578  case seek_state::beginning:
579  out << ", seeking to beginning";
580  break;
581  case seek_state::end:
582  out << ", seeking to end";
583  break;
584  case seek_state::position:
585  out << ", seeking to position " << m_nextPosition.offset();
586  out << " (block " << block_number(m_nextPosition.offset())
587  << " @ byte " << m_nextPosition.read_offset()
588  << ", item " << block_item_index(m_nextPosition.offset())
589  << ")";
590  break;
591  }
592 
593  if (m_bufferDirty)
594  out << " dirty";
595 
596  if (m_seekState == seek_state::none) {
597  if (can_read()) out << ", can read";
598  else out << ", cannot read";
599  }
600 
601  out << ", " << m_streamBlocks << " blocks";
602  if (m_lastBlockReadOffset != std::numeric_limits<stream_size_type>::max())
603  out << ", last block at " << m_lastBlockReadOffset;
604  if (m_currentFileSize != std::numeric_limits<stream_size_type>::max())
605  out << ", current file size " << m_currentFileSize;
606 
607  out << ']';
608  }
609 
613  std::string describe() {
614  std::stringstream ss;
615  describe(ss);
616  return ss.str();
617  }
618 
619  virtual void post_open() override {
620  seek(0);
621  }
622 
// Repositions the stream relative to `whence` (beginning, end or current).
// Uncompressed streams: an absolute item offset is computed and passed to
// set_position(stream_position(0, offset)), which throws stream_exception for
// offsets past size().
// Compressed streams: only offset 0 is accepted; anything else throws
// stream_exception. The seek is applied at once when the target lies in the
// loaded buffer; otherwise it is recorded in m_seekState and carried out by
// perform_seek() on the next I/O. Seeking 0 from `current` does nothing.
// NOTE(review): source lines 629-630 are absent from this listing.
627  void seek(stream_offset_type offset, offset_type whence=beginning) {
628  tp_assert(is_open(), "seek: !is_open");
631  if (!use_compression()) {
632  // Handle uncompressed case by delegating to set_position.
633  switch (whence) {
634  case beginning:
635  break;
636  case end:
637  offset += size();
638  break;
639  case current:
640  offset += this->offset();
641  break;
642  }
643  set_position(stream_position(0, offset));
644  return;
645  }
646  // Otherwise, we are in a compressed stream.
647  if (offset != 0) throw stream_exception("Random seeks are not supported");
648  switch (whence) {
649  case beginning:
650  if (m_buffer.get() != 0 && buffer_block_number() == 0) {
651  // We are already reading or writing the first block.
652  m_nextItem = m_bufferBegin;
653  m_offset = m_readOffset = 0;
654  m_seekState = seek_state::none;
655  } else {
656  // We need to load the first block on the next I/O.
657  m_seekState = seek_state::beginning;
658  }
659  return;
660  case end:
// No buffer loaded: defer to perform_seek().
661  if (m_buffer.get() == 0) {
662  m_seekState = seek_state::end;
663  } else if (m_offset == size()) {
664  // no-op
665  m_seekState = seek_state::none;
666  } else if (// We are in the last block, and it has NOT YET been written to disk, or
667  buffer_block_number() == m_streamBlocks ||
668  // we are in the last block, and it has ALREADY been written to disk.
669  buffer_block_number()+1 == m_streamBlocks)
670  {
671  // If the last block is full,
672  // block_item_index() reports 0 when it should report m_blockItems.
673  // Compute blockItemIndex manually to handle this edge case.
674  stream_size_type blockItemIndex =
675  size() - buffer_block_number() * m_blockItems;
676  memory_size_type cast = static_cast<memory_size_type>(blockItemIndex);
677  tp_assert(blockItemIndex == cast, "seek: blockItemIndex out of bounds");
678  m_nextItem = m_bufferBegin + cast;
679 
680  m_offset = size();
681  m_seekState = seek_state::none;
682  } else {
// The loaded block is not the last one: the last block must be loaded later.
683  m_seekState = seek_state::end;
684  }
685  return;
686  case current:
687  return;
688  }
689  tp_assert(false, "seek: Unknown whence");
690  }
691 
// Truncates the stream to `offset` items.
// - offset == size(): nothing happens.
// - offset == 0: always supported.
// - any other offset: uncompressed streams only. A compressed stream throws
//   stream_exception; truncate(const stream_position &) covers that case.
// Afterwards, if m_tempFile is set, its recorded size is updated to m_size.
// NOTE(review): source line 700 is absent from this listing.
698  void truncate(stream_size_type offset) {
699  tp_assert(is_open(), "truncate: !is_open");
701  if (offset == size())
702  return;
703  else if (offset == 0)
704  truncate_zero();
705  else if (!use_compression())
706  truncate_uncompressed(offset);
707  else
708  throw stream_exception("Arbitrary truncate is not supported");
709 
710  if (m_tempFile) m_tempFile->update_recorded_size(m_size);
711  }
712 
// Truncates the stream to pos.offset() items. Works like
// truncate(stream_size_type), except that a compressed stream can also be
// truncated at an arbitrary position, via truncate_compressed(), which uses
// pos.read_offset().
// Afterwards, if m_tempFile is set, its recorded size is updated to m_size.
// NOTE(review): source line 718 is absent from this listing.
716  void truncate(const stream_position & pos) {
717  tp_assert(is_open(), "truncate: !is_open");
719  if (pos.offset() == size())
720  return;
721  else if (pos.offset() == 0)
722  truncate_zero();
723  else if (!use_compression())
724  truncate_uncompressed(pos.offset());
725  else
726  truncate_compressed(pos);
727 
728  if (m_tempFile) m_tempFile->update_recorded_size(m_size);
729  }
730 
731 private:
// Empties the stream. Afterwards it is positioned at offset 0 with a clean
// buffer for block 0 (get_buffer clears the dirty flag). The loaded buffer is
// dropped without flushing, since its contents are being discarded.
// NOTE(review): source lines 739 and 751 are absent from this listing.
// NOTE(review): m_response.clear_block_info() runs here before the compressor
// lock is taken, whereas truncate_compressed() calls it with the lock held.
// Confirm this is safe.
735  void truncate_zero() {
736  // No need to flush block
737  m_buffer.reset();
738  m_response.clear_block_info();
740  compressor_thread_lock l(compressor());
741  finish_requests(l);
742  get_buffer(l, 0);
743  m_size = 0;
744  m_streamBlocks = 0;
745  m_byteStreamAccessor.truncate(0);
746 
747  m_readOffset = 0;
748  m_offset = 0;
749  m_nextItem = m_bufferBegin;
750  m_seekState = seek_state::none;
752  }
753 
// Truncates an uncompressed stream to `offset` items, then seeks to the
// smaller of the previous offset and `offset`.
// Fast path: when the cut falls inside the loaded final block that has not
// been written yet, only in-memory state changes (the buffer is marked dirty).
// Otherwise, the loaded block is flushed if it is dirty and not entirely cut
// away. The buffer is then released and the byte stream accessor truncated.
// NOTE(review): source line 782 is absent from this listing.
754  void truncate_uncompressed(stream_size_type offset) {
755  tp_assert(!use_compression(), "truncate_uncompressed called on compressed stream");
756 
757  stream_size_type currentOffset = this->offset();
758  if (m_buffer.get() != 0
759  && block_number(offset) == buffer_block_number()
760  && buffer_block_number() == m_streamBlocks)
761  {
762  // We are truncating a final block that has not been written yet.
763  m_size = offset;
764  if (offset < m_offset) {
765  m_offset = offset;
766  memory_size_type blockItemIndex =
767  static_cast<memory_size_type>(offset - m_streamBlocks * m_blockItems);
768  m_nextItem = m_bufferBegin + blockItemIndex;
769  }
770  m_bufferDirty = true;
771  m_seekState = seek_state::none;
772  // No need to update m_streamBlocks
773  } else {
774  // We need to do a truncate on the file accessor.
775  // Get rid of the current block first.
776  compressor_thread_lock l(compressor());
// NOTE(review): this branch is also reached when m_buffer is null, yet
// buffer_block_number() is called below. Confirm it tolerates having no
// loaded buffer.
777  if (offset < buffer_block_number() * m_blockItems) {
778  // No need to flush current block, since we are truncating it away.
779  } else {
780  // Changes to the current block may still be visible after the truncate.
781  if (m_bufferDirty) {
783  flush_block(l);
784  }
785  }
786  m_buffer.reset();
787  m_bufferDirty = false;
788  finish_requests(l);
789  m_byteStreamAccessor.truncate(offset);
790  m_size = offset;
// Number of blocks needed for `offset` items, rounded up.
791  m_streamBlocks = (offset + m_blockItems - 1) / m_blockItems;
792  }
793  seek(std::min(currentOffset, offset));
794  }
795 
// Truncates a compressed stream at `pos`. The block containing the cut is
// loaded if necessary. If that block had already been written, it becomes the
// last block (its read offset is taken from pos.read_offset()). The stream
// ends up at `pos` if that lies before the old position, else at the old
// position.
// NOTE(review): source line 814 is absent from this listing.
796  void truncate_compressed(const stream_position & pos) {
797  tp_assert(use_compression(), "truncate_compressed called on uncompressed stream");
798 
799  stream_size_type offset = pos.offset();
800  stream_position finalDestination = (offset < this->offset()) ? pos : get_position();
801 
802  if (m_buffer.get() == 0 || block_number(offset) != buffer_block_number()) {
803  set_position(pos);
804  perform_seek();
805  }
806 
807  // We are truncating into the currently loaded block.
808  if (buffer_block_number() < m_streamBlocks) {
809  m_streamBlocks = buffer_block_number() + 1;
810  m_lastBlockReadOffset = pos.read_offset();
811  m_currentFileSize = std::numeric_limits<stream_size_type>::max();
812  compressor_thread_lock l(compressor());
813  m_response.clear_block_info();
815  }
816  m_size = offset;
817  if (offset < m_offset) {
818  m_offset = offset;
// NOTE(review): when the branch above ran, m_streamBlocks is now
// buffer_block_number() + 1. Then `offset - m_streamBlocks * m_blockItems`
// underflows, since offset lies inside the loaded block.
// buffer_block_number() * m_blockItems looks like the intended base. The
// final set_position() may overwrite m_nextItem and mask this; confirm.
819  memory_size_type blockItemIndex =
820  static_cast<memory_size_type>(offset - m_streamBlocks * m_blockItems);
821  m_nextItem = m_bufferBegin + blockItemIndex;
822  }
823  m_bufferDirty = true;
824 
825  set_position(finalDestination);
826  }
827 
828 public:
// NOTE(review): the signature of this member (among source lines 829-841) is
// absent from this listing. From its returns it yields a stream_position for
// the current logical position (get_position(), per its assert message and
// its callers). Source lines 855 (the statement of the `else` below) and 862
// are also absent.
// Uncompressed streams: (read offset 0, offset()).
// Compressed streams:
// - a pending position seek returns the requested position;
// - a pending seek to the beginning returns (0, 0);
// - otherwise the read offset is looked up below. If the loaded buffer is a
//   new block at the end and is full, it is flushed first, so the position
//   does not sit at the end of a block.
842  tp_assert(is_open(), "get_position: !is_open");
843  if (!use_compression()) return stream_position(0, offset());
844  switch (m_seekState) {
845  case seek_state::position:
846  // We just set_position, so we can just return what we got.
847  return m_nextPosition;
848  case seek_state::beginning:
849  return stream_position(0, 0);
850  case seek_state::none:
851  if (buffer_block_number() != m_streamBlocks) {
852  if (m_nextItem == m_bufferEnd)
853  return stream_position(m_nextReadOffset, m_offset);
854  else
856  }
857  // We are in a new block at the end of the stream.
858  if (m_nextItem == m_bufferEnd) {
859  tp_assert(m_bufferDirty, "At end of buffer, but bufferDirty is false?");
860  // Make sure the position we get is not at the end of a block
861  compressor_thread_lock lock(compressor());
863  flush_block(lock);
864  get_buffer(lock, m_streamBlocks);
865  m_nextItem = m_bufferBegin;
866  }
867  break;
868  case seek_state::end:
869  // Figure out the size of the file below.
870  break;
871  }
872 
// The current block is either past the last written block, in which case its
// read offset is the current file size, or the last written block, in which
// case it is last_block_read_offset().
873  stream_size_type readOffset;
874  stream_size_type blockNumber = block_number(offset());
875  compressor_thread_lock l(compressor());
876  if (size() % m_blockItems == 0)
877  readOffset = current_file_size(l);
878  else if (blockNumber == m_streamBlocks)
879  readOffset = current_file_size(l);
880  else if (blockNumber == m_streamBlocks - 1)
881  readOffset = last_block_read_offset(l);
882  else {
883  tp_assert(false, "get_position: Invalid block_number");
884  readOffset = 1111111111111111111ull; // avoid compiler warning
885  }
886  return stream_position(readOffset, offset());
887  }
888 
// Moves the stream to `pos` (presumably a value previously returned by
// get_position(); confirm with callers).
// - stream_position::end() is forwarded to seek(0, end).
// - Throws stream_exception when an uncompressed stream gets a non-zero
//   read offset, or when pos.offset() > size().
// - If pos lies in the loaded block it is applied immediately. Otherwise it
//   is stored in m_nextPosition as a pending seek for perform_seek().
// NOTE(review): source lines 894 and 934 are absent from this listing.
893  void set_position(const stream_position & pos) {
895 
896  // If the code is correct, short circuiting is not necessary;
897  // if the code is not correct, short circuiting might mask faults.
898  /*
899  if (pos == m_position) {
900  m_seekState = seek_state::none;
901  return;
902  }
903  */
904 
905  if (pos == stream_position::end()) {
906  seek(0, end);
907  return;
908  }
909 
910  if (!use_compression() && pos.read_offset() != 0)
911  throw stream_exception("set_position: Invalid position, read_offset != 0");
912 
913  if (pos.offset() > size())
914  throw stream_exception("set_position: Invalid position, offset > size");
915 
916  if (m_buffer.get() != 0
917  && block_number(pos.offset()) == buffer_block_number())
918  {
// The body of this `if` is intentionally empty; the mismatch is tolerated.
919  if (pos.read_offset() != m_readOffset) {
920  // We don't always know the read offset of the current block
921  // in m_readOffset, so let's assume that
922  // pos.read_offset() is correct.
923  }
924 
925  m_readOffset = pos.read_offset();
926  m_offset = pos.offset();
927  m_nextItem = m_bufferBegin + block_item_index();
928  m_seekState = seek_state::none;
929  return;
930  }
931 
932  m_nextPosition = pos;
933  m_seekState = seek_state::position;
935  }
936 
947  const T & read() {
948  if (m_cachedReads > 0) {
949  --m_cachedReads;
950  ++m_offset;
951  return *m_nextItem++;
952  }
953  const T & res = peek();
954  ++m_offset;
955  ++m_nextItem;
956  cache_read_writes();
957  return res;
958  }
959 
// Returns the next item without advancing the stream.
// Throws end_of_stream_exception when positioned at the end. Any pending seek
// is performed first. When the loaded buffer is exhausted, a dirty buffer is
// flushed and the next block is read.
// NOTE(review): source line 979 is absent from this listing.
970  const T & peek() {
971  if (m_cachedReads > 0) {
972  return *m_nextItem;
973  }
974  if (m_seekState != seek_state::none) perform_seek();
975  if (m_offset == m_size) throw end_of_stream_exception();
976  if (m_nextItem == m_bufferEnd) {
977  compressor_thread_lock l(compressor());
978  if (this->m_bufferDirty) {
980  flush_block(l);
981  }
982  // At this point, block_number() == buffer_block_number() + 1
983  read_next_block(l, block_number());
984  }
985  return *m_nextItem;
986  }
987 
988  void skip() {
989  read();
990  }
991 
992  void skip_back() {
993  read_back();
994  }
995 
1002  template <typename IT>
1003  void read(IT const a, IT const b) {
1004  for (IT i = a; i != b; ++i) *i = read();
1005  }
1006 
1010  bool can_read() {
1011  if (m_cachedReads > 0)
1012  return true;
1013 
1014  if (!this->m_open)
1015  return false;
1016 
1017  return offset() < size();
1018  }
1019 
1023  bool can_read_back() {
1024  if (!this->m_open)
1025  return false;
1026 
1027  return offset() > 0;
1028  }
1029 
// Steps back one item and returns it (the item just before the current
// position). Throws end_of_stream_exception at offset 0.
// When the loaded buffer has no earlier item, a dirty buffer is flushed and
// the previous block is loaded: via read_previous_block() for compressed
// streams, otherwise via read_next_block() positioned at the block's end.
// The returned item can be read again by the read() fast path, hence
// ++m_cachedReads.
// NOTE(review): source line 1037 is absent from this listing.
// NOTE(review): m_cachedWrites is not reset here. If it can be non-zero when
// read_back() is called (e.g. right after an appending write), a following
// write() fast path would overwrite the item just read back and still
// increment m_size. Confirm against cache_read_writes().
1030  const T & read_back() {
1031  if (m_seekState != seek_state::none) {
1032  if (offset() == 0) throw end_of_stream_exception();
1033  perform_seek(read_direction::backward);
1034  }
1035  if (m_nextItem == m_bufferBegin) {
1036  if (m_offset == 0) throw end_of_stream_exception();
1038  compressor_thread_lock l(compressor());
1039  if (this->m_bufferDirty) {
1041  flush_block(l);
1042  }
1043  if (use_compression()) {
1044  read_previous_block(l, block_number() - 1);
1045  } else {
1046  read_next_block(l, block_number() - 1);
1047  m_nextItem = m_bufferEnd;
1048  }
1049  }
1050  ++m_cachedReads;
1051  --m_offset;
1052  return *--m_nextItem;
1053  }
1054 
// Writes `item` at the current position and advances past it.
// Uncompressed streams may overwrite existing items; the size grows only when
// writing at the end. Compressed streams only support appending; any other
// write throws stream_exception.
// Fast path: while m_cachedWrites > 0 the item goes straight into the buffer,
// and that path always increments m_size. Presumably cache_read_writes() only
// grants cached writes when appending; confirm.
// NOTE(review): source lines 1070 and 1094 are absent from this listing.
1055  void write(const T & item) {
1056  if (m_cachedWrites > 0) {
1057  *m_nextItem++ = item;
1058  ++m_size;
1059  ++m_offset;
1060  --m_cachedWrites;
1061  return;
1062  }
1063 
1064  if (m_seekState != seek_state::none) perform_seek();
1065 
1066  if (!use_compression()) {
// Buffer full: flush it if dirty, then start a new block at the end or
// load the block that is being overwritten.
1067  if (m_nextItem == m_bufferEnd) {
1068  compressor_thread_lock lock(compressor());
1069  if (m_bufferDirty) {
1071  flush_block(lock);
1072  }
1073  if (offset() == size()) {
1074  get_buffer(lock, m_streamBlocks);
1075  m_nextItem = m_bufferBegin;
1076  } else {
1077  read_next_block(lock, block_number());
1078  }
1079  }
1080  if (offset() == m_size) ++m_size;
1081  *m_nextItem++ = item;
1082  this->m_bufferDirty = true;
1083  ++m_offset;
1084  cache_read_writes();
1085  return;
1086  }
1087 
1088  if (m_offset != size())
1089  throw stream_exception("Non-appending write attempted");
1090 
// Compressed append with a full buffer: flush it and start the next block.
1091  if (m_nextItem == m_bufferEnd) {
1092  compressor_thread_lock l(compressor());
1093  if (m_bufferDirty) {
1095  flush_block(l);
1096  }
1097  get_buffer(l, m_streamBlocks);
1098  m_nextItem = m_bufferBegin;
1099  }
1100 
1101  *m_nextItem++ = item;
1102  this->m_bufferDirty = true;
1103  ++m_size;
1104  ++m_offset;
1105 
1106  cache_read_writes();
1107  }
1108 
/// Writes every item of the input range [a, b) in order, one write() per
/// element.
template <typename IT>
void write(IT const a, IT const b) {
	IT it = a;
	while (it != b) {
		write(*it);
		++it;
	}
}
1113 
1114 private:
// Carries out the seek recorded in m_seekState (which must not be `none`) and
// resets it to `none`. A dirty buffer is flushed, and the buffer is released,
// before the target block is loaded.
// `dir` matters for a position on a block boundary: backward (as used by
// read_back) loads the block before it, forward loads the block at it.
// If anything throws before closeOnFail.commit(), the guard closes the stream.
// NOTE(review): source lines 1132, 1136, 1194 and 1202 are absent from this
// listing.
1124  void perform_seek(read_direction::type dir=read_direction::forward) {
1125  if (!is_open()) throw stream_exception("Stream is not open");
1126  // This must be initialized before the compressor lock below,
1127  // so that it is destructed after we free the lock.
1128  close_on_fail_guard closeOnFail(this);
1129 
1130  tp_assert(!(m_seekState == seek_state::none), "perform_seek when seekState is none");
1131 
1133 
1134  compressor_thread_lock l(compressor());
1135 
1137 
1138  if (this->m_bufferDirty)
1139  flush_block(l);
1140 
1141  m_buffer.reset();
1142  finish_requests(l);
1143 
1144  // Ensure that seek state beginning will take us to a read-only state
1145  if (m_seekState == seek_state::beginning && size() == 0) {
1146  m_seekState = seek_state::end;
1147  }
1148 
1149  // Ensure that seek state position will take us to a read-only state
1150  if (m_seekState == seek_state::position
1151  && m_nextPosition.offset() == size())
1152  {
1153  m_seekState = seek_state::end;
1154  }
1155 
1156  if (m_seekState == seek_state::beginning) {
1157  // The (seek beginning && size() == 0) case is handled
1158  // by changing seekState to end.
1159  // Thus, we know for sure that size() != 0, and so the
1160  // read_next_block will not yield an end_of_stream_exception.
1161  tp_assert(!(size() == 0), "Seek beginning when size is zero");
1162  if (use_compression()) {
1163  m_nextReadOffset = 0;
1164  }
1165  read_next_block(l, 0);
1166  m_offset = 0;
1167  tp_assert(m_readOffset == 0, "perform_seek: Bad readOffset after reading first block");
1168  } else if (m_seekState == seek_state::position) {
1169  stream_size_type blockNumber = block_number(m_nextPosition.offset());
1170  memory_size_type blockItemIndex = block_item_index(m_nextPosition.offset());
1171 
1172  // This cannot happen in practice due to the implementation of
1173  // block_number and block_item_index, but it is an important
1174  // assumption in the following code.
1175  tp_assert(!(blockItemIndex >= m_blockItems), "perform_seek: Computed block item index >= blockItems");
1176 
1177  if (dir == read_direction::backward && blockItemIndex == 0 && blockNumber > 0) {
1178  if (use_compression()) {
1179  m_readOffset = m_nextPosition.read_offset();
1180  read_previous_block(l, blockNumber - 1);
1181  // sets m_nextItem = m_bufferEnd
1182  } else {
1183  read_next_block(l, blockNumber - 1);
1184  m_nextItem = m_bufferEnd;
1185  }
1186  } else {
1187  if (use_compression()) {
1188  m_nextReadOffset = m_nextPosition.read_offset();
1189  }
1190  read_next_block(l, blockNumber);
1191  m_nextItem = m_bufferBegin + blockItemIndex;
1192  }
1193 
1195  } else if (m_seekState == seek_state::end) {
1196  if (m_streamBlocks * m_blockItems == size() && dir == read_direction::forward) {
1197  // The last block in the stream is full,
1198  // so we can safely start a new empty one.
1199  get_buffer(l, m_streamBlocks);
1200  m_nextItem = m_bufferBegin;
1201  if (use_compression()) {
1203  } else {
1204  m_readOffset = 0;
1205  }
1206  m_offset = size();
1207  } else {
1208  // The last block in the stream is non-full, or we are going to read_back.
1209  if (m_streamBlocks == 0) {
1210  // This cannot happen in practice,
1211  // since we short-circuit seek(end) when streamBlocks == 0.
1212  throw exception("Attempted seek to end when no blocks have been written");
1213  }
1214  memory_size_type blockItemIndex =
1215  static_cast<memory_size_type>(size() - (m_streamBlocks - 1) * m_blockItems);
1216  if (use_compression()) {
1217  m_nextReadOffset = last_block_read_offset(l);
1218  }
1219  read_next_block(l, m_streamBlocks - 1);
1220  m_nextItem = m_bufferBegin + blockItemIndex;
1221  m_offset = size();
1222  }
1223  } else {
1224  log_debug() << "Unknown seek state " << m_seekState << std::endl;
1225  tp_assert(false, "perform_seek: Unknown seek state");
1226  }
1227 
1228  m_seekState = seek_state::none;
1229 
1230  closeOnFail.commit();
1231  }
1232 
// Makes m_buffer the buffer for block `blockNumber`, obtained from m_buffers,
// and waits on the compressor while that buffer is busy. Then points
// m_bufferBegin/m_bufferEnd at it and clears the dirty flag.
// The old buffer reference is dropped first (presumably so m_buffers can hand
// that buffer out again; confirm).
// NOTE(review): source line 1238 is absent from this listing.
1237  void get_buffer(compressor_thread_lock & l, stream_size_type blockNumber) {
1239  buffer_t().swap(m_buffer);
1240  m_buffer = this->m_buffers.get_buffer(l, blockNumber);
1241  while (m_buffer->is_busy()) compressor().wait_for_request_done(l);
1242  m_bufferBegin = reinterpret_cast<T *>(m_buffer->get());
1243  m_bufferEnd = m_bufferBegin + block_items();
1244  this->m_bufferDirty = false;
1245  }
1246 
// Submits a write request for the loaded block to the compressor thread and
// marks the buffer clean.
// Write offset:
// - uncompressed: blockNumber * m_blockSize;
// - compressed new block at the end: max(), meaning no truncate;
// - compressed rewrite of the last written block: last_block_read_offset();
// - any other block: throws `exception`.
// The cached last-block read offset and file size are invalidated, and
// m_streamBlocks grows when a new block is flushed.
// NOTE(review): source lines 1256, 1287, 1290 and 1299 are absent from this
// listing. The `}` on source line 1301 closes a statement opened on 1299.
1255  virtual void flush_block(compressor_thread_lock & lock) override {
1257  stream_size_type blockNumber = buffer_block_number();
1258  stream_size_type writeOffset;
1259  if (!use_compression()) {
1260  // Uncompressed case
1261  writeOffset = blockNumber * m_blockSize;
1262  } else {
1263  // Compressed case
1264  if (blockNumber == m_streamBlocks) {
1265  // New block; no truncate
1266  writeOffset = std::numeric_limits<stream_size_type>::max();
1267  } else if (blockNumber == m_streamBlocks - 1) {
1268  // Block rewrite; truncate
1269  writeOffset = last_block_read_offset(lock);
1270  m_response.clear_block_info();
1271  } else {
1272  throw exception("flush_block: blockNumber not at end of stream");
1273  }
1274  }
1275 
1276  m_lastBlockReadOffset = std::numeric_limits<stream_size_type>::max();
1277  m_currentFileSize = std::numeric_limits<stream_size_type>::max();
1278 
1279  if (m_nextItem == NULL) throw exception("m_nextItem is NULL");
1280  if (m_bufferBegin == NULL) throw exception("m_bufferBegin is NULL");
// The last block may be partial: write only the items that exist.
1281  memory_size_type blockItems = m_blockItems;
1282  if (blockItems + blockNumber * m_blockItems > size()) {
1283  blockItems =
1284  static_cast<memory_size_type>(size() - blockNumber * m_blockItems);
1285  }
1286  m_buffer->set_size(blockItems * sizeof(T));
1288  compressor_request r;
1289  r.set_write_request(m_buffer,
1291  m_tempFile,
1292  writeOffset,
1293  blockItems,
1294  blockNumber,
1295  &m_response);
1296  compressor().request(r);
1297  m_bufferDirty = false;
1298 
1300  m_lastWriteBlockNumber = blockNumber;
1301  }
1302 
1303  if (blockNumber == m_streamBlocks) {
1304  ++m_streamBlocks;
1305  }
1306  }
1307 
// If m_updateReadOffsetFromWrite is set on a compressed stream: waits until
// m_response is done and, when it holds block info for the last written block
// (m_lastWriteBlockNumber), takes m_readOffset and m_nextReadOffset from it.
// NOTE(review): source line 1317 is absent from this listing.
1308  void maybe_update_read_offset(compressor_thread_lock & lock) {
1309  if (m_updateReadOffsetFromWrite && use_compression()) {
1310  while (!m_response.done()) {
1311  m_response.wait(lock);
1312  }
1313  if (m_response.has_block_info(m_lastWriteBlockNumber)) {
1314  m_readOffset = m_response.get_read_offset(m_lastWriteBlockNumber);
1315  m_nextReadOffset = m_readOffset + m_response.get_block_size(m_lastWriteBlockNumber);
1316  }
1318  }
1319  }
1320 
// Loads block `blockNumber` into m_buffer and positions m_nextItem at its
// first item.
// - A buffer already in the clean state is reused; only the read offsets are
//   updated.
// - Otherwise the block is read with read_block(), and its size is checked
//   against the expected item count (on mismatch: log, then throw `exception`).
// Compressed streams read from m_nextReadOffset, which is then advanced past
// the block. Uncompressed streams read from blockNumber * m_blockSize.
// NOTE(review): source line 1327 is absent from this listing.
1326  void read_next_block(compressor_thread_lock & lock, stream_size_type blockNumber) {
1328  get_buffer(lock, blockNumber);
1329 
1330  maybe_update_read_offset(lock);
1331 
1332  stream_size_type readOffset;
1333  if (m_buffer->get_state() == compressor_buffer_state::clean) {
1334  m_readOffset = m_buffer->get_read_offset();
1335  if (use_compression()) {
1336  tp_assert(m_readOffset == m_nextReadOffset,
1337  "read_next_block: Buffer has wrong read offset");
1338  m_nextReadOffset = m_readOffset + m_buffer->get_block_size();
1339  }
1340  } else {
1341  if (use_compression()) {
1342  readOffset = m_nextReadOffset;
1343  } else {
1344  stream_size_type itemOffset = blockNumber * m_blockItems;
1345  readOffset = blockNumber * m_blockSize;
// NOTE(review): the byte count is cast to memory_size_type before std::min.
// Where memory_size_type is narrower than stream_size_type (32-bit builds),
// a large remainder could wrap before the comparison. Taking the min in
// stream_size_type and casting afterwards would avoid that; confirm.
1346  memory_size_type blockSize =
1347  std::min(m_blockSize,
1348  static_cast<memory_size_type>((size() - itemOffset) * m_itemSize));
1349  m_buffer->set_size(blockSize);
1350  }
1351 
1352  read_block(lock, readOffset, read_direction::forward);
// Expected item count: a full block, or fewer for the last block.
1353  size_t blockItems = m_blockItems;
1354  if (size() - blockNumber * m_blockItems < blockItems) {
1355  blockItems = static_cast<size_t>(size() - blockNumber * m_blockItems);
1356  }
1357  size_t usableBlockSize = m_buffer->size() / sizeof(T) * sizeof(T);
1358  size_t expectedBlockSize = blockItems * sizeof(T);
1359  if (usableBlockSize != expectedBlockSize) {
1360  log_error() << "Expected " << expectedBlockSize << " (" << blockItems
1361  << " items), got " << m_buffer->size() << " (rounded to "
1362  << usableBlockSize << ')' << std::endl;
1363  throw exception("read_next_block: Bad buffer->get_size");
1364  }
1365 
1366  // Update m_readOffset, m_nextReadOffset
1367  if (use_compression()) {
1368  m_readOffset = readOffset;
1369  m_nextReadOffset = m_response.next_read_offset();
1370  if (m_readOffset != m_buffer->get_read_offset())
1371  throw exception("read_next_block: bad get_read_offset");
1372  if (m_nextReadOffset != m_readOffset + m_buffer->get_block_size())
1373  throw exception("read_next_block: bad get_block_size");
1374  } else {
1375  // Uncompressed case. The following is a no-op:
1376  //m_readOffset = 0;
1377  // nextReadOffset is not used.
1378  }
1379  }
1380 
1381  m_nextItem = m_bufferBegin;
1382  }
1383 
/// Load block `blockNumber` for backward reading and point m_nextItem at
/// m_bufferEnd, i.e. just past the block's last item.
///
/// Only valid when use_compression() is true; this is asserted.
/// - A clean buffer is used as is, and the read offsets are taken from it.
/// - Otherwise the block that ends at the current m_readOffset is read with
///   read_direction::backward.
///
/// \throws exception if the buffer's read offset or block size disagree with
///         the offsets derived from the compressor response.
1384  void read_previous_block(compressor_thread_lock & lock, stream_size_type blockNumber) {
1386  tp_assert(use_compression(), "read_previous_block: !use_compression");
1387  get_buffer(lock, blockNumber);
1388 
1389  maybe_update_read_offset(lock);
1390 
1391  if (m_buffer->get_state() == compressor_buffer_state::clean) {
1392  m_readOffset = m_buffer->get_read_offset();
1393  m_nextReadOffset = m_readOffset + m_buffer->get_block_size();
1394  } else {
1395  read_block(lock, m_readOffset, read_direction::backward);
1396 
1397  // This is backwards since we are reading backwards.
1398  // Confusing, I know.
// That is: the block just read ends at the previous m_readOffset (now
// m_nextReadOffset) and starts at m_response.next_read_offset().
1399  m_nextReadOffset = m_readOffset;
1400  m_readOffset = m_response.next_read_offset();
1401 
1402  if (m_readOffset != m_buffer->get_read_offset())
1403  throw exception("Bad buffer get_read_offset");
1404  if (m_nextReadOffset != m_readOffset + m_buffer->get_block_size())
1405  throw exception("Bad buffer get_block_size");
1406  }
1407 
1408  m_nextItem = m_bufferEnd;
1409  }
1410 
/// Synchronously read one block into m_buffer through the compressor thread.
///
/// The function:
/// 1. Builds a read request for m_buffer at `readOffset` in `readDirection`,
///    with m_response as the response object.
/// 2. Transitions the buffer state away from compressor_buffer_state::dirty.
///    NOTE(review): the target state is presumably the "reading" state;
///    confirm against transition_state in buffer.h.
/// 3. Submits the request to compressor().
/// 4. Waits on `lock` until m_response reports done.
1411  void read_block(compressor_thread_lock & lock,
1412  stream_size_type readOffset,
1413  read_direction::type readDirection)
1414  {
1415  compressor_request r;
1416  r.set_read_request(m_buffer,
1418  readOffset,
1419  readDirection,
1420  &m_response);
1421  m_buffer->transition_state(compressor_buffer_state::dirty,
1423  compressor().request(r);
// Block until the compressor thread has completed the read.
1424  while (!m_response.done()) {
1425  m_response.wait(lock);
1426  }
1427  }
1428 
1429  stream_size_type block_number(stream_size_type offset) {
1430  return offset / m_blockItems;
1431  }
1432 
1440  stream_size_type block_number() {
1441  return block_number(m_offset);
1442  }
1443 
1450  stream_size_type buffer_block_number() {
1451  stream_size_type blockNumber = block_number();
1452  if (m_nextItem == m_bufferEnd)
1453  return blockNumber - 1;
1454  else
1455  return blockNumber;
1456  }
1457 
1458  memory_size_type block_item_index(stream_size_type offset) {
1459  stream_size_type i = offset % m_blockItems;
1460  memory_size_type cast = static_cast<memory_size_type>(i);
1461  tp_assert(!(i != cast), "Block item index out of bounds");
1462  return cast;
1463  }
1464 
1465  memory_size_type block_item_index() {
1466  return block_item_index(m_offset);
1467  }
1468 
1473  void cache_read_writes() {
1474  if (m_buffer.get() == 0 || m_seekState != seek_state::none) {
1475  m_cachedWrites = 0;
1476  m_cachedReads = 0;
1477  } else if (offset() == size()) {
1478  m_cachedWrites = m_bufferDirty ? m_bufferEnd - m_nextItem : 0;
1479  m_cachedReads = 0;
1480  } else {
1481  m_cachedWrites = 0;
1482  m_cachedReads = m_bufferEnd - m_nextItem;
1483  if (offset() + m_cachedReads > size()) {
1484  m_cachedReads =
1485  static_cast<memory_size_type>(size() - offset());
1486  }
1487  }
1488  }
1489 
1490 private:
1492  T * m_bufferBegin;
1494  T * m_bufferEnd;
1495 
1497  T * m_nextItem;
1498 };
1499 
1500 } // namespace tpie
1501 
1502 #endif // TPIE_COMPRESSED_STREAM_H
Defines the tp_assert macro.
Sequential access is intended.
Definition: cache_hint.h:36
void set_position(const stream_position &pos)
Seek to a position that was previously recalled with get_position.
Definition: stream.h:893
stream_position m_nextPosition
If seekState is position, seek to this position before reading/writing.
Definition: stream.h:456
Open a file for reading only.
Definition: stream.h:45
memory_size_type m_cachedWrites
Number of cheap, unchecked writes we can do next.
Definition: stream.h:394
stream_position get_position()
Store the current stream position such that it may be found later on.
Definition: stream.h:841
bool m_bufferDirty
Whether the current block must be written out to disk before being ejected.
Definition: stream.h:378
stream_size_type m_readOffset
Position relating to the currently loaded buffer.
Definition: stream.h:438
The buffer is different from the contents on the disk.
Definition: buffer.h:69
stream_size_type m_size
Number of logical items in the stream.
Definition: stream.h:403
cache_hint
Definition: cache_hint.h:28
void open(const std::string &path, compression_flags compressionFlags)
Deprecated interface for opening a named stream.
Definition: stream.h:236
void open(temp_file &file, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a temporary stream.
Definition: stream.h:225
Random access is intended.
Definition: stream.h:54
CRTP base of file_base.
bool can_read()
Check if the next call to read() will succeed or not.
Definition: stream.h:1010
No written blocks should be compressed.
Definition: scheme.h:37
void truncate(stream_size_type offset)
Truncate to given size.
Definition: stream.h:698
std::string describe()
For debugging: Describe the internal stream state in a string.
Definition: stream.h:613
Central file abstraction.
Definition: file.h:44
const T & read()
Reads next item from stream if can_read() == true.
Definition: stream.h:947
Base class containing the implementation details that are independent of the item type...
Definition: stream.h:125
offset_type
Type describing how we should interpret the offset supplied to seek.
Definition: stream_crtp.h:38
buffer_t m_buffer
Buffer holding the items of the block currently being read/written.
Definition: stream.h:407
Open a file for reading.
Definition: access_type.h:31
void open(const std::string &path, access_type accessType, memory_size_type userDataSize=0, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening a named stream.
Definition: stream.h:204
void uncache_read_writes()
Reset cheap read/write counts to zero so that the next read/write operation will check stream state p...
Definition: stream.h:345
Buffer manager for a single stream.
Definition: buffer.h:244
Compress some blocks according to available resources (time, memory).
Definition: stream.h:57
void seek(stream_offset_type offset, offset_type whence=beginning)
Preconditions: is_open() and offset == 0.
Definition: stream.h:627
file_accessor::byte_stream_accessor< default_raw_file_accessor > m_byteStreamAccessor
File accessor.
Definition: stream.h:401
temp_file * m_tempFile
The temporary file we have opened (when appropriate).
Definition: stream.h:399
Generic internal array with known memory requirements.
POD object indicating the position of an item in a stream.
Interface to the compressor thread.
void read(IT const a, IT const b)
Precondition: is_open().
Definition: stream.h:1003
memory_size_type m_itemSize
Size of a single item.
Definition: stream.h:390
bool can_read_back()
Check if the next call to read_back() will succeed or not.
Definition: stream.h:1023
void open(memory_size_type userDataSize, cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_none)
Deprecated interface for opening an unnamed temporary stream.
Definition: stream.h:216
stream_size_type offset() const
The stream offset of the item pointed to.
compressor_response m_response
Response from compressor thread; protected by compressor thread mutex.
Definition: stream.h:420
stream_size_type m_lastBlockReadOffset
When use_compression() is true: Read offset of the last block in the stream.
Definition: stream.h:416
void open(temp_file &file, compression_flags compressionFlags)
Deprecated interface for opening a temporary stream.
Definition: stream.h:252
The buffer will soon change to reflect the contents on the disk.
Definition: buffer.h:75
void open(compression_flags compressionFlags)
Deprecated interface for opening an unnamed temporary stream.
Definition: stream.h:244
stream_size_type m_offset
Offset of next item to read/write, relative to beginning of stream.
Definition: stream.h:453
void truncate(const stream_position &pos)
Truncate to given stream position.
Definition: stream.h:716
Compress all blocks according to the preferred compression scheme which can be set using tpie::the_co...
Definition: stream.h:61
Stream position indicator.
memory_size_type m_blockItems
Number of items in a logical block.
Definition: stream.h:380
Class representing a reference to a temporary file.
Definition: tempname.h:202
stream_buffers m_buffers
Buffer manager for this entire stream.
Definition: stream.h:405
Implementation helper that closes the stream if a method exits by throwing an exception.
Definition: stream.h:470
Buffers for compressed streams.
void describe(std::ostream &out)
For debugging: Describe the internal stream state by writing it to the given output stream.
Definition: stream.h:556
Random access is intended.
Definition: cache_hint.h:40
Open a file for writing only, content is truncated.
Definition: access_type.h:33
logstream & log_debug()
Return logstream for writing debug log messages.
Definition: tpie_log.h:167
The buffer is equal to the contents on the disk.
Definition: buffer.h:78
memory_size_type m_blockSize
Size (in bytes) of a logical (uncompressed) block.
Definition: stream.h:382
static stream_position end()
Special-value constructor returning a pointer to the end.
Read/write direction enumeration.
bool m_canRead
Whether we are open for reading.
Definition: stream.h:384
std::unique_ptr< T, tpie_deleter > unique_ptr
Like std::unique_ptr, but deletes the object with tpie_delete.
Definition: memory.h:338
Compressed stream.
Definition: predeclare.h:46
compression_flags
Possible values for the compressionFlags parameter to stream::open.
Definition: scheme.h:33
const T & peek()
Peeks next item from stream if can_read() == true.
Definition: stream.h:970
Compress some blocks according to available resources (time, memory).
Definition: scheme.h:40
Item type-agnostic file_stream operations.
Neither sequential access nor random access is intended.
Definition: stream.h:51
tpie::unique_ptr< temp_file > m_ownedTempFile
The anonymous temporary file we have opened (when appropriate).
Definition: stream.h:396
bool m_canWrite
Whether we are open for writing.
Definition: stream.h:386
stream_size_type current_file_size(compressor_thread_lock &l)
Blocks to take the compressor lock.
memory_size_type m_cachedReads
Number of cheap, unchecked reads we can do next.
Definition: stream.h:392
Compressor thread requests and responses.
Temporary file names.
access_type
Type describing how we wish to access a file.
Definition: access_type.h:29
bool m_open
Whether we are open.
Definition: stream.h:388
#define tp_assert(condition, message)
Definition: tpie_assert.h:48
Neither sequential access nor random access is intended.
Definition: cache_hint.h:31
Open a file for writing only.
Definition: stream.h:48
logstream & log_error()
Return logstream for writing error log messages.
Definition: tpie_log.h:147
stream_size_type last_block_read_offset(compressor_thread_lock &l)
Blocks to take the compressor lock.
bool m_updateReadOffsetFromWrite
When use_compression() is true: Indicates whether m_response is the response to a write request...
Definition: stream.h:426
Response to an I/O request.
Definition: request.h:51
The buffer will soon be written to disk.
Definition: buffer.h:72
stream_size_type m_streamBlocks
The number of blocks written to the file.
Definition: stream.h:411
Compress all blocks according to the preferred compression scheme which can be set using tpie::the_co...
Definition: scheme.h:44
Open a file for reading or writing.
Definition: access_type.h:35