activemq-cpp-3.8.2
ActiveMQConsumerKernel.h
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 #ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQCONSUMERKERNEL_H_
18 #define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQCONSUMERKERNEL_H_
19 
20 #include <cms/MessageConsumer.h>
21 #include <cms/MessageListener.h>
23 #include <cms/Message.h>
24 #include <cms/CMSException.h>
25 
26 #include <activemq/util/Config.h>
34 
36 #include <decaf/lang/Pointer.h>
38 
39 namespace activemq {
40 namespace core {
41 namespace kernels {
42 
45 
46  class ActiveMQSessionKernel;
47  class ActiveMQConsumerKernelConfig;
48 
50  private:
51 
55  ActiveMQConsumerKernelConfig* internal;
56 
60  ActiveMQSessionKernel* session;
61 
66 
67  private:
68 
71 
72  public:
73 
76  const Pointer<commands::ActiveMQDestination>& destination,
77  const std::string& name,
78  const std::string& selector,
79  int prefetch,
80  int maxPendingMessageCount,
81  bool noLocal,
82  bool browser,
83  bool dispatchAsync,
84  cms::MessageListener* listener);
85 
86  virtual ~ActiveMQConsumerKernel();
87 
88  public: // Interface Implementation
89 
90  virtual void start();
91 
92  virtual void stop();
93 
94  virtual void close();
95 
96  virtual cms::Message* receive();
97 
98  virtual cms::Message* receive(int millisecs);
99 
100  virtual cms::Message* receiveNoWait();
101 
102  virtual void setMessageListener(cms::MessageListener* listener);
103 
104  virtual cms::MessageListener* getMessageListener() const;
105 
106  virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener);
107 
108  virtual cms::MessageAvailableListener* getMessageAvailableListener() const;
109 
110  virtual std::string getMessageSelector() const;
111 
112  virtual void setMessageTransformer(cms::MessageTransformer* transformer);
113 
114  virtual cms::MessageTransformer* getMessageTransformer() const;
115 
116  public: // Dispatcher Methods
117 
118  virtual void dispatch( const Pointer<MessageDispatch>& message );
119 
120  virtual int getHashCode() const;
121 
122  public: // ActiveMQConsumerKernel Methods
123 
129  void acknowledge();
130 
136  void acknowledge(Pointer<commands::MessageDispatch> dispatch);
137 
143  void acknowledge(Pointer<commands::MessageDispatch> dispatch, int ackType);
144 
150  void commit();
151 
157  void rollback();
158 
164  void doClose();
165 
171  void dispose();
172 
177  const Pointer<commands::ConsumerInfo>& getConsumerInfo() const;
178 
183  const Pointer<commands::ConsumerId>& getConsumerId() const;
184 
188  bool isClosed() const;
189 
194  bool isSynchronizationRegistered() const ;
195 
200  void setSynchronizationRegistered(bool value);
201 
207  bool iterate();
208 
214  void deliverAcks();
215 
219  void clearMessagesInProgress();
220 
225  void inProgressClearRequired();
226 
232  long long getLastDeliveredSequenceId() const;
233 
239  bool isTransactedIndividualAck() const;
240 
247  void setTransactedIndividualAck(bool value);
248 
254  long long setFailoverRedeliveryWaitPeriod() const;
255 
263  void setFailoverRedeliveryWaitPeriod(long long value);
264 
271  void setLastDeliveredSequenceId(long long value);
272 
276  int getMessageAvailableCount() const;
277 
287  void setRedeliveryPolicy(RedeliveryPolicy* policy);
288 
295  RedeliveryPolicy* getRedeliveryPolicy() const;
296 
303  void setFailureError(decaf::lang::Exception* error);
304 
311  decaf::lang::Exception* getFailureError() const;
312 
317  void setPrefetchSize(int prefetchSize);
318 
324  bool isInUse(Pointer<commands::ActiveMQDestination> destination) const;
325 
332  long long getOptimizedAckScheduledAckInterval() const;
333 
342  void setOptimizedAckScheduledAckInterval(long long value);
343 
347  bool isOptimizeAcknowledge() const;
348 
355  void setOptimizeAcknowledge(bool value);
356 
357  protected:
358 
374  Pointer<MessageDispatch> dequeue(long long timeout);
375 
380  void beforeMessageIsConsumed(Pointer<commands::MessageDispatch> dispatch);
381 
387  void afterMessageIsConsumed(Pointer<commands::MessageDispatch> dispatch, bool messageExpired);
388 
389  private:
390 
392 
393  void applyDestinationOptions(Pointer<commands::ConsumerInfo> info);
394 
395  void sendPullRequest(long long timeout);
396 
397  void checkClosed() const;
398 
399  void checkMessageListener() const;
400 
401  void ackLater(Pointer<commands::MessageDispatch> message, int ackType);
402 
403  void immediateIndividualTransactedAck(Pointer<commands::MessageDispatch> dispatch);
404 
405  Pointer<commands::MessageAck> makeAckForAllDeliveredMessages(int type);
406 
407  bool isAutoAcknowledgeEach() const;
408 
409  bool isAutoAcknowledgeBatch() const;
410 
411  void registerSync();
412 
413  void clearDispatchList();
414 
415  };
416 
417 }}}
418 
419 #endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQCONSUMERKERNEL_H_ */
Provides an interface for clients to transform cms::Message objects inside the CMS MessageProducer an...
Definition: MessageTransformer.h:37
Interface for a RedeliveryPolicy object that controls how message Redelivery is handled in ActiveMQ-C...
Definition: RedeliveryPolicy.h:34
Root of all messages.
Definition: Message.h:88
#define AMQCPP_API
Definition: Config.h:30
Definition: ActiveMQConsumerKernel.h:49
A boolean value that may be updated atomically.
Definition: AtomicBoolean.h:34
A client uses a MessageConsumer to received messages from a destination.
Definition: MessageConsumer.h:63
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements...
Definition: CachedConsumer.h:24
Definition: Exception.h:38
A listener interface similar to the MessageListener interface.
Definition: MessageAvailableListener.h:33
Definition: ActiveMQSessionKernel.h:65
Interface for an object responsible for dispatching messages to consumers.
Definition: Dispatcher.h:32
Decaf&#39;s implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition: Pointer.h:53
A MessageListener object is used to receive asynchronously delivered messages.
Definition: MessageListener.h:33