activemq-cpp-3.8.2
ActiveMQSessionKernel.h
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
19 #define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
20 
21 #include <cms/Session.h>
22 #include <cms/ExceptionListener.h>
23 
24 #include <activemq/util/Config.h>
25 #include <activemq/util/Usage.h>
42 
43 #include <decaf/lang/Pointer.h>
44 #include <decaf/util/Properties.h>
46 
47 #include <string>
48 #include <memory>
49 
50 namespace activemq {
51 namespace core {
52 
// Forward declarations of activemq::core types. In the visible part of this
// listing, ActiveMQSessionExecutor is used by the `executor` member and
// ActiveMQConnection by the accessor body that returns `this->connection`;
// ActiveMQConsumer and ActiveMQProducer are not referenced by any visible line.
53  class ActiveMQConnection;
54  class ActiveMQConsumer;
55  class ActiveMQProducer;
56  class ActiveMQSessionExecutor;
57 
58 namespace kernels {
59 
62 
63  class SessionConfig;
64 
// ActiveMQSessionKernel is declared as deriving virtually from cms::Session and
// from Dispatcher: it declares the CMS session operations (see the
// "Implements Methods" section) and a dispatch() entry point for incoming
// MessageDispatch objects.
//
// NOTE(review): this text is a rendered source listing. The number at the start
// of each line is the listing's own line number, and gaps in that numbering
// mark lines (doc comments and some declarations) that are not shown. Several
// fragments below are therefore incomplete; each is flagged where it occurs.
65  class AMQCPP_API ActiveMQSessionKernel : public virtual cms::Session, public Dispatcher {
66  private:
67 
69 
70  protected:
71 
    // Session configuration object, held by raw pointer. Ownership is not
    // visible in this listing - TODO confirm who releases it.
72  SessionConfig* config;
73 
    // NOTE(review): the inline bodies further down read this->sessionInfo,
    // this->transaction, this->connection, this->ackMode,
    // this->lastDeliveredSequenceId and this->producerSequenceIds, but none of
    // those declarations is visible. Presumably they live in the numbering gaps
    // of this protected section (listing 74-97 and 100-123) - confirm against
    // the original header.
78 
83 
88 
94 
    // Owning pointer to the session's executor.
    // NOTE(review): std::auto_ptr was deprecated in C++11 and removed from the
    // standard in C++17; some standard libraries no longer provide it in that
    // mode. Replacing it would change a protected member's type, so it is only
    // flagged here.
98  std::auto_ptr<ActiveMQSessionExecutor> executor;
99 
104 
109 
114 
119 
124 
125  private:
126 
    // Copy assignment is declared in the private section with no body here;
    // presumably this is meant to forbid assignment by client code - confirm
    // that no definition exists. Listing line 127 (not shown) is presumably the
    // matching copy-constructor declaration.
128  ActiveMQSessionKernel& operator=(const ActiveMQSessionKernel&);
129 
130  public:
131 
    // NOTE(review): orphaned tail of a declaration whose first lines (listing
    // 132-134) are not shown - presumably the constructor. The only visible
    // parameter is a const reference to decaf::util::Properties.
135  const decaf::util::Properties& properties);
136 
137  virtual ~ActiveMQSessionKernel();
138 
    // Takes the channel of unconsumed messages by non-const reference.
143  virtual void redispatch(MessageDispatchChannel& unconsumedMessages);
144 
148  virtual void start();
149 
153  virtual void stop();
154 
    // Non-virtual, const query of the started state.
159  bool isStarted() const;
160 
    // Acknowledge-mode predicates: each of the four functions below returns
    // true exactly when this->ackMode equals the cms::Session constant named
    // in its body.
161  virtual bool isAutoAcknowledge() const {
162  return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
163  }
164 
165  virtual bool isDupsOkAcknowledge() const {
166  return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
167  }
168 
169  virtual bool isClientAcknowledge() const {
170  return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
171  }
172 
173  virtual bool isIndividualAcknowledge() const {
174  return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
175  }
176 
    // Receives an ActiveMQException by const reference. What is done with it
    // is defined out of line and not visible here.
180  void fire(const exceptions::ActiveMQException& ex);
181 
182  public: // Methods from ActiveMQMessageDispatcher
183 
    // Dispatch entry point; the message arrives as a const reference to a
    // Pointer<MessageDispatch>.
188  virtual void dispatch(const Pointer<MessageDispatch>& message);
189 
190  public: // Implements Methods
191 
    // The declarations in this section use cms:: types in their signatures;
    // per the section label they implement the cms::Session interface. The
    // factory functions return raw pointers - ownership of the returned
    // objects is not stated in this listing (TODO confirm: presumably the
    // caller's).
192  virtual void close();
193 
194  virtual void commit();
195 
196  virtual void rollback();
197 
198  virtual void recover();
199 
200  virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination);
201 
202  virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
203  const std::string& selector);
204 
205  virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
206  const std::string& selector,
207  bool noLocal);
208 
    // NOTE(review): `noLocal` has a default argument on a virtual function.
    // Default arguments are chosen from the static type at the call site, so
    // this value should match the base-class declaration - confirm against
    // cms::Session.
