activemq-cpp-3.8.2
ActiveMQConnection.h
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
19 #define _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
20 
21 #include <cms/EnhancedConnection.h>
22 #include <activemq/util/Config.h>
34 #include <decaf/util/Properties.h>
40 
41 #include <string>
42 #include <memory>
43 
44 namespace activemq{
45 namespace core{
46 
48 
49  class ActiveMQSession;
50  class ConnectionConfig;
51  class PrefetchPolicy;
52  class RedeliveryPolicy;
53 
62  private:
63 
64  ConnectionConfig* config;
65 
69  std::auto_ptr<cms::ConnectionMetaData> connectionMetaData;
70 
75 
81 
87 
92 
93  private:
94 
96  ActiveMQConnection& operator=(const ActiveMQConnection&);
97 
98  public:
99 
109  const Pointer<decaf::util::Properties> properties);
110 
111  virtual ~ActiveMQConnection();
112 
121  virtual void addSession(Pointer<activemq::core::kernels::ActiveMQSessionKernel> session);
122 
131  virtual void removeSession(Pointer<activemq::core::kernels::ActiveMQSessionKernel> session);
132 
141  virtual void addProducer(Pointer<kernels::ActiveMQProducerKernel> producer);
142 
148  virtual void removeProducer(const Pointer<commands::ProducerId>& producerId);
149 
156  virtual void addDispatcher(const Pointer<commands::ConsumerId>& consumer, Dispatcher* dispatcher);
157 
163  virtual void removeDispatcher(const Pointer<commands::ConsumerId>& consumer);
164 
175  virtual void sendPullRequest(const commands::ConsumerInfo* consumer, long long timeout);
176 
181  bool isClosed() const {
182  return this->closed.get();
183  }
184 
189  bool isStarted() const {
190  return this->started.get();
191  }
192 
197  bool isTransportFailed() const {
198  return this->transportFailed.get();
199  }
200 
219  virtual void destroyDestination(const commands::ActiveMQDestination* destination);
220 
239  virtual void destroyDestination(const cms::Destination* destination);
240 
251  bool isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message);
252 
261  void rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message);
262 
263  public: // Connection Interface Methods
264 
268  virtual const cms::ConnectionMetaData* getMetaData() const {
269  return connectionMetaData.get();
270  }
271 
275  virtual cms::Session* createSession();
276 
280  virtual std::string getClientID() const;
281 
285  virtual void setClientID(const std::string& clientID);
286 
290  virtual cms::Session* createSession(cms::Session::AcknowledgeMode ackMode);
291 
295  virtual void close();
296 
300  virtual void start();
301 
305  virtual void stop();
306 
310  virtual cms::ExceptionListener* getExceptionListener() const;
311 
315  virtual void setExceptionListener(cms::ExceptionListener* listener);
316 
320  virtual void setMessageTransformer(cms::MessageTransformer* transformer);
321 
325  virtual cms::MessageTransformer* getMessageTransformer() const;
326 
330  virtual cms::DestinationSource* getDestinationSource();
331 
332  public: // Configuration Options
333 
338  void setUsername(const std::string& username);
339 
345  const std::string& getUsername() const;
346 
351  void setPassword(const std::string& password);
352 
358  const std::string& getPassword() const;
359 
364  void setDefaultClientId(const std::string& clientId);
365 
371  void setBrokerURL(const std::string& brokerURL);
372 
378  const std::string& getBrokerURL() const;
379 
388  void setPrefetchPolicy(PrefetchPolicy* policy);
389 
395  PrefetchPolicy* getPrefetchPolicy() const;
396 
405  void setRedeliveryPolicy(RedeliveryPolicy* policy);
406 
412  RedeliveryPolicy* getRedeliveryPolicy() const;
413 
417  bool isDispatchAsync() const;
418 
427  void setDispatchAsync(bool value);
428 
434  bool isAlwaysSyncSend() const;
435 
441  void setAlwaysSyncSend(bool value);
442 
447  bool isUseAsyncSend() const;
448 
453  void setUseAsyncSend(bool value);
454 
459  bool isUseCompression() const;
460 
467  void setUseCompression(bool value);
468 
478  void setCompressionLevel(int value);
479 
485  int getCompressionLevel() const;
486 
491  unsigned int getSendTimeout() const;
492 
498  void setSendTimeout(unsigned int timeout);
499 
504  unsigned int getCloseTimeout() const;
505 
510  void setCloseTimeout(unsigned int timeout);
511 
519  unsigned int getProducerWindowSize() const;
520 
527  void setProducerWindowSize(unsigned int windowSize);
528 
533  bool isMessagePrioritySupported() const;
534 
542  void setMessagePrioritySupported(bool value);
543 
548  long long getNextTempDestinationId();
549 
554  long long getNextLocalTransactionId();
555 
562  bool isWatchTopicAdvisories() const;
563 
571  void setWatchTopicAdvisories(bool value);
572 
581  int getAuditDepth() const;
582 
592  void setAuditDepth(int auditDepth);
593 
599  int getAuditMaximumProducerNumber() const;
600 
607  void setAuditMaximumProducerNumber(int auditMaximumProducerNumber);
608 
621  bool isCheckForDuplicates() const;
622 
636  void setCheckForDuplicates(bool checkForDuplicates);
637 
645  bool isTransactedIndividualAck() const;
646 
655  void setTransactedIndividualAck(bool transactedIndividualAck);
656 
663  bool isNonBlockingRedelivery() const;
664 
673  void setNonBlockingRedelivery(bool nonBlockingRedelivery);
674 
680  long long getConsumerFailoverRedeliveryWaitPeriod() const;
681 
688  void setConsumerFailoverRedeliveryWaitPeriod(long long value);
689 
693  bool isOptimizeAcknowledge() const;
694 
701  void setOptimizeAcknowledge(bool optimizeAcknowledge);
702 
708  long long getOptimizeAcknowledgeTimeOut() const;
709 
716  void setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut);
717 
726  long long getOptimizedAckScheduledAckInterval() const;
727 
737  void setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval);
738 
744  bool isUseRetroactiveConsumer() const;
745 
754  void setUseRetroactiveConsumer(bool useRetroactiveConsumer);
755 
761  bool isExclusiveConsumer() const;
762 
770  void setExclusiveConsumer(bool exclusiveConsumer);
771 
778  bool isSendAcksAsync() const;
779 
787  void setSendAcksAsync(bool sendAcksAsync);
788 
789  public: // TransportListener
790 
802  void addTransportListener(transport::TransportListener* transportListener);
803 
812  void removeTransportListener(transport::TransportListener* transportListener);
813 
819  virtual void onCommand(const Pointer<commands::Command> command);
820 
825  virtual void onException(const decaf::lang::Exception& ex);
826 
830  virtual void transportInterrupted();
831 
835  virtual void transportResumed();
836 
837  public:
838 
845  const commands::ConnectionInfo& getConnectionInfo() const;
846 
853  const commands::ConnectionId& getConnectionId() const;
854 
860  transport::Transport& getTransport() const;
861 
867  Pointer<threads::Scheduler> getScheduler() const;
868 
875  std::string getResourceManagerId() const;
876 
881  void cleanup();
882 
893  void oneway(Pointer<commands::Command> command);
894 
909  Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
910 
923  void asyncRequest(Pointer<commands::Command> command, cms::AsyncCallback* onComplete);
924 
929  virtual void fire(const exceptions::ActiveMQException& ex);
930 
935  void setTransportInterruptionProcessingComplete();
936 
943  void setFirstFailureError(decaf::lang::Exception* error);
944 
950  decaf::lang::Exception* getFirstFailureError() const;
951 
958  void onAsyncException(const decaf::lang::Exception& ex);
959 
966  void onClientInternalException(const decaf::lang::Exception& ex);
967 
973  void checkClosed() const;
974 
980  void checkClosedOrFailed() const;
981 
985  void ensureConnectionInfoSent();
986 
990  decaf::util::concurrent::ExecutorService* getExecutor() const;
991 
999  void addTempDestination(Pointer<commands::ActiveMQTempDestination> destination);
1000 
1008  void removeTempDestination(Pointer<commands::ActiveMQTempDestination> destination);
1009 
1019  void deleteTempDestination(Pointer<commands::ActiveMQTempDestination> destination);
1020 
1027  void cleanUpTempDestinations();
1028 
1035  bool isDeleted(Pointer<commands::ActiveMQTempDestination> destination) const;
1036 
1037  protected:
1038 
1042  virtual Pointer<commands::SessionId> getNextSessionId();
1043 
1044  // Sends a oneway disconnect message to the broker.
1045  void disconnect(long long lastDeliveredSequenceId);
1046 
1047  // Waits for all Consumers to handle the Transport Interrupted event.
1048  void waitForTransportInterruptionProcessingToComplete();
1049 
1050  // Marks processing complete for a single caller when interruption processing completes.
1051  void signalInterruptionProcessingComplete();
1052 
1053  // Allow subclasses to access the original Properties object for this connection.
1054  const decaf::util::Properties& getProperties() const;
1055 
1056  // Process the ControlCommand command
1057  void onControlCommand(Pointer<commands::Command> command);
1058 
1059  // Process the ConnectionControl command
1060  void onConnectionControl(Pointer<commands::Command> command);
1061 
1062  // Process the ConsumerControl command
1063  void onConsumerControl(Pointer<commands::Command> command);
1064 
1065  };
1066 
1067 }}
1068 
1069 #endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_*/
Provides an interface for clients to transform cms::Message objects inside the CMS MessageProducer an...
Definition: MessageTransformer.h:37
bool isTransportFailed() const
Checks if the Connection&#39;s Transport has failed.
Definition: ActiveMQConnection.h:197
Interface for a RedeliveryPolicy object that controls how message Redelivery is handled in ActiveMQ-C...
Definition: RedeliveryPolicy.h:34
Definition: ConnectionInfo.h:49
A Destination object encapsulates a provider-specific address.
Definition: Destination.h:39
#define AMQCPP_API
Definition: Config.h:30
AcknowledgeMode
Definition: Session.h:108
Asynchronous event interface for CMS asynchronous operations.
Definition: AsyncCallback.h:37
Definition: ConnectionId.h:51
An Executor that provides methods to manage termination and methods that can produce a Future for tra...
Definition: ExecutorService.h:56
If a CMS provider detects a serious problem, it notifies the client application through an ExceptionL...
Definition: ExceptionListener.h:37
bool isClosed() const
Checks if this connection has been closed.
Definition: ActiveMQConnection.h:181
A ConnectionMetaData object provides information describing the Connection object.
Definition: ConnectionMetaData.h:31
Interface for a transport layer for command objects.
Definition: Transport.h:59
A Session object is a single-threaded context for producing and consuming messages.
Definition: Session.h:105
Definition: ConsumerInfo.h:51
A boolean value that may be updated atomically.
Definition: AtomicBoolean.h:34
Interface for a Policy object that controls message Prefetching on various destination types in Activ...
Definition: PrefetchPolicy.h:34
Definition: ActiveMQException.h:35
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements...
Definition: CachedConsumer.h:24
Definition: Exception.h:38
Java-like properties class for mapping string names to string values.
Definition: Properties.h:53
virtual const cms::ConnectionMetaData * getMetaData() const
Gets the metadata for this connection.the connection MetaData pointer ( caller does not own it )...
Definition: ActiveMQConnection.h:268
An enhanced CMS Connection instance that provides additional features above the default required feat...
Definition: EnhancedConnection.h:33
bool get() const
Gets the current value of this AtomicBoolean.
Definition: AtomicBoolean.h:63
Interface for an object responsible for dispatching messages to consumers.
Definition: Dispatcher.h:32
Decaf&#39;s implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition: Pointer.h:53
Provides an object that will provide a snapshot view of Destinations that exist on the Message provid...
Definition: DestinationSource.h:38
bool isStarted() const
Check if this connection has been started.
Definition: ActiveMQConnection.h:189
Concrete connection used for all connectors to the ActiveMQ broker.
Definition: ActiveMQConnection.h:60
A listener of asynchronous exceptions from a command transport object.
Definition: TransportListener.h:38
Definition: ActiveMQDestination.h:38