|
libcaf
0.15.5
|
A broker mediates between actor systems and other components in the network. More...
#include <abstract_broker.hpp>
Public Member Functions | |
| void | enqueue (mailbox_element_ptr, execution_unit *) override |
| void | enqueue (strong_actor_ptr, message_id, message, execution_unit *) override |
| void | launch (execution_unit *eu, bool lazy, bool hide) override |
| bool | cleanup (error &&reason, execution_unit *host) override |
| resume_result | resume (execution_unit *, size_t) override |
| Resume any pending computation until it is either finished or needs to be re-scheduled later. More... | |
| template<class Handle > | |
| void | halt (Handle hdl) |
Suspends activities on hdl unconditionally. | |
| template<class Handle > | |
| void | trigger (Handle hdl) |
Allows activities on hdl unconditionally (default). | |
| template<class Handle > | |
| void | trigger (Handle hdl, size_t num_events) |
Allows num_events activities on hdl. | |
| void | configure_read (connection_handle hdl, receive_policy::config cfg) |
| Modifies the receive policy for a given connection. More... | |
| void | ack_writes (connection_handle hdl, bool enable) |
| Enables or disables write notifications for a given connection. | |
| std::vector< char > & | wr_buf (connection_handle hdl) |
| Returns the write buffer for a given connection. | |
| void | write (connection_handle hdl, size_t bs, const void *buf) |
Writes data into the buffer for a given connection. | |
| void | flush (connection_handle hdl) |
| Sends the content of the buffer for a given connection. | |
| void | ack_writes (datagram_handle hdl, bool enable) |
| Enables or disables write notifications for a given datagram socket. | |
| std::vector< char > & | wr_buf (datagram_handle hdl) |
| Returns the write buffer for a given sink. | |
| void | enqueue_datagram (datagram_handle, std::vector< char >) |
| Enqueue a buffer to be sent as a datagram via a given endpoint. | |
| void | write (datagram_handle hdl, size_t data_size, const void *data) |
Writes data into the buffer of a given sink. | |
| void | flush (datagram_handle hdl) |
| Sends the content of the buffer to a UDP endpoint. | |
| middleman & | parent () |
| Returns the middleman instance this broker belongs to. | |
| void | add_scribe (scribe_ptr ptr) |
Adds the uninitialized scribe instance ptr to this broker. | |
| connection_handle | add_scribe (network::native_socket fd) |
Creates and assigns a new scribe from given native socket fd. | |
| expected< connection_handle > | add_tcp_scribe (const std::string &host, uint16_t port) |
Tries to connect to host on given port and creates a new scribe describing the connection afterwards. More... | |
| void | move_scribe (scribe_ptr ptr) |
Moves the initialized scribe instance ptr from another broker to this broker. More... | |
| void | add_doorman (doorman_ptr ptr) |
Adds a doorman instance to this broker. | |
| accept_handle | add_doorman (network::native_socket fd) |
Creates and assigns a new doorman from given native socket fd. | |
| void | move_doorman (doorman_ptr ptr) |
Moves an initialized doorman instance ptr from another broker to this broker. | |
| expected< std::pair< accept_handle, uint16_t > > | add_tcp_doorman (uint16_t port=0, const char *in=nullptr, bool reuse_addr=false) |
Tries to open a local port and creates a doorman managing it on success. More... | |
| void | add_datagram_servant (datagram_servant_ptr ptr) |
Adds a datagram_servant to this broker. | |
| void | add_hdl_for_datagram_servant (datagram_servant_ptr ptr, datagram_handle hdl) |
Adds the datagram_servant under an additional hdl. | |
| datagram_handle | add_datagram_servant (network::native_socket fd) |
Creates and assigns a new datagram_servant from a given socket fd. | |
| datagram_handle | add_datagram_servant_for_endpoint (network::native_socket fd, const network::ip_endpoint &ep) |
Creates and assigns a new datagram_servant from a given socket fd for the remote endpoint ep. More... | |
| expected< datagram_handle > | add_udp_datagram_servant (const std::string &host, uint16_t port) |
Creates a new datagram_servant for the remote endpoint host and port. More... | |
| expected< std::pair< datagram_handle, uint16_t > > | add_udp_datagram_servant (uint16_t port=0, const char *in=nullptr, bool reuse_addr=false) |
Tries to open a local port and creates a datagram_servant managing it on success. More... | |
| void | move_datagram_servant (datagram_servant_ptr ptr) |
Moves an initialized datagram_servant instance ptr from another broker to this one. More... | |
| std::string | remote_addr (connection_handle hdl) |
Returns the remote address associated with hdl or empty string if hdl is invalid. More... | |
| uint16_t | remote_port (connection_handle hdl) |
Returns the remote port associated with hdl or 0 if hdl is invalid. More... | |
| std::string | local_addr (accept_handle hdl) |
Returns the local address associated with hdl or empty string if hdl is invalid. More... | |
| uint16_t | local_port (accept_handle hdl) |
Returns the local port associated with hdl or 0 if hdl is invalid. | |
| accept_handle | hdl_by_port (uint16_t port) |
Returns the handle associated with given local port or none. | |
| datagram_handle | datagram_hdl_by_port (uint16_t port) |
Returns the dgram handle associated with given local port or none. | |
| std::string | remote_addr (datagram_handle hdl) |
Returns the remote address associated with hdl or an empty string if hdl is invalid. More... | |
| uint16_t | remote_port (datagram_handle hdl) |
Returns the remote port associated with hdl or 0 if hdl is invalid. More... | |
| uint16_t | local_port (datagram_handle hdl) |
Returns the local port associated with hdl or 0 if hdl is invalid. More... | |
| bool | remove_endpoint (datagram_handle hdl) |
Remove the endpoint hdl from the broker. | |
| void | close_all () |
| Closes all connections and acceptors. | |
| template<class Handle > | |
| bool | close (Handle hdl) |
Closes the connection or acceptor identified by handle. More... | |
| template<class Handle > | |
| bool | valid (Handle hdl) |
Checks whether hdl is assigned to broker. | |
| const char * | name () const override |
| subtype_t | subtype () const override |
| Returns a subtype hint for this object. More... | |
| size_t | num_connections () const |
| Returns the number of open connections. | |
| std::vector< connection_handle > | connections () const |
Returns all handles of all scribe instances attached to this broker. | |
| network::multiplexer & | backend () |
Returns the multiplexer running this broker. | |
Public Member Functions inherited from caf::scheduled_actor | |
| scheduled_actor (actor_config &cfg) | |
| void | enqueue (mailbox_element_ptr ptr, execution_unit *eu) override |
| const char * | name () const override |
| void | launch (execution_unit *eu, bool lazy, bool hide) override |
| bool | cleanup (error &&fail_state, execution_unit *host) override |
| void | intrusive_ptr_add_ref_impl () override |
| Add a strong reference count to this object. | |
| void | intrusive_ptr_release_impl () override |
| Remove a strong reference count from this object. | |
| virtual proxy_registry * | proxy_registry_ptr () |
Returns a factory for proxies created and managed by this actor or nullptr. More... | |
| void | quit (error x=error{}) |
| Finishes execution of this actor after any currently running message handler is done. More... | |
| void | set_default_handler (default_handler fun) |
| Sets a custom handler for unexpected messages. | |
| template<class F > | |
| std::enable_if< std::is_convertible< F, std::function< result< message >(type_erased_tuple &)> >::value >::type | set_default_handler (F fun) |
| Sets a custom handler for unexpected messages. | |
| void | set_error_handler (error_handler fun) |
| Sets a custom handler for error messages. | |
| template<class T > | |
| auto | set_error_handler (T fun) -> decltype(fun(std::declval< error &>())) |
| Sets a custom handler for error messages. | |
| void | set_down_handler (down_handler fun) |
| Sets a custom handler for down messages. | |
| template<class T > | |
| auto | set_down_handler (T fun) -> decltype(fun(std::declval< down_msg &>())) |
| Sets a custom handler for down messages. | |
| void | set_exit_handler (exit_handler fun) |
| Sets a custom handler for exit messages. | |
| template<class T > | |
| auto | set_exit_handler (T fun) -> decltype(fun(std::declval< exit_msg &>())) |
| Sets a custom handler for exit messages. | |
| void | set_exception_handler (exception_handler fun) |
| Sets a custom exception handler for this actor. More... | |
| template<class F > | |
| std::enable_if< std::is_convertible< F, std::function< error(std::exception_ptr &)> >::value >::type | set_exception_handler (F f) |
| Sets a custom exception handler for this actor. More... | |
| stream_id | make_stream_id () |
| Returns a new stream ID. | |
| template<class Handle , class... Ts, class Init , class Getter , class ClosedPredicate , class ResHandler , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>> | |
| annotated_stream< typename stream_source_trait_t< Getter >::output, Ts... > | make_source (const Handle &dest, std::tuple< Ts... > xs, Init init, Getter getter, ClosedPredicate pred, ResHandler res_handler, policy::arg< Scatterer > scatterer_type={}) |
Creates a new stream source and starts streaming to dest. More... | |
| template<class Handle , class Init , class Getter , class ClosedPredicate , class ResHandler , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>> | |
| stream< typename stream_source_trait_t< Getter >::output > | make_source (const Handle &dest, Init init, Getter getter, ClosedPredicate pred, ResHandler res_handler, policy::arg< Scatterer > scatterer_type={}) |
Creates a new stream source and starts streaming to dest. More... | |
| template<class Init , class... Ts, class Getter , class ClosedPredicate , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>> | |
| annotated_stream< typename stream_source_trait_t< Getter >::output, Ts... > | make_source (std::tuple< Ts... > xs, Init init, Getter getter, ClosedPredicate pred, policy::arg< Scatterer > scatterer_type={}) |
| Creates a new stream source. More... | |
| template<class Init , class Getter , class ClosedPredicate , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>> | |
| stream< typename stream_source_trait_t< Getter >::output > | make_source (Init init, Getter getter, ClosedPredicate pred, policy::arg< Scatterer > scatterer_type={}) |
| Creates a new stream source. More... | |
| template<class In , class... Ts, class Init , class Fun , class Cleanup , class Gatherer = random_gatherer, class Scatterer = broadcast_scatterer<typename stream_stage_trait_t<Fun>::output>> | |
| annotated_stream< typename stream_stage_trait_t< Fun >::output, Ts... > | make_stage (const stream< In > &in, std::tuple< Ts... > xs, Init init, Fun fun, Cleanup cleanup, policy::arg< Gatherer, Scatterer > policies={}) |
| Creates a new stream stage. More... | |
| template<class In , class Init , class Fun , class Cleanup , class Gatherer = random_gatherer, class Scatterer = broadcast_scatterer<typename stream_stage_trait_t<Fun>::output>> | |
| stream< typename stream_stage_trait_t< Fun >::output > | make_stage (const stream< In > &in, Init init, Fun fun, Cleanup cleanup, policy::arg< Gatherer, Scatterer > policies={}) |
| Creates a new stream stage. More... | |
| template<class T , class In , class SuccessCallback , class... Ts> | |
| stream_result< typename T::output_type > | make_sink_impl (const stream< In > &in, SuccessCallback f, Ts &&... xs) |
| Creates a new stream sink of type T. More... | |
| template<class In , class Init , class Fun , class Finalize , class Gatherer = random_gatherer, class Scatterer = terminal_stream_scatterer> | |
| stream_result< typename stream_sink_trait_t< Fun, Finalize >::output > | make_sink (const stream< In > &in, Init init, Fun fun, Finalize finalize, policy::arg< Gatherer, Scatterer > policies={}) |
| Creates a new stream sink. More... | |
| streams_map & | streams () |
| void | trigger_downstreams () |
| Tries to send more data on all downstream paths. More... | |
| void | enqueue (strong_actor_ptr sender, message_id mid, message msg, execution_unit *host) override |
| virtual void | enqueue (mailbox_element_ptr what, execution_unit *host)=0 |
Enqueues a new message wrapped in a mailbox_element to the actor. More... | |
Protected Types | |
| using | doorman_map = std::unordered_map< accept_handle, intrusive_ptr< doorman > > |
| using | scribe_map = std::unordered_map< connection_handle, intrusive_ptr< scribe > > |
| using | datagram_servant_map = std::unordered_map< datagram_handle, intrusive_ptr< datagram_servant > > |
Protected Member Functions | |
| void | init_broker () |
| abstract_broker (actor_config &cfg) | |
| template<class Handle > | |
| auto | by_id (Handle hdl) -> optional< decltype(*ptr_of(hdl))> |
Returns a scribe or doorman identified by hdl. | |
Friends | |
| class | scribe |
| class | doorman |
| class | datagram_servant |
Additional Inherited Members | |
Public Types inherited from caf::scheduled_actor | |
| using | stream_manager_ptr = intrusive_ptr< stream_manager > |
A reference-counting pointer to a stream_manager. | |
| using | streams_map = std::unordered_map< stream_id, stream_manager_ptr > |
| A container for associating stream IDs to handlers. | |
| using | pending_response = std::pair< const message_id, behavior > |
| The message ID of an outstanding response with its callback. | |
| using | pointer = scheduled_actor * |
| A pointer to a scheduled actor. | |
| using | default_handler = std::function< result< message >(pointer, message_view &)> |
| Function object for handling unmatched messages. | |
| using | error_handler = std::function< void(pointer, error &)> |
| Function object for handling error messages. | |
| using | down_handler = std::function< void(pointer, down_msg &)> |
| Function object for handling down messages. | |
| using | exit_handler = std::function< void(pointer, exit_msg &)> |
| Function object for handling exit messages. | |
| using | exception_handler = std::function< error(pointer, std::exception_ptr &)> |
| Function object for handling exceptions. | |
Public Types inherited from caf::resumable | |
| enum | resume_result { resume_later, awaiting_message, done, shutdown_execution_unit } |
Denotes the state in which a resumable returned from its last call to resume. More... | |
| enum | subtype_t { unspecified, scheduled_actor, io_actor, function_object } |
Denotes common subtypes of resumable. More... | |
Static Public Member Functions inherited from caf::scheduled_actor | |
| static void | default_error_handler (pointer ptr, error &x) |
| static void | default_down_handler (pointer ptr, down_msg &x) |
| static void | default_exit_handler (pointer ptr, exit_msg &x) |
| static error | default_exception_handler (pointer ptr, std::exception_ptr &x) |
Related Functions inherited from caf::scheduled_actor | |
| result< message > | reflect (scheduled_actor *, message_view &) |
| result< message > | reflect_and_quit (scheduled_actor *, message_view &) |
| result< message > | print_and_drop (scheduled_actor *, message_view &) |
| result< message > | drop (scheduled_actor *, message_view &) |
A broker mediates between actor systems and other components in the network.
| datagram_handle caf::io::abstract_broker::add_datagram_servant_for_endpoint | ( | network::native_socket | fd, |
| const network::ip_endpoint & | ep | ||
| ) |
Creates and assigns a new datagram_servant from a given socket fd for the remote endpoint ep.
| expected<std::pair<accept_handle, uint16_t> > caf::io::abstract_broker::add_tcp_doorman | ( | uint16_t | port = 0, |
| const char * | in = nullptr, |
||
| bool | reuse_addr = false |
||
| ) |
Tries to open a local port and creates a doorman managing it on success.
If port == 0, then the broker will ask the operating system to pick a random port.
Returns the handle of the new doorman and the assigned port.
| expected<connection_handle> caf::io::abstract_broker::add_tcp_scribe | ( | const std::string & | host, |
| uint16_t | port | ||
| ) |
Tries to connect to host on given port and creates a new scribe describing the connection afterwards.
Returns the handle of the new scribe on success.
| expected<datagram_handle> caf::io::abstract_broker::add_udp_datagram_servant | ( | const std::string & | host, |
| uint16_t | port | ||
| ) |
Creates a new datagram_servant for the remote endpoint host and port.
Returns the handle of the new datagram_servant.
| expected<std::pair<datagram_handle, uint16_t> > caf::io::abstract_broker::add_udp_datagram_servant | ( | uint16_t | port = 0, |
| const char * | in = nullptr, |
||
| bool | reuse_addr = false |
||
| ) |
Tries to open a local port and creates a datagram_servant managing it on success.
If port == 0, then the broker will ask the operating system to pick a random port.
Returns the handle of the new datagram_servant and the assigned port.
| bool caf::io::abstract_broker::close | ( | Handle | hdl | ) |
Closes the connection or acceptor identified by handle.
Unwritten data will still be sent.
| void caf::io::abstract_broker::configure_read | ( | connection_handle | hdl, |
| receive_policy::config | cfg | ||
| ) |
Modifies the receive policy for a given connection.
| hdl | Identifies the affected connection. |
| cfg | Contains the new receive policy. |
| std::string caf::io::abstract_broker::local_addr | ( | accept_handle | hdl | ) |
Returns the local address associated with hdl or empty string if hdl is invalid.
| uint16_t caf::io::abstract_broker::local_port | ( | datagram_handle | hdl | ) |
Returns the local port associated with hdl or 0 if hdl is invalid.
| void caf::io::abstract_broker::move_datagram_servant | ( | datagram_servant_ptr | ptr | ) |
Moves an initialized datagram_servant instance ptr from another broker to this one.
| void caf::io::abstract_broker::move_scribe | ( | scribe_ptr | ptr | ) |
Moves the initialized scribe instance ptr from another broker to this broker.
| std::string caf::io::abstract_broker::remote_addr | ( | connection_handle | hdl | ) |
Returns the remote address associated with hdl or empty string if hdl is invalid.
| std::string caf::io::abstract_broker::remote_addr | ( | datagram_handle | hdl | ) |
Returns the remote address associated with hdl or an empty string if hdl is invalid.
| uint16_t caf::io::abstract_broker::remote_port | ( | connection_handle | hdl | ) |
Returns the remote port associated with hdl or 0 if hdl is invalid.
| uint16_t caf::io::abstract_broker::remote_port | ( | datagram_handle | hdl | ) |
Returns the remote port associated with hdl or 0 if hdl is invalid.
|
overridevirtual |
Resume any pending computation until it is either finished or needs to be re-scheduled later.
Reimplemented from caf::scheduled_actor.
|
overridevirtual |
Returns a subtype hint for this object.
This allows an execution unit to limit processing to a specific set of resumables and delegate other subtypes to dedicated workers.
Reimplemented from caf::scheduled_actor.
1.8.14