activemq-cpp-3.8.2
LinkedBlockingQueue.h
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
19 #define _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
20 
21 #include <decaf/util/Config.h>
22 
27 #include <decaf/util/Iterator.h>
28 #include <decaf/lang/Integer.h>
29 #include <decaf/lang/Math.h>
30 #include <decaf/lang/Pointer.h>
34 
35 namespace decaf {
36 namespace util {
37 namespace concurrent {
38 
40 
51  template<typename E>
52  class LinkedBlockingQueue : public BlockingQueue<E> {
53  private:
54 
55  template< typename U >
56  class QueueNode {
57  private:
58 
59  U value;
60  bool unlinked;
61  bool dequeued;
62 
63  public:
64 
65  Pointer< QueueNode<E> > next;
66 
67  private:
68 
69  QueueNode(const QueueNode&);
70  QueueNode& operator=(const QueueNode&);
71 
72  public:
73 
74  QueueNode() : value(), unlinked(false), dequeued(false), next() {}
75  QueueNode(const U& value) : value(value), unlinked(false), dequeued(false), next() {}
76 
77  void set(Pointer< QueueNode<U> > next, const U& value) {
78  this->next = next;
79  this->value = value;
80  this->unlinked = false;
81  this->dequeued = false;
82  }
83 
84  E get() const {
85  return this->value;
86  }
87 
88  E getAndDequeue() {
89  E result = this->value;
90  this->value = E();
91  this->dequeued = true;
92 
93  return result;
94  }
95 
96  void unlink() {
97  this->value = E();
98  this->unlinked = true;
99  }
100 
101  bool isUnlinked() const {
102  return this->unlinked;
103  }
104 
105  bool isDequeued() const {
106  return this->dequeued;
107  }
108  };
109 
110  class TotalLock {
111  private:
112 
113  TotalLock(const TotalLock& src);
114  TotalLock& operator=(const TotalLock& src);
115 
116  private:
117 
118  const LinkedBlockingQueue<E>* parent;
119 
120  public:
121 
122  TotalLock(const LinkedBlockingQueue<E>* parent) : parent(parent) {
123  parent->putLock.lock();
124  parent->takeLock.lock();
125  }
126 
127  ~TotalLock() {
128  parent->putLock.unlock();
129  parent->takeLock.unlock();
130  }
131 
132  };
133 
134  private:
135 
136  int capacity;
138 
140  mutable locks::ReentrantLock takeLock;
141 
143  Pointer<locks::Condition> notEmpty; // takeLock.newCondition();
144 
146  mutable locks::ReentrantLock putLock;
147 
149  Pointer<locks::Condition> notFull; // putLock.newCondition();
150 
151  Pointer< QueueNode<E> > head;
152  Pointer< QueueNode<E> > tail;
153 
154  public:
155 
159  LinkedBlockingQueue() : BlockingQueue<E>(), capacity(lang::Integer::MAX_VALUE), count(),
160  takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
161 
162  this->tail = this->head;
163  this->notEmpty.reset(this->takeLock.newCondition());
164  this->notFull.reset(this->putLock.newCondition());
165  }
166 
175  LinkedBlockingQueue(int capacity) : BlockingQueue<E>(), capacity(capacity), count(),
176  takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
177  if(capacity <= 0) {
179  __FILE__, __LINE__, "Capacity value must be greater than zero.");
180  }
181 
182  this->tail = this->head;
183  this->notEmpty.reset(this->takeLock.newCondition());
184  this->notFull.reset(this->putLock.newCondition());
185  }
186 
198  capacity(lang::Integer::MAX_VALUE), count(),
199  takeLock(), notEmpty(), putLock(), notFull(),
200  head(new QueueNode<E>()), tail() {
201 
202  this->tail = this->head;
203  this->notEmpty.reset(this->takeLock.newCondition());
204  this->notFull.reset(this->putLock.newCondition());
205 
206  Pointer< Iterator<E> > iter(collection.iterator());
207 
208  try {
209 
210  int count = 0;
211 
212  while(iter->hasNext()) {
213  if(count == this->capacity) {
214  throw decaf::lang::exceptions::IllegalStateException( __FILE__, __LINE__,
215  "Number of elements in the Collection exceeds this Queue's Capacity.");
216  }
217 
218  this->enqueue(iter->next());
219  ++count;
220  }
221 
222  this->count.set(count);
223  }
227  }
228 
240  capacity(lang::Integer::MAX_VALUE), count(),
241  takeLock(), notEmpty(), putLock(), notFull(),
242  head(new QueueNode<E>()), tail() {
243 
244  this->tail = this->head;
245  this->notEmpty.reset(this->takeLock.newCondition());
246  this->notFull.reset(this->putLock.newCondition());
247 
248  Pointer< Iterator<E> > iter(queue.iterator());
249 
250  try {
251 
252  int count = 0;
253 
254  while(iter->hasNext()) {
255  if(count == this->capacity) {
256  throw decaf::lang::exceptions::IllegalStateException( __FILE__, __LINE__,
257  "Number of elements in the Collection exceeds this Queue's Capacity.");
258  }
259 
260  this->enqueue(iter->next());
261  ++count;
262  }
263 
264  this->count.set(count);
265  }
269  }
270 
272  try{
273  this->purgeList();
274  } catch(...) {}
275  }
276 
277  public:
278 
280  this->clear();
281  this->addAll(queue);
282  return *this;
283  }
284 
286  this->clear();
287  this->addAll(collection);
288  return *this;
289  }
290 
291  public:
292 
293  virtual int size() const {
294  return this->count.get();
295  }
296 
297  virtual void clear() {
298 
299  TotalLock lock(this);
300 
301  this->purgeList();
302  this->tail = this->head;
303  this->count.set(0);
304 
305  if(this->count.getAndSet(0) == this->capacity) {
306  this->notFull->signal();
307  }
308  }
309 
310  virtual int remainingCapacity() const {
311  return this->capacity - this->count.get();
312  }
313 
314  virtual void put( const E& value ) {
315 
316  int c = -1;
317 
318  this->putLock.lockInterruptibly();
319  try {
320 
321  // Note that count is used in wait guard even though it is not
322  // protected by lock. This works because count can only decrease at
323  // this point (all other puts are shut out by lock), and we (or some
324  // other waiting put) are signaled if it ever changes from capacity.
325  // Similarly for all other uses of count in other wait guards.
326  while (this->count.get() == this->capacity) {
327  this->notFull->await();
328  }
329 
330  // This method now owns the putLock so we know we have at least
331  // enough capacity for one put, if we enqueue an item and there's
332  // still more room we should signal a waiting put to ensure that
333  // threads don't wait forever.
334  enqueue(value);
335  c = this->count.getAndIncrement();
336 
337  if(c + 1 < this->capacity) {
338  this->notFull->signal();
339  }
340  } catch(decaf::lang::Exception& ex) {
341  this->putLock.unlock();
342  throw;
343  }
344 
345  this->putLock.unlock();
346 
347  // When c is zero it means we at least incremented once so there was
348  // something in the Queue, another take could have already happened but
349  // we don't know so wake up a waiting taker.
350  if (c == 0) {
351  this->signalNotEmpty();
352  }
353  }
354 
355  virtual bool offer( const E& value, long long timeout, const TimeUnit& unit ) {
356 
357  int c = -1;
358  long long nanos = unit.toNanos(timeout);
359 
360  this->putLock.lockInterruptibly();
361  try {
362 
363  while(this->count.get() == this->capacity) {
364  if (nanos <= 0) {
365  return false;
366  }
367 
368  nanos = this->notFull->awaitNanos(nanos);
369  }
370 
371  enqueue(value);
372  c = this->count.getAndIncrement();
373 
374  if(c + 1 < this->capacity) {
375  this->notFull->signal();
376  }
377 
378  } catch(decaf::lang::Exception& ex) {
379  this->putLock.unlock();
380  throw;
381  }
382 
383  this->putLock.unlock();
384 
385  if(c == 0) {
386  this->signalNotEmpty();
387  }
388 
389  return true;
390  }
391 
392  virtual bool offer(const E& value) {
393 
394  if (this->count.get() == this->capacity) {
395  return false;
396  }
397 
398  int c = -1;
399  this->putLock.lockInterruptibly();
400  try {
401 
402  if (this->count.get() < this->capacity) {
403 
404  enqueue(value);
405  c = this->count.getAndIncrement();
406 
407  if (c + 1 < this->capacity) {
408  this->notFull->signal();
409  }
410  }
411 
412  } catch (decaf::lang::Exception& ex) {
413  this->putLock.unlock();
414  throw;
415  }
416 
417  this->putLock.unlock();
418 
419  if (c == 0) {
420  this->signalNotEmpty();
421  }
422 
423  return c >= 0;
424  }
425 
426  virtual E take() {
427 
428  E value = E();
429  int c = -1;
430 
431  this->takeLock.lockInterruptibly();
432  try {
433 
434  while (this->count.get() == 0) {
435  this->notEmpty->await();
436  }
437 
438  // Since this methods owns the takeLock and count != 0 we know that
439  // its safe to take one element. if c is greater than one then there
440  // is at least one more so we try to wake up another taker if any.
441  value = dequeue();
442  c = this->count.getAndDecrement();
443 
444  if (c > 1) {
445  this->notEmpty->signal();
446  }
447 
448  } catch (decaf::lang::Exception& ex) {
449  this->takeLock.unlock();
450  throw;
451  }
452 
453  this->takeLock.unlock();
454 
455  // When c equals capacity we have removed at least one element
456  // from the Queue so we wake a blocked put operation if there is
457  // one to prevent a deadlock.
458  if (c == this->capacity) {
459  this->signalNotFull();
460  }
461 
462  return value;
463  }
464 
465  virtual bool poll(E& result, long long timeout, const TimeUnit& unit) {
466  int c = -1;
467  long long nanos = unit.toNanos(timeout);
468 
469  this->takeLock.lockInterruptibly();
470  try {
471 
472  while (this->count.get() == 0) {
473  if (nanos <= 0) {
474  return false;
475  }
476 
477  nanos = this->notEmpty->awaitNanos(nanos);
478  }
479 
480  result = dequeue();
481  c = this->count.getAndDecrement();
482 
483  if (c > 1) {
484  this->notEmpty->signal();
485  }
486 
487  } catch (decaf::lang::Exception& ex) {
488  this->takeLock.unlock();
489  throw;
490  }
491 
492  this->takeLock.unlock();
493 
494  if(c == this->capacity) {
495  this->signalNotFull();
496  }
497 
498  return true;
499  }
500 
501  virtual bool poll(E& result) {
502 
503  if (this->count.get() == 0) {
504  return false;
505  }
506 
507  int c = -1;
508  this->takeLock.lock();
509  try {
510 
511  if (this->count.get() > 0) {
512  result = dequeue();
513  c = this->count.getAndDecrement();
514 
515  if (c > 1) {
516  this->notEmpty->signal();
517  }
518  }
519 
520  } catch (decaf::lang::Exception& ex) {
521  this->takeLock.unlock();
522  throw;
523  }
524 
525  this->takeLock.unlock();
526 
527  if (c == this->capacity) {
528  this->signalNotFull();
529  }
530 
531  return true;
532  }
533 
534  virtual bool peek(E& result) const {
535 
536  if(this->count.get() == 0) {
537  return false;
538  }
539 
540  this->takeLock.lock();
541  try {
542  Pointer< QueueNode<E> > front = this->head->next;
543  if(front == NULL) {
544  return false;
545  } else {
546  result = front->get();
547  }
548  } catch (decaf::lang::Exception& ex) {
549  this->takeLock.unlock();
550  throw;
551  }
552 
553  this->takeLock.unlock();
554 
555  return true;
556  }
557 
559 
560  virtual bool remove(const E& value) {
561 
562  TotalLock lock(this);
563 
564  for(Pointer< QueueNode<E> > predicessor = this->head, p = predicessor->next; p != NULL;
565  predicessor = p, p = p->next) {
566 
567  if(value == p->get()) {
568  unlink(p, predicessor);
569  return true;
570  }
571  }
572 
573  return false;
574  }
575 
576  virtual std::vector<E> toArray() const {
577 
578  TotalLock lock(this);
579 
580  int size = this->count.get();
581  std::vector<E> array;
582  array.reserve(size);
583 
584  for(Pointer< QueueNode<E> > p = this->head->next; p != NULL; p = p->next) {
585  array.push_back(p->get());
586  }
587 
588  return array;
589  }
590 
591  virtual std::string toString() const {
592  return std::string("LinkedBlockingQueue [ current size = ") +
593  decaf::lang::Integer::toString(this->count.get()) + "]";
594  }
595 
596  virtual int drainTo( Collection<E>& c ) {
597  return this->drainTo(c, decaf::lang::Integer::MAX_VALUE);
598  }
599 
600  virtual int drainTo( Collection<E>& sink, int maxElements ) {
601 
602  if(&sink == this) {
603  throw decaf::lang::exceptions::IllegalArgumentException(__FILE__, __LINE__,
604  "Cannot drain this Collection to itself.");
605  }
606 
607  bool signalNotFull = false;
608  bool shouldThrow = false;
609  decaf::lang::Exception delayed;
610  int result = 0;
611 
612  this->takeLock.lock();
613  try {
614 
615  // We get the count of Nodes that exist now, any puts that are done
616  // after this are not drained and since we hold the lock nothing can
617  // get taken so state should remain consistent.
618  result = decaf::lang::Math::min(maxElements, this->count.get());
619  Pointer< QueueNode<E> > node = this->head;
620  int i = 0;
621  try {
622 
623  while(i < result) {
624  Pointer< QueueNode<E> > p = node->next;
625  sink.add( p->getAndDequeue() );
626  node = p;
627  ++i;
628  }
629 
630  } catch(decaf::lang::Exception& e) {
631  delayed = e;
632  shouldThrow = true;
633  }
634 
635  if (i > 0) {
636  this->head = node;
637  signalNotFull = (this->count.getAndAdd(-i) == this->capacity);
638  }
639 
640  } catch(decaf::lang::Exception& ex) {
641  this->takeLock.unlock();
642  throw;
643  }
644 
645  this->takeLock.unlock();
646 
647  if (signalNotFull) {
648  this->signalNotFull();
649  }
650 
651  if (shouldThrow) {
652  throw delayed;
653  }
654 
655  return result;
656  }
657 
658  private:
659 
660  class LinkedIterator : public Iterator<E> {
661  private:
662 
663  Pointer< QueueNode<E> > current;
664  Pointer< QueueNode<E> > last;
665  E currentElement;
666  LinkedBlockingQueue<E>* parent;
667 
668  private:
669 
670  LinkedIterator(const LinkedIterator&);
671  LinkedIterator& operator= (const LinkedIterator&);
672 
673  public:
674 
675  LinkedIterator(LinkedBlockingQueue<E>* parent) : current(), last(),
676  currentElement(), parent(parent) {
677  TotalLock lock(parent);
678 
679  this->current = parent->head->next;
680  if(this->current != NULL) {
681  this->currentElement = current->get();
682  }
683  }
684 
685  virtual bool hasNext() const {
686  return this->current != NULL;
687  }
688 
689  virtual E next() {
690 
691  TotalLock lock(this->parent);
692 
693  if(this->current == NULL) {
694  throw decaf::util::NoSuchElementException(__FILE__, __LINE__,
695  "Iterator next called with no matching next element.");
696  }
697 
698  E result = this->currentElement;
699  this->last = this->current;
700  this->current = this->nextNode(this->current);
701  this->currentElement = (this->current == NULL) ? E() : this->current->get();
702 
703  return result;
704  }
705 
706  virtual void remove() {
707 
708  if(this->last == NULL) {
709  throw decaf::lang::exceptions::IllegalStateException(__FILE__, __LINE__,
710  "Iterator remove called without having called next().");
711  }
712 
713  TotalLock lock(this->parent);
714 
715  Pointer< QueueNode<E> > node;
716  node.swap(this->last);
717 
718  for(Pointer< QueueNode<E> > trail = this->parent->head, p = trail->next; p != NULL;
719  trail = p, p = p->next) {
720 
721  if(p == node) {
722  this->parent->unlink(p, trail);
723  break;
724  }
725  }
726  }
727 
728  private:
729 
730  Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
731 
732  // Handle the case of a dequeued Node, the new head of Queue
733  // will be parent->head->next() even if the Queue is empty.
734  if(p->isDequeued()) {
735  return this->parent->head->next;
736  }
737 
738  Pointer< QueueNode<E> > s = p->next;
739 
740  // Handle Nodes that have been removed from the interior of the
741  // Queue, these are tagged but still retain their next() value
742  // in order to account for multiple removes. If all nodes were
743  // removed from the last call then eventually we reach next() == NULL
744  // which is the old tail.
745  while(s != NULL && s->isUnlinked()) {
746  s = s->next;
747  }
748 
749  return s;
750  }
751 
752  };
753 
754  class ConstLinkedIterator : public Iterator<E> {
755  private:
756 
757  Pointer< QueueNode<E> > current;
758  Pointer< QueueNode<E> > last;
759  E currentElement;
760  const LinkedBlockingQueue<E>* parent;
761 
762  private:
763 
764  ConstLinkedIterator(const ConstLinkedIterator&);
765  ConstLinkedIterator& operator= (const ConstLinkedIterator&);
766 
767  public:
768 
769  ConstLinkedIterator(const LinkedBlockingQueue<E>* parent) : current(), last(),
770  currentElement(),
771  parent(parent) {
772  TotalLock lock(parent);
773 
774  this->current = parent->head->next;
775  if(this->current != NULL) {
776  this->currentElement = current->get();
777  }
778  }
779 
780  virtual bool hasNext() const {
781  return this->current != NULL;
782  }
783 
784  virtual E next() {
785 
786  TotalLock lock(this->parent);
787 
788  if(this->current == NULL) {
789  throw decaf::util::NoSuchElementException(__FILE__, __LINE__,
790  "Iterator next called with no matching next element.");
791  }
792 
793  E result = this->currentElement;
794  this->last = this->current;
795  this->current = this->nextNode(this->current);
796  this->currentElement = (this->current == NULL) ? E() : this->current->get();
797 
798  return result;
799  }
800 
801  virtual void remove() {
803  __FILE__, __LINE__, "Cannot write to a const ListIterator." );
804  }
805 
806  private:
807 
808  Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
809 
810  // Handle the case of a dequeued Node, the new head of Queue
811  // will be parent->head->next() even if the Queue is empty.
812  if(p->isDequeued()) {
813  return this->parent->head->next;
814  }
815 
816  Pointer< QueueNode<E> > s = p->next;
817 
818  // Handle Nodes that have been removed from the interior of the
819  // Queue, these are tagged but still retain their next() value
820  // in order to account for multiple removes. If all nodes were
821  // removed from the last call then eventually we reach next() == NULL
822  // which is the old tail.
823  while(s != NULL && s->isUnlinked()) {
824  s = s->next;
825  }
826 
827  return s;
828  }
829 
830  };
831 
832  public:
833 
835  return new LinkedIterator(this);
836  }
837 
839  return new ConstLinkedIterator(this);
840  }
841 
842  private:
843 
844  void unlink(Pointer< QueueNode<E> >& p, Pointer< QueueNode<E> >& predicessor) {
845 
846  // In order to prevent Iterators from losing their ability to provide
847  // weakly consistent iteration the next value of p is left intact but
848  // the node is marked as unlinked and it value is reset to default.
849  p->unlink();
850 
851  predicessor->next = p->next;
852 
853  if(this->tail == p) {
854  this->tail = predicessor;
855  }
856 
857  if(this->count.getAndDecrement() == capacity) {
858  this->signalNotFull();
859  }
860  }
861 
862  void signalNotEmpty() {
863  this->takeLock.lock();
864  try {
865  this->notEmpty->signal();
866  } catch(decaf::lang::Exception& ex) {
867  this->takeLock.unlock();
868  throw;
869  }
870  this->takeLock.unlock();
871  }
872 
873  void signalNotFull() {
874  this->putLock.lock();
875  try {
876  this->notFull->signal();
877  } catch(decaf::lang::Exception& ex) {
878  this->putLock.unlock();
879  throw;
880  }
881  this->putLock.unlock();
882  }
883 
884  // Must be called with the putLock locked.
885  void enqueue(E value) {
886  Pointer< QueueNode<E> > newTail( new QueueNode<E>(value) );
887  this->tail->next = newTail;
888  this->tail = newTail;
889  }
890 
891  // Must be called with the takeLock locked.
892  E dequeue() {
893  Pointer< QueueNode<E> > temp = this->head;
894  Pointer< QueueNode<E> > newHead = temp->next;
895  this->head = newHead;
896 
897  return newHead->getAndDequeue();
898  }
899 
900  void purgeList() {
901  Pointer< QueueNode<E> > current = this->head->next;
902  Pointer< QueueNode<E> > temp;
903  while(current != NULL) {
904  temp = current;
905  current = current->next;
906  temp->next.reset(NULL);
907  temp.reset(NULL);
908  }
909  }
910  };
911 
912 }}}
913 
914 #endif /* _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_ */
void reset(T *value=NULL)
Resets the Pointer to hold the new value.
Definition: Pointer.h:161
The root interface in the collection hierarchy.
Definition: Collection.h:68
virtual bool peek(E &result) const
Gets but not removes the element in the head of the queue.
Definition: LinkedBlockingQueue.h:534
int getAndAdd(int delta)
Atomically adds the given value to the current value.
#define DECAF_CATCH_RETHROW(type)
Macro for catching and rethrowing an exception of a given type.
Definition: ExceptionDefines.h:27
virtual void lock()
Locks the object.
Definition: AbstractCollection.h:330
long long toNanos(long long duration) const
Equivalent to NANOSECONDS.convert(duration, this).
Definition: TimeUnit.h:126
virtual bool offer(const E &value, long long timeout, const TimeUnit &unit)
Inserts the specified element into this queue, waiting up to the specified wait time if necessary for...
Definition: LinkedBlockingQueue.h:355
A decaf::util::Queue that additionally supports operations that wait for the queue to become non-empt...
Definition: BlockingQueue.h:164
virtual void lockInterruptibly()
Acquires the lock unless the current thread is interrupted.
LinkedBlockingQueue< E > & operator=(const LinkedBlockingQueue< E > &queue)
Definition: LinkedBlockingQueue.h:279
int get() const
Gets the current value.
Definition: AtomicInteger.h:66
virtual void lock()
Acquires the lock.
#define NULL
Definition: Config.h:33
virtual decaf::util::Iterator< E > * iterator()
Definition: LinkedBlockingQueue.h:834
virtual bool poll(E &result, long long timeout, const TimeUnit &unit)
Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for ...
Definition: LinkedBlockingQueue.h:465
virtual void unlock()
Attempts to release this lock.
virtual bool offer(const E &value)
Inserts the specified element into the queue provided that the condition allows such an operation...
Definition: LinkedBlockingQueue.h:392
virtual int size() const
Returns the number of elements in this collection.
Definition: LinkedBlockingQueue.h:293
int getAndDecrement()
Atomically decrements by one the current value.
virtual bool addAll(const Collection< E > &collection)
Adds all of the elements in the specified collection to this collection.The behavior of this operatio...
Definition: AbstractQueue.h:78
virtual E take()
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available...
Definition: LinkedBlockingQueue.h:426
Defines an object that can be used to iterate over the elements of a collection.
Definition: Iterator.h:34
int getAndIncrement()
Atomically increments by one the current value.
A BlockingQueue derivative that allows for a bound to be placed on the number of elements that can be...
Definition: LinkedBlockingQueue.h:52
Definition: UnsupportedOperationException.h:32
A TimeUnit represents time durations at a given unit of granularity and provides utility methods to c...
Definition: TimeUnit.h:62
An int value that may be updated atomically.
Definition: AtomicInteger.h:37
void set(int newValue)
Sets to the given value.
Definition: AtomicInteger.h:74
virtual bool add(const E &value)=0
Returns true if this collection changed as a result of the call.
virtual std::vector< E > toArray() const
Answers an STL vector containing copies of all elements contained in this Collection.
Definition: LinkedBlockingQueue.h:576
LinkedBlockingQueue(int capacity)
Create a new instance with the given initial capacity value.
Definition: LinkedBlockingQueue.h:175
virtual bool poll(E &result)
Gets and removes the element in the head of the queue.
Definition: LinkedBlockingQueue.h:501
virtual int drainTo(Collection< E > &sink, int maxElements)
Removes at most the given number of available elements from this queue and adds them to the given col...
Definition: LinkedBlockingQueue.h:600
virtual decaf::util::Iterator< E > * iterator()=0
virtual int remainingCapacity() const
Returns the number of additional elements that this queue can ideally (in the absence of memory or re...
Definition: LinkedBlockingQueue.h:310
Definition: IllegalArgumentException.h:31
virtual int drainTo(Collection< E > &c)
Removes all available elements from this queue and adds them to the given collection.
Definition: LinkedBlockingQueue.h:596
virtual Condition * newCondition()
Returns a Condition instance for use with this Lock instance.
virtual ~LinkedBlockingQueue()
Definition: LinkedBlockingQueue.h:271
Definition: IllegalStateException.h:32
A reentrant mutual exclusion Lock with extended capabilities.
Definition: ReentrantLock.h:80
static short min(short a, short b)
Returns the double value that is closest in value to the argument and is equal to a mathematical inte...
Definition: Math.h:346
std::string toString() const
int getAndSet(int newValue)
Atomically sets to the given value and returns the old value.
PointerType get() const
Gets the real pointer that is contained within this Pointer.
Definition: Pointer.h:188
#define DECAF_CATCHALL_THROW(type)
A catch-all that throws a known exception.
Definition: ExceptionDefines.h:50
Definition: NoSuchElementException.h:31
LinkedBlockingQueue(const Collection< E > &collection)
Create a new instance with a Capacity of Integer::MAX_VALUE and adds all the values contained in the ...
Definition: LinkedBlockingQueue.h:197
LinkedBlockingQueue(const LinkedBlockingQueue &queue)
Create a new instance with a Capacity of Integer::MAX_VALUE and adds all the values contained in the ...
Definition: LinkedBlockingQueue.h:239
virtual decaf::util::Iterator< E > * iterator() const
Definition: LinkedBlockingQueue.h:838
LinkedBlockingQueue()
Create a new instance with a Capacity of Integer::MAX_VALUE.
Definition: LinkedBlockingQueue.h:159
Definition: Exception.h:38
virtual void clear()
Removes all of the elements from this collection (optional operation).The collection will be empty af...
Definition: LinkedBlockingQueue.h:297
static const int MAX_VALUE
The maximum value that the primitive type can hold.
Definition: Integer.h:44
Decaf&#39;s implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition: Pointer.h:53
virtual void put(const E &value)
Inserts the specified element into this queue, waiting if necessary for space to become available...
Definition: LinkedBlockingQueue.h:314
void swap(Pointer &value)
Exception Safe Swap Function.
Definition: Pointer.h:198
This class provides skeletal implementations of some Queue operations.
Definition: AbstractQueue.h:47
virtual std::string toString() const
Definition: LinkedBlockingQueue.h:591
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements...
Definition: AprPool.h:25