18 #ifndef _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_ 19 #define _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_ 37 namespace concurrent {
55 template<
typename U >
69 QueueNode(
const QueueNode&);
74 QueueNode() : value(), unlinked(
false), dequeued(
false), next() {}
75 QueueNode(
const U& value) : value(value), unlinked(
false), dequeued(
false), next() {}
80 this->unlinked =
false;
81 this->dequeued =
false;
89 E result = this->value;
91 this->dequeued =
true;
98 this->unlinked =
true;
101 bool isUnlinked()
const {
102 return this->unlinked;
105 bool isDequeued()
const {
106 return this->dequeued;
113 TotalLock(
const TotalLock& src);
114 TotalLock&
operator=(
const TotalLock& src);
123 parent->putLock.lock();
124 parent->takeLock.lock();
128 parent->putLock.unlock();
129 parent->takeLock.unlock();
160 takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
162 this->tail = this->head;
176 takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
179 __FILE__, __LINE__,
"Capacity value must be greater than zero.");
182 this->tail = this->head;
198 capacity(lang::Integer::MAX_VALUE), count(),
199 takeLock(), notEmpty(), putLock(), notFull(),
200 head(new QueueNode<E>()), tail() {
202 this->tail = this->head;
212 while(iter->hasNext()) {
213 if(count == this->capacity) {
215 "Number of elements in the Collection exceeds this Queue's Capacity.");
218 this->enqueue(iter->next());
222 this->count.set(count);
240 capacity(lang::Integer::MAX_VALUE), count(),
241 takeLock(), notEmpty(), putLock(), notFull(),
242 head(new QueueNode<E>()), tail() {
244 this->tail = this->head;
254 while(iter->hasNext()) {
255 if(count == this->capacity) {
257 "Number of elements in the Collection exceeds this Queue's Capacity.");
260 this->enqueue(iter->next());
264 this->count.set(count);
294 return this->count.
get();
299 TotalLock
lock(
this);
302 this->tail = this->head;
305 if(this->count.
getAndSet(0) == this->capacity) {
306 this->notFull->signal();
311 return this->capacity - this->count.
get();
314 virtual void put(
const E& value ) {
326 while (this->count.
get() == this->capacity) {
327 this->notFull->await();
337 if(c + 1 < this->capacity) {
338 this->notFull->signal();
351 this->signalNotEmpty();
355 virtual bool offer(
const E& value,
long long timeout,
const TimeUnit& unit ) {
358 long long nanos = unit.
toNanos(timeout);
363 while(this->count.
get() == this->capacity) {
368 nanos = this->notFull->awaitNanos(nanos);
374 if(c + 1 < this->capacity) {
375 this->notFull->signal();
386 this->signalNotEmpty();
392 virtual bool offer(
const E& value) {
394 if (this->count.
get() == this->capacity) {
402 if (this->count.
get() < this->capacity) {
407 if (c + 1 < this->capacity) {
408 this->notFull->signal();
420 this->signalNotEmpty();
434 while (this->count.
get() == 0) {
435 this->notEmpty->await();
445 this->notEmpty->signal();
458 if (c == this->capacity) {
459 this->signalNotFull();
465 virtual bool poll(E& result,
long long timeout,
const TimeUnit& unit) {
467 long long nanos = unit.
toNanos(timeout);
472 while (this->count.
get() == 0) {
477 nanos = this->notEmpty->awaitNanos(nanos);
484 this->notEmpty->signal();
494 if(c == this->capacity) {
495 this->signalNotFull();
503 if (this->count.
get() == 0) {
508 this->takeLock.
lock();
511 if (this->count.
get() > 0) {
516 this->notEmpty->signal();
527 if (c == this->capacity) {
528 this->signalNotFull();
534 virtual bool peek(E& result)
const {
536 if(this->count.
get() == 0) {
540 this->takeLock.
lock();
546 result = front->
get();
560 virtual bool remove(
const E& value) {
562 TotalLock
lock(
this);
564 for(
Pointer< QueueNode<E> > predicessor = this->head, p = predicessor->next; p !=
NULL;
565 predicessor = p, p = p->next) {
567 if(value == p->get()) {
568 unlink(p, predicessor);
578 TotalLock
lock(
this);
581 std::vector<E> array;
584 for(
Pointer< QueueNode<E> > p = this->head->next; p !=
NULL; p = p->next) {
585 array.push_back(p->get());
592 return std::string(
"LinkedBlockingQueue [ current size = ") +
604 "Cannot drain this Collection to itself.");
607 bool signalNotFull =
false;
608 bool shouldThrow =
false;
612 this->takeLock.
lock();
625 sink.
add( p->getAndDequeue() );
637 signalNotFull = (this->count.
getAndAdd(-i) == this->capacity);
648 this->signalNotFull();
660 class LinkedIterator :
public Iterator<E> {
670 LinkedIterator(
const LinkedIterator&);
671 LinkedIterator&
operator= (
const LinkedIterator&);
676 currentElement(), parent(parent) {
677 TotalLock
lock(parent);
679 this->current = parent->head->next;
680 if(this->current !=
NULL) {
681 this->currentElement = current->
get();
685 virtual bool hasNext()
const {
686 return this->current !=
NULL;
691 TotalLock
lock(this->parent);
693 if(this->current ==
NULL) {
695 "Iterator next called with no matching next element.");
698 E result = this->currentElement;
699 this->last = this->current;
700 this->current = this->nextNode(this->current);
701 this->currentElement = (this->current ==
NULL) ? E() : this->current->
get();
706 virtual void remove() {
708 if(this->last ==
NULL) {
710 "Iterator remove called without having called next().");
713 TotalLock
lock(this->parent);
716 node.
swap(this->last);
718 for(
Pointer< QueueNode<E> > trail = this->parent->head, p = trail->next; p !=
NULL;
719 trail = p, p = p->next) {
722 this->parent->unlink(p, trail);
734 if(p->isDequeued()) {
735 return this->parent->head->next;
745 while(s !=
NULL && s->isUnlinked()) {
754 class ConstLinkedIterator :
public Iterator<E> {
764 ConstLinkedIterator(
const ConstLinkedIterator&);
765 ConstLinkedIterator&
operator= (
const ConstLinkedIterator&);
772 TotalLock
lock(parent);
774 this->current = parent->head->next;
775 if(this->current !=
NULL) {
776 this->currentElement = current->
get();
780 virtual bool hasNext()
const {
781 return this->current !=
NULL;
786 TotalLock
lock(this->parent);
788 if(this->current ==
NULL) {
790 "Iterator next called with no matching next element.");
793 E result = this->currentElement;
794 this->last = this->current;
795 this->current = this->nextNode(this->current);
796 this->currentElement = (this->current ==
NULL) ? E() : this->current->
get();
801 virtual void remove() {
803 __FILE__, __LINE__,
"Cannot write to a const ListIterator." );
812 if(p->isDequeued()) {
813 return this->parent->head->next;
823 while(s !=
NULL && s->isUnlinked()) {
835 return new LinkedIterator(
this);
839 return new ConstLinkedIterator(
this);
844 void unlink(
Pointer< QueueNode<E> >& p,
Pointer< QueueNode<E> >& predicessor) {
851 predicessor->next = p->next;
853 if(this->tail == p) {
854 this->tail = predicessor;
858 this->signalNotFull();
862 void signalNotEmpty() {
863 this->takeLock.
lock();
865 this->notEmpty->signal();
873 void signalNotFull() {
874 this->putLock.
lock();
876 this->notFull->signal();
885 void enqueue(E value) {
887 this->tail->next = newTail;
888 this->tail = newTail;
895 this->head = newHead;
897 return newHead->getAndDequeue();
903 while(current !=
NULL) {
905 current = current->next;
void reset(T *value=NULL)
Resets the Pointer to hold the new value.
Definition: Pointer.h:161
The root interface in the collection hierarchy.
Definition: Collection.h:68
virtual bool peek(E &result) const
Gets but not removes the element in the head of the queue.
Definition: LinkedBlockingQueue.h:534
int getAndAdd(int delta)
Atomically adds the given value to the current value.
#define DECAF_CATCH_RETHROW(type)
Macro for catching and rethrowing an exception of a given type.
Definition: ExceptionDefines.h:27
virtual void lock()
Locks the object.
Definition: AbstractCollection.h:330
long long toNanos(long long duration) const
Equivalent to NANOSECONDS.convert(duration, this).
Definition: TimeUnit.h:126
virtual bool offer(const E &value, long long timeout, const TimeUnit &unit)
Inserts the specified element into this queue, waiting up to the specified wait time if necessary for...
Definition: LinkedBlockingQueue.h:355
A decaf::util::Queue that additionally supports operations that wait for the queue to become non-empt...
Definition: BlockingQueue.h:164
virtual void lockInterruptibly()
Acquires the lock unless the current thread is interrupted.
LinkedBlockingQueue< E > & operator=(const LinkedBlockingQueue< E > &queue)
Definition: LinkedBlockingQueue.h:279
int get() const
Gets the current value.
Definition: AtomicInteger.h:66
virtual void lock()
Acquires the lock.
#define NULL
Definition: Config.h:33
virtual decaf::util::Iterator< E > * iterator()
Definition: LinkedBlockingQueue.h:834
virtual bool poll(E &result, long long timeout, const TimeUnit &unit)
Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for ...
Definition: LinkedBlockingQueue.h:465
virtual void unlock()
Attempts to release this lock.
virtual bool offer(const E &value)
Inserts the specified element into the queue provided that the condition allows such an operation...
Definition: LinkedBlockingQueue.h:392
virtual int size() const
Returns the number of elements in this collection.
Definition: LinkedBlockingQueue.h:293
int getAndDecrement()
Atomically decrements by one the current value.
virtual bool addAll(const Collection< E > &collection)
Adds all of the elements in the specified collection to this collection. The behavior of this operatio...
Definition: AbstractQueue.h:78
virtual E take()
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available...
Definition: LinkedBlockingQueue.h:426
Defines an object that can be used to iterate over the elements of a collection.
Definition: Iterator.h:34
int getAndIncrement()
Atomically increments by one the current value.
A BlockingQueue derivative that allows for a bound to be placed on the number of elements that can be...
Definition: LinkedBlockingQueue.h:52
Definition: UnsupportedOperationException.h:32
A TimeUnit represents time durations at a given unit of granularity and provides utility methods to c...
Definition: TimeUnit.h:62
An int value that may be updated atomically.
Definition: AtomicInteger.h:37
void set(int newValue)
Sets to the given value.
Definition: AtomicInteger.h:74
virtual bool add(const E &value)=0
Returns true if this collection changed as a result of the call.
virtual std::vector< E > toArray() const
Answers an STL vector containing copies of all elements contained in this Collection.
Definition: LinkedBlockingQueue.h:576
LinkedBlockingQueue(int capacity)
Create a new instance with the given initial capacity value.
Definition: LinkedBlockingQueue.h:175
virtual bool poll(E &result)
Gets and removes the element in the head of the queue.
Definition: LinkedBlockingQueue.h:501
virtual int drainTo(Collection< E > &sink, int maxElements)
Removes at most the given number of available elements from this queue and adds them to the given col...
Definition: LinkedBlockingQueue.h:600
virtual decaf::util::Iterator< E > * iterator()=0
virtual int remainingCapacity() const
Returns the number of additional elements that this queue can ideally (in the absence of memory or re...
Definition: LinkedBlockingQueue.h:310
Definition: IllegalArgumentException.h:31
virtual int drainTo(Collection< E > &c)
Removes all available elements from this queue and adds them to the given collection.
Definition: LinkedBlockingQueue.h:596
virtual Condition * newCondition()
Returns a Condition instance for use with this Lock instance.
virtual ~LinkedBlockingQueue()
Definition: LinkedBlockingQueue.h:271
Definition: IllegalStateException.h:32
A reentrant mutual exclusion Lock with extended capabilities.
Definition: ReentrantLock.h:80
static short min(short a, short b)
Returns the smaller of two short values.
Definition: Math.h:346
std::string toString() const
int getAndSet(int newValue)
Atomically sets to the given value and returns the old value.
PointerType get() const
Gets the real pointer that is contained within this Pointer.
Definition: Pointer.h:188
#define DECAF_CATCHALL_THROW(type)
A catch-all that throws a known exception.
Definition: ExceptionDefines.h:50
Definition: NoSuchElementException.h:31
LinkedBlockingQueue(const Collection< E > &collection)
Creates a new instance with a Capacity of Integer::MAX_VALUE and adds all the values contained in the ...
Definition: LinkedBlockingQueue.h:197
LinkedBlockingQueue(const LinkedBlockingQueue &queue)
Creates a new instance with a Capacity of Integer::MAX_VALUE and adds all the values contained in the ...
Definition: LinkedBlockingQueue.h:239
virtual decaf::util::Iterator< E > * iterator() const
Definition: LinkedBlockingQueue.h:838
LinkedBlockingQueue()
Create a new instance with a Capacity of Integer::MAX_VALUE.
Definition: LinkedBlockingQueue.h:159
Definition: Exception.h:38
virtual void clear()
Removes all of the elements from this collection (optional operation). The collection will be empty af...
Definition: LinkedBlockingQueue.h:297
static const int MAX_VALUE
The maximum value that the primitive type can hold.
Definition: Integer.h:44
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition: Pointer.h:53
virtual void put(const E &value)
Inserts the specified element into this queue, waiting if necessary for space to become available...
Definition: LinkedBlockingQueue.h:314
void swap(Pointer &value)
Exception Safe Swap Function.
Definition: Pointer.h:198
This class provides skeletal implementations of some Queue operations.
Definition: AbstractQueue.h:47
virtual std::string toString() const
Definition: LinkedBlockingQueue.h:591
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements...
Definition: AprPool.h:25