18 #ifndef _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
19 #define _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
37 namespace concurrent {
55 template<
typename U >
// Copy constructor: only declared here; no definition is visible in this view.
// NOTE(review): presumably left undefined on purpose so that nodes cannot be
// copied — confirm the access level and that no definition exists elsewhere.
69 QueueNode(
const QueueNode&);
// Default constructor: value-initializes the stored value, clears both the
// 'unlinked' and 'dequeued' flags, and default-constructs the 'next' link.
74 QueueNode() : value(), unlinked(
false), dequeued(
false), next() {}
// Constructs a node whose stored value is initialized from the given value.
// Both the 'unlinked' and 'dequeued' flags start out false and the 'next'
// link is default-constructed.
75 QueueNode(
const U& value) : value(value), unlinked(
false), dequeued(
false), next() {}
77 void set(
Pointer< QueueNode<U> > next,
const U& value) {
80 this->unlinked =
false;
81 this->dequeued =
false;
89 E result = this->value;
91 this->dequeued =
true;
98 this->unlinked =
true;
101 bool isUnlinked()
const {
102 return this->unlinked;
105 bool isDequeued()
const {
106 return this->dequeued;
// Copy constructor and copy assignment: only declared here; no definitions
// are visible in this view.
// NOTE(review): presumably declared without definitions to make TotalLock
// non-copyable — confirm against the full class.
113 TotalLock(
const TotalLock& src);
114 TotalLock&
operator=(
const TotalLock& src);
123 parent->putLock.lock();
124 parent->takeLock.lock();
128 parent->putLock.unlock();
129 parent->takeLock.unlock();
160 takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
162 this->tail = this->head;
176 takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
179 __FILE__, __LINE__,
"Capacity value must be greater than zero.");
182 this->tail = this->head;
198 capacity(lang::Integer::MAX_VALUE), count(),
199 takeLock(), notEmpty(), putLock(), notFull(),
200 head(new QueueNode<E>()), tail() {
202 this->tail = this->head;
212 while(iter->hasNext()) {
213 if(count == this->capacity) {
215 "Number of elements in the Collection exceeds this Queue's Capacity.");
218 this->enqueue(iter->next());
222 this->count.set(count);
240 capacity(lang::Integer::MAX_VALUE), count(),
241 takeLock(), notEmpty(), putLock(), notFull(),
242 head(new QueueNode<E>()), tail() {
244 this->tail = this->head;
254 while(iter->hasNext()) {
255 if(count == this->capacity) {
257 "Number of elements in the Collection exceeds this Queue's Capacity.");
260 this->enqueue(iter->next());
264 this->count.set(count);
294 return this->count.
get();
299 TotalLock
lock(
this);
302 this->tail = this->head;
305 if(this->count.
getAndSet(0) == this->capacity) {
306 this->notFull->signal();
311 return this->capacity - this->count.
get();
314 virtual void put(
const E& value ) {
326 while (this->count.
get() == this->capacity) {
327 this->notFull->await();
337 if(c + 1 < this->capacity) {
338 this->notFull->signal();
351 this->signalNotEmpty();
355 virtual bool offer(
const E& value,
long long timeout,
const TimeUnit& unit ) {
358 long long nanos = unit.
toNanos(timeout);
363 while(this->count.
get() == this->capacity) {
368 nanos = this->notFull->awaitNanos(nanos);
374 if(c + 1 < this->capacity) {
375 this->notFull->signal();
386 this->signalNotEmpty();
392 virtual bool offer(
const E& value) {
394 if (this->count.
get() == this->capacity) {
402 if (this->count.
get() < this->capacity) {
407 if (c + 1 < this->capacity) {
408 this->notFull->signal();
420 this->signalNotEmpty();
434 while (this->count.
get() == 0) {
435 this->notEmpty->await();
445 this->notEmpty->signal();
458 if (c == this->capacity) {
459 this->signalNotFull();
465 virtual bool poll(E& result,
long long timeout,
const TimeUnit& unit) {
467 long long nanos = unit.
toNanos(timeout);
472 while (this->count.
get() == 0) {
477 nanos = this->notEmpty->awaitNanos(nanos);
484 this->notEmpty->signal();
494 if(c == this->capacity) {
495 this->signalNotFull();
503 if (this->count.
get() == 0) {
508 this->takeLock.
lock();
511 if (this->count.
get() > 0) {
516 this->notEmpty->signal();
527 if (c == this->capacity) {
528 this->signalNotFull();
534 virtual bool peek(E& result)
const {
536 if(this->count.
get() == 0) {
540 this->takeLock.
lock();
546 result = front->
get();
560 virtual bool remove(
const E& value) {
562 TotalLock
lock(
this);
564 for(
Pointer< QueueNode<E> > predicessor = this->head, p = predicessor->next; p !=
NULL;
565 predicessor = p, p = p->next) {
567 if(value == p->get()) {
568 unlink(p, predicessor);
578 TotalLock
lock(
this);
581 std::vector<E> array;
584 for(
Pointer< QueueNode<E> > p = this->head->next; p !=
NULL; p = p->next) {
585 array.push_back(p->get());
592 return std::string(
"LinkedBlockingQueue [ current size = ") +
604 "Cannot drain this Collection to itself.");
607 bool signalNotFull =
false;
608 bool shouldThrow =
false;
612 this->takeLock.
lock();
625 sink.
add( p->getAndDequeue() );
637 signalNotFull = (this->count.
getAndAdd(-i) == this->capacity);
648 this->signalNotFull();
660 class LinkedIterator :
public Iterator<E> {
// Copy constructor and copy assignment: only declared here; no definitions
// are visible in this view.
// NOTE(review): presumably declared without definitions to make the iterator
// non-copyable — confirm against the full class.
670 LinkedIterator(
const LinkedIterator&);
671 LinkedIterator&
operator= (
const LinkedIterator&);
676 currentElement(), parent(parent) {
677 TotalLock
lock(parent);
679 this->current = parent->head->next;
680 if(this->current !=
NULL) {
681 this->currentElement = current->
get();
685 virtual bool hasNext()
const {
686 return this->current !=
NULL;
691 TotalLock
lock(this->parent);
693 if(this->current ==
NULL) {
695 "Iterator next called with no matching next element.");
698 E result = this->currentElement;
699 this->last = this->current;
700 this->current = this->nextNode(this->current);
701 this->currentElement = (this->current ==
NULL) ? E() : this->current->
get();
706 virtual void remove() {
708 if(this->last ==
NULL) {
710 "Iterator remove called without having called next().");
713 TotalLock
lock(this->parent);
715 Pointer< QueueNode<E> > node;
716 node.swap(this->last);
718 for(Pointer< QueueNode<E> > trail = this->parent->head, p = trail->next; p !=
NULL;
719 trail = p, p = p->next) {
722 this->parent->unlink(p, trail);
730 Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
734 if(p->isDequeued()) {
735 return this->parent->head->next;
738 Pointer< QueueNode<E> > s = p->next;
745 while(s !=
NULL && s->isUnlinked()) {
754 class ConstLinkedIterator :
public Iterator<E> {
757 Pointer< QueueNode<E> > current;
758 Pointer< QueueNode<E> > last;
760 const LinkedBlockingQueue<E>* parent;
// Copy constructor and copy assignment: only declared here; no definitions
// are visible in this view.
// NOTE(review): presumably declared without definitions to make the iterator
// non-copyable — confirm against the full class.
764 ConstLinkedIterator(
const ConstLinkedIterator&);
765 ConstLinkedIterator&
operator= (
const ConstLinkedIterator&);
769 ConstLinkedIterator(
const LinkedBlockingQueue<E>* parent) : current(), last(),
772 TotalLock
lock(parent);
774 this->current = parent->head->next;
775 if(this->current !=
NULL) {
776 this->currentElement = current->get();
780 virtual bool hasNext()
const {
781 return this->current !=
NULL;
786 TotalLock
lock(this->parent);
788 if(this->current ==
NULL) {
790 "Iterator next called with no matching next element.");
793 E result = this->currentElement;
794 this->last = this->current;
795 this->current = this->nextNode(this->current);
796 this->currentElement = (this->current ==
NULL) ? E() : this->current->get();
801 virtual void remove() {
802 throw lang::exceptions::UnsupportedOperationException(
803 __FILE__, __LINE__,
"Cannot write to a const ListIterator." );
808 Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
812 if(p->isDequeued()) {
813 return this->parent->head->next;
816 Pointer< QueueNode<E> > s = p->next;
823 while(s !=
NULL && s->isUnlinked()) {
835 return new LinkedIterator(
this);
839 return new ConstLinkedIterator(
this);
844 void unlink(
Pointer< QueueNode<E> >& p,
Pointer< QueueNode<E> >& predicessor) {
851 predicessor->next = p->next;
853 if(this->tail == p) {
854 this->tail = predicessor;
858 this->signalNotFull();
862 void signalNotEmpty() {
863 this->takeLock.
lock();
865 this->notEmpty->signal();
873 void signalNotFull() {
874 this->putLock.
lock();
876 this->notFull->signal();
885 void enqueue(E value) {
886 Pointer< QueueNode<E> > newTail(
new QueueNode<E>(value) );
887 this->tail->next = newTail;
888 this->tail = newTail;
893 Pointer< QueueNode<E> > temp = this->head;
894 Pointer< QueueNode<E> > newHead = temp->next;
895 this->head = newHead;
897 return newHead->getAndDequeue();
901 Pointer< QueueNode<E> > current = this->head->next;
902 Pointer< QueueNode<E> > temp;
903 while(current !=
NULL) {
905 current = current->next;
void reset(T *value=NULL)
Resets the Pointer to hold the new value.
Definition: Pointer.h:161
virtual int size() const
Returns the number of elements in this collection.
Definition: LinkedBlockingQueue.h:293
The root interface in the collection hierarchy.
Definition: Collection.h:68
int getAndAdd(int delta)
Atomically adds the given value to the current value.
#define DECAF_CATCH_RETHROW(type)
Macro for catching and rethrowing an exception of a given type.
Definition: ExceptionDefines.h:27
virtual std::vector< E > toArray() const
Answers an STL vector containing copies of all elements contained in this Collection.
Definition: LinkedBlockingQueue.h:576
virtual void lock()
Locks the object.
Definition: AbstractCollection.h:330
virtual bool offer(const E &value, long long timeout, const TimeUnit &unit)
Inserts the specified element into this queue, waiting up to the specified wait time if necessary for...
Definition: LinkedBlockingQueue.h:355
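A decaf::util::Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.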
Definition: BlockingQueue.h:164
virtual void lockInterruptibly()
Acquires the lock unless the current thread is interrupted.
LinkedBlockingQueue< E > & operator=(const LinkedBlockingQueue< E > &queue)
Definition: LinkedBlockingQueue.h:279
virtual void lock()
Acquires the lock.
#define NULL
Definition: Config.h:33
virtual decaf::util::Iterator< E > * iterator()
Definition: LinkedBlockingQueue.h:834
virtual bool poll(E &result, long long timeout, const TimeUnit &unit)
Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for ...
Definition: LinkedBlockingQueue.h:465
virtual int remainingCapacity() const
Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking.
Definition: LinkedBlockingQueue.h:310
virtual void unlock()
Attempts to release this lock.
virtual bool offer(const E &value)
Inserts the specified element into the queue provided that the condition allows such an operation...
Definition: LinkedBlockingQueue.h:392
virtual bool peek(E &result) const
Gets but not removes the element in the head of the queue.
Definition: LinkedBlockingQueue.h:534
int getAndDecrement()
Atomically decrements by one the current value.
virtual bool addAll(const Collection< E > &collection)
Adds all of the elements in the specified collection to this collection. The behavior of this operation is undefined if the specified collection is modified while the operation is in progress.
Definition: AbstractQueue.h:78
virtual E take()
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available...
Definition: LinkedBlockingQueue.h:426
Defines an object that can be used to iterate over the elements of a collection.
Definition: Iterator.h:34
virtual std::string toString() const
Definition: LinkedBlockingQueue.h:591
int getAndIncrement()
Atomically increments by one the current value.
A BlockingQueue derivative that allows for a bound to be placed on the number of elements that can be enqueued at any one time.
Definition: LinkedBlockingQueue.h:52
A TimeUnit represents time durations at a given unit of granularity and provides utility methods to convert across units, and to perform timing and delay operations in these units.
Definition: TimeUnit.h:62
virtual decaf::util::Iterator< E > * iterator() const
Definition: LinkedBlockingQueue.h:838
An int value that may be updated atomically.
Definition: AtomicInteger.h:37
void set(int newValue)
Sets to the given value.
Definition: AtomicInteger.h:74
virtual bool add(const E &value)=0
Returns true if this collection changed as a result of the call.
LinkedBlockingQueue(int capacity)
Create a new instance with the given initial capacity value.
Definition: LinkedBlockingQueue.h:175
virtual bool poll(E &result)
Gets and removes the element in the head of the queue.
Definition: LinkedBlockingQueue.h:501
virtual int drainTo(Collection< E > &sink, int maxElements)
Removes at most the given number of available elements from this queue and adds them to the given collection.
Definition: LinkedBlockingQueue.h:600
virtual decaf::util::Iterator< E > * iterator()=0
Definition: IllegalArgumentException.h:31
virtual int drainTo(Collection< E > &c)
Removes all available elements from this queue and adds them to the given collection.
Definition: LinkedBlockingQueue.h:596
PointerType get() const
Gets the real pointer that is contained within this Pointer.
Definition: Pointer.h:188
std::string toString() const
virtual Condition * newCondition()
Returns a Condition instance for use with this Lock instance.
virtual ~LinkedBlockingQueue()
Definition: LinkedBlockingQueue.h:271
Definition: IllegalStateException.h:32
A reentrant mutual exclusion Lock with extended capabilities.
Definition: ReentrantLock.h:80
static short min(short a, short b)
Returns the smaller of the two given short values.
Definition: Math.h:346
int getAndSet(int newValue)
Atomically sets to the given value and returns the old value.
#define DECAF_CATCHALL_THROW(type)
A catch-all that throws a known exception.
Definition: ExceptionDefines.h:50
Definition: NoSuchElementException.h:31
LinkedBlockingQueue(const Collection< E > &collection)
Create a new instance with a Capacity of Integer::MAX_VALUE and adds all the values contained in the ...
Definition: LinkedBlockingQueue.h:197
LinkedBlockingQueue(const LinkedBlockingQueue &queue)
Create a new instance with a Capacity of Integer::MAX_VALUE and adds all the values contained in the ...
Definition: LinkedBlockingQueue.h:239
LinkedBlockingQueue()
Create a new instance with a Capacity of Integer::MAX_VALUE.
Definition: LinkedBlockingQueue.h:159
Definition: Exception.h:38
long long toNanos(long long duration) const
Equivalent to NANOSECONDS.convert(duration, this).
Definition: TimeUnit.h:126
int get() const
Gets the current value.
Definition: AtomicInteger.h:66
virtual void clear()
Removes all of the elements from this collection (optional operation). The collection will be empty after this method returns.
Definition: LinkedBlockingQueue.h:297
static const int MAX_VALUE
The maximum value that the primitive type can hold.
Definition: Integer.h:44
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the default Reference Counter is used.
Definition: Pointer.h:53
virtual void put(const E &value)
Inserts the specified element into this queue, waiting if necessary for space to become available...
Definition: LinkedBlockingQueue.h:314
This class provides skeletal implementations of some Queue operations.
Definition: AbstractQueue.h:47
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements...
Definition: AprPool.h:25