209  virtual cms::MessageConsumer* createDurableConsumer(const cms::Topic* destination,
210  const std::string& name,
211  const std::string& selector,
212  bool noLocal = false);
213 
214  virtual cms::MessageProducer* createProducer(const cms::Destination* destination);
215 
216  virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue);
217 
218  virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue, const std::string& selector);
219 
220  virtual cms::Queue* createQueue(const std::string& queueName);
221 
222  virtual cms::Topic* createTopic(const std::string& topicName);
223 
224  virtual cms::TemporaryQueue* createTemporaryQueue();
225 
226  virtual cms::TemporaryTopic* createTemporaryTopic();
227 
228  virtual cms::Message* createMessage();
229 
230  virtual cms::BytesMessage* createBytesMessage();
231 
    // Overload taking an initial payload as a byte pointer plus explicit length.
232  virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes, int bytesSize);
233 
234  virtual cms::StreamMessage* createStreamMessage();
235 
236  virtual cms::TextMessage* createTextMessage();
237 
238  virtual cms::TextMessage* createTextMessage( const std::string& text );
239 
240  virtual cms::MapMessage* createMapMessage();
241 
242  virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const;
243 
244  virtual bool isTransacted() const;
245 
246  virtual void unsubscribe(const std::string& name);
247 
248  public: // ActiveMQSessionKernel specific Methods
249 
    // NOTE(review): orphaned parameter list; the first line(s) of this
    // declaration (somewhere in listing 250-277) are not shown - presumably a
    // send operation, confirm. Visible parameters: the message, delivery mode,
    // priority, time to live, a producer-window MemoryUsage pointer, a send
    // timeout and an AsyncCallback pointer.
278  cms::Message* message, int deliveryMode, int priority, long long timeToLive,
279  util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete);
280 
289  cms::ExceptionListener* getExceptionListener();
290 
    // Setter/getter pair for the session's cms::MessageTransformer, passed and
    // returned by raw pointer.
298  virtual void setMessageTransformer(cms::MessageTransformer* transformer);
299 
305  virtual cms::MessageTransformer* getMessageTransformer() const;
306 
    // NOTE(review): body of an inline accessor whose signature line (listing
    // 312) is not shown; the reference index at the end of this page lists
    // line 312 as `const commands::SessionInfo& getSessionInfo() const`.
    // Calls checkClosed() first, then returns the dereferenced sessionInfo.
313  this->checkClosed();
314  return *( this->sessionInfo );
315  }
316 
    // NOTE(review): same situation; the index lists line 322 as
    // `const commands::SessionId& getSessionId() const`. Calls checkClosed()
    // first, then dereferences the id obtained from sessionInfo.
323  this->checkClosed();
324  return *( this->sessionInfo->getSessionId() );
325  }
326 
    // NOTE(review): signature not shown; the index lists line 330 as
    // `ActiveMQConnection* getConnection() const`. Returns the connection
    // member directly, without a checkClosed() call.
331  return this->connection;
332  }
333 
337  Pointer<threads::Scheduler> getScheduler() const;
338 
    // Plain accessor: returns the stored value, no checkClosed() call.
344  long long getLastDeliveredSequenceId() const {
345  return this->lastDeliveredSequenceId;
346  }
347 
    // Plain mutator: overwrites the stored value, no validation.
354  void setLastDeliveredSequenceId(long long value) {
355  this->lastDeliveredSequenceId = value;
356  }
357 
    // Takes a command by Pointer value and returns nothing.
