TPIE

2362a60
queue.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2008, 2010, 2013, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_QUEUE_H__
21 #define __TPIE_QUEUE_H__
22 
27 #include <tpie/portability.h>
28 #include <tpie/deprecated.h>
29 #include <tpie/err.h>
30 #include <tpie/tempname.h>
31 #include <tpie/file_stream.h>
32 #include <tpie/internal_queue.h>
33 #include <limits>
34 #include <tpie/persist.h>
35 namespace tpie {
36 
52 template<class T>
53 class queue {
54 public:
59  compression_flags compressionFlags=compression_normal)
60  : m_size(0)
61  , m_queueA(1.0)
62  , m_queueB(1.0)
63  , m_centerQueue(get_block_size()/sizeof(T))
64  , m_currentQueue(true)
65  {
66  m_queueA.open(0, cacheHint, compressionFlags);
67  m_queueB.open(0, cacheHint, compressionFlags);
68  }
69 
74  inline bool empty() {return m_size == 0;}
75 
80  inline stream_size_type size() {return m_size;}
81 
86  inline void push(const T & t) {
87  if (push_queue().size() == 0 && !m_centerQueue.full()) {
88  // The back of the queue is the center queue
89  m_centerQueue.push(t);
90  } else {
91  // The back of the queue is the push stream
92  push_queue().write(t);
93  }
94  ++m_size;
95  }
96 
101  const T & pop() {
102  --m_size;
103  if(pop_queue().can_read()) {
104  // The front of the queue is the pop stream
105  return pop_queue().read();
106  } else if (!m_centerQueue.empty()) {
107  // The front of the queue is the center queue
108  const T & i = m_centerQueue.front();
109  m_centerQueue.pop();
110  return i;
111  } else {
112  // The front of the queue is the push stream,
113  // so we swap the push stream and the pop stream.
114  swap_file_streams();
115  return pop_queue().read();
116  }
117  }
118 
123  const T & front() {
124  if(pop_queue().can_read())
125  return pop_queue().peek();
126  else if(!m_centerQueue.empty())
127  return m_centerQueue.front();
128  else {
129  swap_file_streams();
130  return pop_queue().peek();
131  }
132  }
133 
137  static memory_size_type memory_usage(double blockFactor=1.0) {
138  return sizeof(queue<T>)
139  + 3*file_stream<T>::memory_usage(blockFactor) - 2*sizeof(file_stream<T>);
140  }
141 
142 private:
143  file_stream<T> & push_queue() {
144  if(m_currentQueue)
145  return m_queueA;
146  return m_queueB;
147  }
148 
149  file_stream<T> & pop_queue() {
150  if(m_currentQueue)
151  return m_queueB;
152  return m_queueA;
153  }
154 
155  void swap_file_streams() {
156  m_currentQueue = !m_currentQueue;
157  pop_queue().seek(0);
158  push_queue().truncate(0);
159  }
160 
161  stream_size_type m_size;
162  file_stream<T> m_queueA;
163  file_stream<T> m_queueB;
164  internal_queue<T> m_centerQueue;
165  bool m_currentQueue;
166 };
167 
168 namespace ami {
169  TPIE_DEPRECATED_CLASS_A(
170  template <typename T>
171  class TPIE_DEPRECATED_CLASS_B queue: public tpie::queue<T> {
172  public:
173  queue() {}
174  queue(const std::string& basename): tpie::queue<T>(basename) {}
175 
180  bool is_empty() {
181  return this->empty();
182  }
183 
188  err enqueue(const T &t) {
189  this->push(t);
190  return NO_ERROR;
191  }
192 
197  err dequeue(const T **t) {
198  try {
199  *t = &this->pop();
200  } catch (end_of_stream_exception e) {
201  return ami::END_OF_STREAM;
202  }
203  return NO_ERROR;
204  }
205 
210  err peek(const T **t) {
211  *t = &this->front();
212  return NO_ERROR;
213  }
214 
218  err trim() {
219  return NO_ERROR;
220  }
221 
226  void persist(persistence) {
227 
228  }
229  };
230  );
231 }
232 } // tpie namespace
233 #endif // __TPIE_QUEUE_H__
Sequential access is intended.
Definition: cache_hint.h:36
memory_size_type get_block_size()
Get the TPIE block size.
cache_hint
Definition: cache_hint.h:28
No error occurred.
Definition: err.h:47
Macros for deprecating classes, methods and typedefs.
Generic internal queue with known memory requirements.
This file contains a few deprecated definitions for legacy code.
stream_size_type size()
Returns the number of items currently on the queue.
Definition: queue.h:80
const T & pop()
Dequeues an item.
Definition: queue.h:101
bool empty()
Check if the queue is empty.
Definition: queue.h:74
static memory_size_type memory_usage(double blockFactor=1.0)
Compute the memory used by the queue.
Definition: queue.h:137
Legacy AMI error types.
Persistence tags for deprecated TPIE AMI streams.
void push(const T &t)
Enqueue an item.
Definition: queue.h:86
Basic implementation of an I/O-efficient FIFO queue.
Definition: queue.h:53
Compressed stream.
Definition: predeclare.h:46
compression_flags
Possible values for the compressionFlags parameter to stream::open.
Definition: scheme.h:33
err
Legacy TPIE error codes.
Definition: err.h:45
Compress some blocks according to available resources (time, memory).
Definition: scheme.h:40
Temporary file names.
queue(cache_hint cacheHint=access_sequential, compression_flags compressionFlags=compression_normal)
Constructor for Temporary Queue.
Definition: queue.h:58
const T & front()
Returns the frontmost item in the queue.
Definition: queue.h:123
An attempt was made to read past the end of a stream or write past the end of a substream.
Definition: err.h:52