activemq-cpp-3.8.2
TransferQueue.h
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef _DECAF_INTERNAL_UTIL_CONCURRENT_TRANSFERQUEUE_H_
19 #define _DECAF_INTERNAL_UTIL_CONCURRENT_TRANSFERQUEUE_H_
20 
22 
27 
28 #include <decaf/lang/Thread.h>
29 
30 namespace decaf {
31 namespace internal {
32 namespace util {
33 namespace concurrent {
34 
36 
45  template< typename E >
46  class TransferQueue : public Transferer<E> {
47  private:
48 
49 // /** Node class for TransferQueue. */
50 // class QNode {
51 // private:
52 //
53 // AtomicReference<QNode> next; // next node in queue
54 // volatile decaf::lang::Thread* waiter; // to control park/unpark
55 // AtomicReference<E*> item; // CAS'ed to or from null
56 //
57 // bool data;
58 // bool cancelled;
59 //
60 // public:
61 //
62 // QNode() : cancelled( false ), data( false ) {
63 // }
64 //
65 // QNode( E* item ) : cancelled( false ), data( true ) {
66 // this->item.set( item );
67 // }
68 //
69 // bool casNext( QNode* cmp, QNode* val ) {
70 // return ( this->next == cmp && this->next.compareAndSet( cmp, val ) );
71 // }
72 //
73 // bool casItem( E* cmp, E* val ) {
74 // return ( this->item == cmp && this->item.compareAndSet( cmp, val ) );
75 // }
76 //
77 // /**
78 // * Tries to cancel by CAS'ing ref to NULL if that succeeds then we mark as cancelled.
79 // */
80 // void tryCancel( E* cmp ) {
81 // if( item.compareAndSet( cmp, NULL ) ) {
82 // this->cancelled = true;
83 // }
84 // }
85 //
86 // bool isCancelled() {
87 // return this->item == this;
88 // }
89 //
90 // /**
91 // * Returns true if this node is known to be off the queue
92 // * because its next pointer has been forgotten due to
93 // * an advanceHead operation.
94 // */
95 // bool isOffList() {
96 // return this->next == NULL;
97 // }
98 // };
99 
100  private:
101 
102 // decaf::util::concurrent::atomic::AtomicReference<QNode> head;
103 // decaf::util::concurrent::atomic::AtomicReference<QNode> tail;
104 //
105 // decaf::util::concurrent::atomic::AtomicReference<QNode> cleanMe;
106 
107  public:
108 
110 // // Initialize with a dummy Node.
111 // this->head.set( new QNode() );
112 // this->tail.set( this->head.get() );
113  }
114 
115  virtual ~TransferQueue() {}
116 
117  virtual void transfer( E* e, bool timed, long long nanos ) {
118 
119  }
120 
121  virtual E* transfer( bool timed, long long nanos ) {
122  return NULL;
123  }
124 
128 // E* transfer( E* e, bool timed, long long nanos ) {
129 
130  /* Basic algorithm is to loop trying to take either of
131  * two actions:
132  *
133  * 1. If queue apparently empty or holding same-mode nodes,
134  * try to add node to queue of waiters, wait to be
135  * fulfilled (or cancelled) and return matching item.
136  *
137  * 2. If queue apparently contains waiting items, and this
138  * call is of complementary mode, try to fulfill by CAS'ing
139  * item field of waiting node and dequeuing it, and then
140  * returning matching item.
141  *
142  * In each case, along the way, check for and try to help
143  * advance head and tail on behalf of other stalled/slow
144  * threads.
145  *
146  * The loop starts off with a null check guarding against
147  * seeing uninitialized head or tail values. This never
148  * happens in current SynchronousQueue, but could if
149  * callers held non-volatile/final ref to the
150  * transferer. The check is here anyway because it places
151  * null checks at top of loop, which is usually faster
152  * than having them implicitly interspersed.
153  */
154 
155 // QNode s = null; // constructed/reused as needed
156 // boolean isData = (e != null);
157 //
158 // for (;;) {
159 // QNode t = tail;
160 // QNode h = head;
161 // if (t == null || h == null) // saw uninitialized value
162 // continue; // spin
163 //
164 // if (h == t || t.isData == isData) { // empty or same-mode
165 // QNode tn = t.next;
166 // if (t != tail) // inconsistent read
167 // continue;
168 // if (tn != null) { // lagging tail
169 // advanceTail(t, tn);
170 // continue;
171 // }
172 // if (timed && nanos <= 0) // can't wait
173 // return null;
174 // if (s == null)
175 // s = new QNode(e, isData);
176 // if (!t.casNext(null, s)) // failed to link in
177 // continue;
178 //
179 // advanceTail(t, s); // swing tail and wait
180 // Object x = awaitFulfill(s, e, timed, nanos);
181 // if (x == s) { // wait was cancelled
182 // clean(t, s);
183 // return null;
184 // }
185 //
186 // if (!s.isOffList()) { // not already unlinked
187 // advanceHead(t, s); // unlink if head
188 // if (x != null) // and forget fields
189 // s.item = s;
190 // s.waiter = null;
191 // }
192 // return (x != null)? x : e;
193 //
194 // } else { // complementary-mode
195 // QNode m = h.next; // node to fulfill
196 // if (t != tail || m == null || h != head)
197 // continue; // inconsistent read
198 //
199 // Object x = m.item;
200 // if (isData == (x != null) || // m already fulfilled
201 // x == m || // m cancelled
202 // !m.casItem(x, e)) { // lost CAS
203 // advanceHead(h, m); // dequeue and retry
204 // continue;
205 // }
206 //
207 // advanceHead(h, m); // successfully fulfilled
208 // LockSupport.unpark(m.waiter);
209 // return (x != null)? x : e;
210 // }
211 // }
212 // }
213 
214  private:
215 
225 // Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
226 // /* Same idea as TransferStack.awaitFulfill */
227 // long lastTime = (timed)? System.nanoTime() : 0;
228 // Thread w = Thread.currentThread();
229 // int spins = ((head.next == s) ?
230 // (timed? maxTimedSpins : maxUntimedSpins) : 0);
231 // for (;;) {
232 // if (w.isInterrupted())
233 // s.tryCancel(e);
234 // Object x = s.item;
235 // if (x != e)
236 // return x;
237 // if (timed) {
238 // long now = System.nanoTime();
239 // nanos -= now - lastTime;
240 // lastTime = now;
241 // if (nanos <= 0) {
242 // s.tryCancel(e);
243 // continue;
244 // }
245 // }
246 // if (spins > 0)
247 // --spins;
248 // else if (s.waiter == null)
249 // s.waiter = w;
250 // else if (!timed)
251 // LockSupport.park();
252 // else if (nanos > spinForTimeoutThreshold)
253 // LockSupport.parkNanos(nanos);
254 // }
255 // }
256 
260 // void clean(QNode pred, QNode s) {
261 // s.waiter = null; // forget thread
262 // /*
263 // * At any given time, exactly one node on list cannot be
264 // * deleted -- the last inserted node. To accommodate this,
265 // * if we cannot delete s, we save its predecessor as
266 // * "cleanMe", deleting the previously saved version
267 // * first. At least one of node s or the node previously
268 // * saved can always be deleted, so this always terminates.
269 // */
270 // while (pred.next == s) { // Return early if already unlinked
271 // QNode h = head;
272 // QNode hn = h.next; // Absorb cancelled first node as head
273 // if (hn != null && hn.isCancelled()) {
274 // advanceHead(h, hn);
275 // continue;
276 // }
277 // QNode t = tail; // Ensure consistent read for tail
278 // if (t == h)
279 // return;
280 // QNode tn = t.next;
281 // if (t != tail)
282 // continue;
283 // if (tn != null) {
284 // advanceTail(t, tn);
285 // continue;
286 // }
287 // if (s != t) { // If not tail, try to unsplice
288 // QNode sn = s.next;
289 // if (sn == s || pred.casNext(s, sn))
290 // return;
291 // }
292 // QNode dp = cleanMe;
293 // if (dp != null) { // Try unlinking previous cancelled node
294 // QNode d = dp.next;
295 // QNode dn;
296 // if (d == null || // d is gone or
297 // d == dp || // d is off list or
298 // !d.isCancelled() || // d not cancelled or
299 // (d != t && // d not tail and
300 // (dn = d.next) != null && // has successor
301 // dn != d && // that is on list
302 // dp.casNext(d, dn))) // d unspliced
303 // casCleanMe(dp, null);
304 // if (dp == pred)
305 // return; // s is already saved node
306 // } else if (casCleanMe(null, pred))
307 // return; // Postpone cleaning s
308 // }
309 // }
310 
311 // /**
312 // * Tries to cas nh as new head; if successful, unlink
313 // * old head's next node to avoid garbage retention.
314 // */
315 // void advanceHead( QNode* h, QNode* nh ) {
316 // if( h == head.get() && this->head.compareAndSet( h, nh ) ) {
317 // h->next = h; // forget old next
318 // }
319 // }
320 //
321 // /**
322 // * Tries to cas nt as new tail.
323 // */
324 // void advanceTail( QNode* t, QNode* nt ) {
325 // if( this->tail.get() == t ) {
326 // this->tail.compareAndSet( t, nt );
327 // }
328 // }
329 //
330 // /**
331 // * Tries to CAS cleanMe slot.
332 // */
333 // bool casCleanMe( QNode* cmp, QNode* val ) {
334 // return ( this->cleanMe.get() == cmp &&
335 // this->cleanMe.compareAndSet( cmp, val ) );
336 // }
337 
338  };
339 
340 }}}}
341 
342 #endif /* _DECAF_INTERNAL_UTIL_CONCURRENT_TRANSFERQUEUE_H_ */
virtual ~TransferQueue()
Definition: TransferQueue.h:115
This extends the Scherer-Scott dual queue algorithm, differing, among other ways, by using modes within nodes rather than marked pointers.
Definition: TransferQueue.h:46
#define NULL
Definition: Config.h:33
TransferQueue()
Node class for TransferQueue.
Definition: TransferQueue.h:109
Shared internal API for dual stacks and queues.
Definition: Transferer.h:33
An Pointer reference that may be updated atomically.
Definition: AtomicReference.h:34
virtual void transfer(E *e, bool timed, long long nanos)
Performs a put.
Definition: TransferQueue.h:117
virtual E * transfer(bool timed, long long nanos)
Performs a take.
Definition: TransferQueue.h:121
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements...
Definition: AprPool.h:25