367  void oneway(Pointer<commands::Command> command);
368 
    // Takes a command and returns a Pointer to a Response. `timeout` defaults
    // to 0; its unit and the meaning of 0 are not visible here - presumably
    // "no timeout", TODO confirm.
383  Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
384 
    // Registration / deregistration of consumer and producer kernels; each
    // takes the kernel by Pointer value.
395  void addConsumer(Pointer<ActiveMQConsumerKernel> consumer);
396 
406  void removeConsumer(Pointer<ActiveMQConsumerKernel> consumer);
407 
418  void addProducer(Pointer<ActiveMQProducerKernel> producer);
419 
429  void removeProducer(Pointer<ActiveMQProducerKernel> producer);
430 
438  virtual void doStartTransaction();
439 
    // NOTE(review): signature not shown; the index lists line 445 as
    // `Pointer<ActiveMQTransactionContext> getTransactionContext()`.
    // Returns the transaction member directly.
446  return this->transaction;
447  }
448 
453  void acknowledge();
454 
459  void deliverAcks();
460 
466 
470  void wakeup();
471 
    // Each of the two functions below returns a Pointer to an id object.
476  Pointer<commands::ConsumerId> getNextConsumerId();
477 
482  Pointer<commands::ProducerId> getNextProducerId();
483 
490  void doClose();
491 
498  void dispose();
499 
    // Takes a consumer id and an integer prefetch value.
509  void setPrefetchSize(Pointer<commands::ConsumerId> id, int prefetch);
510 
518 
524  bool isInUse(Pointer<commands::ActiveMQDestination> destination);
525 
530 
535 
544  bool iterateConsumers();
545 
551  void checkMessageListener() const;
552 
558  virtual int getHashCode() const;
559 
    // Takes a MessageAck; `async` defaults to false.
569  void sendAck(decaf::lang::Pointer<commands::MessageAck> ack, bool async = false);
570 
571  private:
572 
    // Returns whatever producerSequenceIds.getNextSequenceId() yields.
    // Non-const, since it calls a non-const-looking generator method
    // (presumably advancing the sequence - confirm in LongSequenceGenerator).
