activemq-cpp-3.8.2
FutureTask.h
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef _DECAF_UTIL_CONCURRENT_FUTURETASK_H_
19 #define _DECAF_UTIL_CONCURRENT_FUTURETASK_H_
20 
21 #include <decaf/util/Config.h>
22 
23 #include <decaf/lang/Thread.h>
24 #include <decaf/lang/Pointer.h>
26 
34 
35 namespace decaf {
36 namespace util {
37 namespace concurrent {
38 
40 
57  template<typename T>
58  class FutureTask : public RunnableFuture<T> {
59  private:
60 
65  class FutureTaskAdapter : public decaf::util::concurrent::Callable<T> {
66  private:
67 
70  bool owns;
71  T result;
72 
73  private:
74 
75  FutureTaskAdapter(const FutureTaskAdapter&);
76  FutureTaskAdapter operator= (const FutureTaskAdapter&);
77 
78  public:
79 
80  FutureTaskAdapter(decaf::lang::Runnable* task, const T& result, bool owns = true) :
81  decaf::util::concurrent::Callable<T>(), task(task), callable(NULL), owns(owns), result(result) {
82  }
83 
84  FutureTaskAdapter(decaf::util::concurrent::Callable<T>* task, bool owns = true) :
85  decaf::util::concurrent::Callable<T>(), task(NULL), callable(task), owns(owns), result(T()) {
86  }
87 
88  virtual ~FutureTaskAdapter() {
89  try{
90  if (owns) {
91  delete this->task;
92  delete this->callable;
93  }
94  }
96  }
97 
98  virtual T call() {
99  if (this->task != NULL) {
100  this->task->run();
101  return result;
102  } else {
103  return this->callable->call();
104  }
105  }
106  };
107 
113  class FutureTaskSync : public locks::AbstractQueuedSynchronizer {
114  private:
115 
116  enum SyncState {
118  READY = 0,
120  RUNNING = 1,
122  RAN = 2,
124  CANCELLED = 4
125  };
126 
128  Pointer< Callable<T> > callable;
129 
131  T result;
132 
135 
136  // The FutureTask parent of the Sync object.
137  FutureTask* parent;
138 
143  decaf::lang::Thread* runner;
144 
145  private:
146 
147  FutureTaskSync(const FutureTaskSync&);
148  FutureTaskSync operator= (const FutureTaskSync&);
149 
150  public:
151 
152  FutureTaskSync(FutureTask* parent, Callable<T>* callable) :
153  AbstractQueuedSynchronizer(), callable(callable), result(), exception(), parent(parent), runner(NULL) {
154  }
155 
156  virtual ~FutureTaskSync() {
157  }
158 
159  bool innerIsCancelled() const {
160  return getState() == CANCELLED;
161  }
162 
163  bool innerIsDone() const {
164  return ranOrCancelled(getState()) && this->runner == NULL;
165  }
166 
167  T innerGet() {
168  this->acquireSharedInterruptibly(0);
169  if (getState() == CANCELLED) {
170  throw CancellationException();
171  }
172  if (exception != NULL) {
173  throw ExecutionException(exception->clone());
174  }
175  return result;
176  }
177 
178  T innerGet(long long nanosTimeout) {
179  if (!tryAcquireSharedNanos(0, nanosTimeout)) {
180  throw TimeoutException();
181  }
182  if (getState() == CANCELLED) {
183  throw CancellationException();
184  }
185  if (exception != NULL) {
186  throw ExecutionException(exception->clone());
187  }
188  return result;
189  }
190 
191  void innerSet(const T& result) {
192  for (;;) {
193  int s = getState();
194  if (s == RAN) {
195  return;
196  }
197  if (s == CANCELLED) {
198  // aggressively release to set runner to null,
199  // in case we are racing with a cancel request
200  // that will try to interrupt runner
201  releaseShared(0);
202  return;
203  }
204  if (compareAndSetState(s, RAN)) {
205  this->result = result;
206  releaseShared(0);
207  this->parent->done();
208  return;
209  }
210  }
211  }
212 
213  void innerSetException(const decaf::lang::Exception& t) {
214  for (;;) {
215  int s = getState();
216  if (s == RAN) {
217  return;
218  }
219  if (s == CANCELLED) {
220  // aggressively release to set runner to null,
221  // in case we are racing with a cancel request
222  // that will try to interrupt runner
223  releaseShared(0);
224  return;
225  }
226  if (compareAndSetState(s, RAN)) {
227  exception.reset(t.clone());
228  releaseShared(0);
229  this->parent->done();
230  return;
231  }
232  }
233  }
234 
235  bool innerCancel(bool mayInterruptIfRunning) {
236  for (;;) {
237  int s = getState();
238  if (ranOrCancelled(s)) {
239  return false;
240  }
241  if (compareAndSetState(s, CANCELLED)) {
242  break;
243  }
244  }
245 
246  if (mayInterruptIfRunning) {
247  decaf::lang::Thread* r = runner;
248  if (r != NULL) {
249  r->interrupt();
250  }
251  }
252 
253  releaseShared(0);
254  this->parent->done();
255  return true;
256  }
257 
258  void innerRun() {
259  if (!compareAndSetState(READY, RUNNING)) {
260  return;
261  }
262 
263  this->runner = decaf::lang::Thread::currentThread();
264  if (getState() == RUNNING) { // recheck after setting thread
265  T result;
266  try {
267  result = this->callable->call();
268  } catch(decaf::lang::Exception& ex) {
269  this->parent->setException(ex);
270  return;
271  } catch(std::exception& stdex) {
272  this->parent->setException(decaf::lang::Exception(new std::exception(stdex)));
273  return;
274  } catch(...) {
275  this->parent->setException(decaf::lang::Exception(
276  __FILE__, __LINE__, "FutureTask Caught Unknown exception during task execution."));
277  return;
278  }
279  this->parent->set(result);
280  } else {
281  releaseShared(0); // cancel
282  }
283  }
284 
285  bool innerRunAndReset() {
286  if (!compareAndSetState(READY, RUNNING)) {
287  return false;
288  }
289 
290  try {
291  this->runner = decaf::lang::Thread::currentThread();
292  if (getState() == RUNNING) {
293  this->callable->call(); // don't set result
294  }
295  this->runner = NULL;
296  return compareAndSetState(RUNNING, READY);
297  } catch(decaf::lang::Exception& ex) {
298  this->parent->setException(ex);
299  return false;
300  } catch(std::exception& stdex) {
301  this->parent->setException(decaf::lang::Exception(new std::exception(stdex)));
302  return false;
303  } catch(...) {
304  this->parent->setException(decaf::lang::Exception(
305  __FILE__, __LINE__, "FutureTask Caught Unknown exception during task execution."));
306  return false;
307  }
308  }
309 
310  protected:
311 
315  virtual int tryAcquireShared(int ignore DECAF_UNUSED) {
316  return innerIsDone() ? 1 : -1;
317  }
318 
323  virtual bool tryReleaseShared(int ignore DECAF_UNUSED) {
324  runner = NULL;
325  return true;
326  }
327 
328  private:
329 
330  bool ranOrCancelled(int state) const {
331  return (state & (RAN | CANCELLED)) != 0;
332  }
333  };
334 
335  private:
336 
338 
339  public:
340 
352  FutureTask(Callable<T>* callable, bool takeOwnership = true) : sync(NULL) {
353  if (callable == NULL ) {
354  throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__,
355  "The Callable pointer passed to the constructor was NULL");
356  }
357 
358  this->sync.reset(new FutureTaskSync(this, new FutureTaskAdapter(callable, takeOwnership)));
359  }
360 
375  FutureTask(decaf::lang::Runnable* runnable, const T& result, bool takeOwnership = true) : sync(NULL) {
376  if (runnable == NULL ) {
377  throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__,
378  "The Runnable pointer passed to the constructor was NULL");
379  }
380 
381  this->sync.reset(new FutureTaskSync(this, new FutureTaskAdapter(runnable, result, takeOwnership)));
382  }
383 
384  virtual ~FutureTask() {
385  }
386 
387  virtual bool isCancelled() const {
388  return this->sync->innerIsCancelled();
389  }
390 
391  virtual bool isDone() const {
392  return this->sync->innerIsDone();
393  }
394 
395  virtual bool cancel(bool mayInterruptIfRunning) {
396  return this->sync->innerCancel(mayInterruptIfRunning);
397  }
398 
399  virtual T get() {
400  return this->sync->innerGet();
401  }
402 
403  virtual T get(long long timeout, const TimeUnit& unit) {
404  return this->sync->innerGet(unit.toNanos(timeout));
405  }
406 
408  return new FutureTask<T>(*this);
409  }
410 
411  public:
412 
421  virtual void done() {}
422 
432  virtual void set(const T& result) {
433  this->sync->innerSet(result);
434  }
435 
445  virtual void setException(const decaf::lang::Exception& error) {
446  this->sync->innerSetException(error);
447  }
448 
449  virtual void run() {
450  this->sync->innerRun();
451  }
452 
461  virtual bool runAndReset() {
462  return this->sync->innerRunAndReset();
463  }
464 
465  public:
466 
467  FutureTask(const FutureTask<T>& source) : RunnableFuture<T>(), sync(source.sync) {
468  }
469 
471  this->sync = source.sync;
472  return *this;
473  }
474 
475  };
476 
477 }}}
478 
479 #endif /* _DECAF_UTIL_CONCURRENT_FUTURETASK_H_ */
void reset(T *value=NULL)
Resets the Pointer to hold the new value.
Definition: Pointer.h:161
virtual void setException(const decaf::lang::Exception &error)
Causes this future to report an ExecutionException with the given Exception as its cause...
Definition: FutureTask.h:445
FutureTask(decaf::lang::Runnable *runnable, const T &result, bool takeOwnership=true)
Creates a FutureTask that will, upon running, execute the given Runnable, and arrange that the get me...
Definition: FutureTask.h:375
static Thread * currentThread()
Returns a pointer to the currently executing thread object.
virtual bool runAndReset()
Executes the computation without setting its result, and then resets this Future to initial state...
Definition: FutureTask.h:461
Definition: AbstractQueuedSynchronizer.h:35
FutureTask(const FutureTask< T > &source)
Definition: FutureTask.h:467
virtual void set(const T &result)
Sets the result of this Future to the given value unless this future has already been set or has been...
Definition: FutureTask.h:432
A task that returns a result and may throw an exception.
Definition: Callable.h:47
#define NULL
Definition: Config.h:33
virtual bool cancel(bool mayInterruptIfRunning)
Attempts to cancel execution of this task.
Definition: FutureTask.h:395
#define DECAF_CATCHALL_NOTHROW()
A catch-all that does not throw an exception, one use would be to catch any exception in a destructor...
Definition: ExceptionDefines.h:62
Definition: CancellationException.h:30
virtual void done()
Protected method invoked when this task transitions to state isDone (whether normally or via cancella...
Definition: FutureTask.h:421
A cancellable asynchronous computation.
Definition: FutureTask.h:58
virtual Exception * clone() const
Clones this exception.
Definition: TimeoutException.h:32
A TimeUnit represents time durations at a given unit of granularity and provides utility methods to c...
Definition: TimeUnit.h:62
virtual ~FutureTask()
Definition: FutureTask.h:384
FutureTask< T > & operator=(const FutureTask< T > &source)
Definition: FutureTask.h:470
FutureTask(Callable< T > *callable, bool takeOwnership=true)
Creates a FutureTask instance that will, upon running, execute the given Callable.
Definition: FutureTask.h:352
virtual void run()
Run method - called by the Thread class in the context of the thread.
Definition: FutureTask.h:449
Definition: ExecutionException.h:31
virtual V call()=0
Computes a result, or throws an exception if unable to do so.
A Runnable version of the Future type.
Definition: RunnableFuture.h:38
A Thread is a concurrent unit of execution.
Definition: Thread.h:64
Definition: NullPointerException.h:32
virtual bool isDone() const
Returns true if this task completed.
Definition: FutureTask.h:391
#define DECAF_UNUSED
Definition: Config.h:160
Interface for a runnable object - defines a task that can be run by a thread.
Definition: Runnable.h:29
Definition: Exception.h:38
FutureTask< T > * clone()
Definition: FutureTask.h:407
void interrupt()
Interrupts the Thread if it is blocked and in an interruptible state.
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition: Pointer.h:53
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements...
Definition: AprPool.h:25
virtual bool isCancelled() const
Returns true if this task was canceled before it completed normally.
Definition: FutureTask.h:387