577  long long getNextProducerSequenceId() {
578  return this->producerSequenceIds.getNextSequenceId();
579  }
580 
581  // Checks for the closed state and throws if so.
582  void checkClosed() const;
583 
584  // Send the Destination Creation Request to the Broker, alerting it
585  // that we've created a new Temporary Destination.
586  // @param tempDestination - The new Temporary Destination
587  void createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
588 
589  // Send the Destination Destruction Request to the Broker, alerting
590  // it that we've removed an existing Temporary Destination.
591  // @param tempDestination - The Temporary Destination to remove
592  void destroyTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
593 
594  // Creates a new Temporary Destination name using the connection id
595  // and a rolling count.
596  // @returns a unique Temporary Destination name
597  std::string createTemporaryDestinationName();
598 
599  };
600 
601 }}}
602 
603 #endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_ */
virtual void close()
Terminates the dispatching thread.
Definition: ActiveMQSessionExecutor.h:120
An interface encapsulating a provider-specific topic name.
Definition: Topic.h:36
Provides an interface for clients to transform cms::Message objects inside the CMS MessageProducer an...
Definition: MessageTransformer.h:37
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition: Session.h:128
Definition: ActiveMQTempDestination.h:34
virtual void stop()
Stops dispatching.
virtual bool isIndividualAcknowledge() const
Definition: ActiveMQSessionKernel.h:173
Root of all messages.
Definition: Message.h:88
Defines a Temporary Topic based Destination.
Definition: TemporaryTopic.h:39
A Destination object encapsulates a provider-specific address.
Definition: Destination.h:39
#define AMQCPP_API
Definition: Config.h:30
virtual bool isClientAcknowledge() const
Definition: ActiveMQSessionKernel.h:169
AcknowledgeMode
Definition: Session.h:108
Asynchronous event interface for CMS asynchronous operations.
Definition: AsyncCallback.h:37
std::auto_ptr< ActiveMQSessionExecutor > executor
Sends incoming messages to the registered consumers.
Definition: ActiveMQSessionKernel.h:98
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition: Session.h:117
Definition: SessionInfo.h:48
This class implements an interface for browsing the messages in a Queue without removing them...
Definition: QueueBrowser.h:53
If a CMS provider detects a serious problem, it notifies the client application through an ExceptionL...
Definition: ExceptionListener.h:37
virtual void start()
Starts the dispatching.
cms::Session::AcknowledgeMode ackMode
This Session's acknowledgment mode.
Definition: ActiveMQSessionKernel.h:103
virtual bool isDupsOkAcknowledge() const
Definition: ActiveMQSessionKernel.h:165
A BytesMessage object is used to send a message containing a stream of unsigned bytes.
Definition: BytesMessage.h:66
ActiveMQConnection * getConnection() const
Gets the ActiveMQConnection that is associated with this session.
Definition: ActiveMQSessionKernel.h:330
Pointer< ActiveMQTransactionContext > getTransactionContext()
Gets the Pointer to this Session's TransactionContext.
Definition: ActiveMQSessionKernel.h:445
virtual void wakeup()
Wake up this executor and dispatch any pending messages.
util::LongSequenceGenerator consumerIds
Next available Consumer Id.
Definition: ActiveMQSessionKernel.h:118
util::LongSequenceGenerator producerSequenceIds
Next available Producer Sequence Id.
Definition: ActiveMQSessionKernel.h:113
util::LongSequenceGenerator producerIds
Next available Producer Id.
Definition: ActiveMQSessionKernel.h:108
Defines a Temporary Queue based Destination.
Definition: TemporaryQueue.h:39
A Session object is a single-threaded context for producing and consuming messages.
Definition: Session.h:105
long long getLastDeliveredSequenceId() const
Gets the currently set Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:344
Delegate dispatcher for a single session.
Definition: ActiveMQSessionExecutor.h:44
const commands::SessionInfo & getSessionInfo() const
Gets the Session Information object for this session, if the session is closed then this method throw...
Definition: ActiveMQSessionKernel.h:312
Definition: ActiveMQProducerKernel.h:44
Message will be acknowledged individually.
Definition: Session.h:146
With this acknowledgment mode, the client acknowledges a consumed message by calling the message's ac...
Definition: Session.h:134
A MapMessage object is used to send a set of name-value pairs.
Definition: MapMessage.h:71
Definition: MemoryUsage.h:28
Interface for a StreamMessage.
Definition: StreamMessage.h:61
A boolean value that may be updated atomically.
Definition: AtomicBoolean.h:34
virtual bool isAutoAcknowledge() const
Definition: ActiveMQSessionKernel.h:161
virtual void clearMessagesInProgress()
Removes all messages in the Dispatch Channel so that none are delivered.
Definition: ActiveMQSessionExecutor.h:90
Definition: ActiveMQException.h:35
Pointer< commands::SessionInfo > sessionInfo
SessionInfo for this Session.
Definition: ActiveMQSessionKernel.h:77
void setLastDeliveredSequenceId(long long value)
Sets the value of the Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:354
AtomicBoolean closed
Indicates that this connection has been closed, it is no longer usable after this becomes true...
Definition: ActiveMQSessionKernel.h:93
A client uses a MessageProducer object to send messages to a Destination.
Definition: MessageProducer.h:60
This class is used to generate a sequence of long long values that are incremented each time a new va...
Definition: LongSequenceGenerator.h:32
A client uses a MessageConsumer to receive messages from a destination.
Definition: MessageConsumer.h:63
long long lastDeliveredSequenceId
Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:123
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements...
Definition: CachedConsumer.h:24
Definition: SessionId.h:51
Java-like properties class for mapping string names to string values.
Definition: Properties.h:53
Definition: ActiveMQSessionKernel.h:65
Interface for an object responsible for dispatching messages to consumers.
Definition: Dispatcher.h:32
Definition: MessageDispatchChannel.h:34
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition: Pointer.h:53
Concrete connection used for all connectors to the ActiveMQ broker.
Definition: ActiveMQConnection.h:60
SessionConfig * config
Definition: ActiveMQSessionKernel.h:72
Interface for a text message.
Definition: TextMessage.h:41
ActiveMQConnection * connection
Connection.
Definition: ActiveMQSessionKernel.h:87
const commands::SessionId & getSessionId() const
Gets the Session Id object for this session, if the session is closed then this method throws an exce...
Definition: ActiveMQSessionKernel.h:322
An interface encapsulating a provider-specific queue name.
Definition: Queue.h:37
Pointer< ActiveMQTransactionContext > transaction
Transaction Management object.
Definition: ActiveMQSessionKernel.h:82