libcaf 0.15.5
Manages a single stream with any number of down- and upstream actors.
#include <stream_manager.hpp>
Public Member Functions | |
| virtual error | open (const stream_id &sid, strong_actor_ptr hdl, strong_actor_ptr original_stage, stream_priority priority, bool redeployable, response_promise result_cb) |
Handles stream_msg::open messages. | |
| virtual error | ack_open (const stream_id &sid, const actor_addr &rebind_from, strong_actor_ptr rebind_to, long initial_demand, bool redeployable) |
Handles stream_msg::ack_open messages, i.e., finalizes the stream handshake. | |
| virtual error | batch (const stream_id &sid, const actor_addr &hdl, long xs_size, message &xs, int64_t xs_id) |
Handles stream_msg::batch messages. | |
| virtual error | ack_batch (const stream_id &sid, const actor_addr &hdl, long new_demand, int64_t cumulative_batch_id) |
Handles stream_msg::ack_batch messages. | |
| virtual error | close (const stream_id &sid, const actor_addr &hdl) |
Handles stream_msg::close messages. | |
| virtual error | drop (const stream_id &sid, const actor_addr &hdl) |
Handles stream_msg::drop messages. | |
| virtual error | forced_close (const stream_id &sid, const actor_addr &hdl, error reason) |
Handles stream_msg::forced_close messages. | |
| virtual error | forced_drop (const stream_id &sid, const actor_addr &hdl, error reason) |
Handles stream_msg::forced_drop messages. | |
| virtual bool | add_sink (const stream_id &sid, strong_actor_ptr origin, strong_actor_ptr sink_ptr, mailbox_element::forwarding_stack stages, message_id handshake_mid, message handshake_data, stream_priority prio, bool redeployable) |
| Adds a new sink to the stream. | |
| virtual bool | add_source (const stream_id &sid, strong_actor_ptr source_ptr, strong_actor_ptr original_stage, stream_priority prio, bool redeployable, response_promise result_cb) |
Adds the source hdl to a stream. | |
| virtual void | push () |
| Pushes new data to downstream actors by sending batches. | |
| virtual void | abort (error reason) |
Aborts a stream after any stream message handler returned a non-default constructed error reason or the parent actor terminates with a non-default error. | |
| virtual void | close () |
| Closes the stream when the parent terminates with default exit reason or the stream reached its end. | |
| virtual stream_gatherer & | in ()=0 |
| Returns the stream edge for incoming data. | |
| virtual stream_scatterer & | out ()=0 |
| Returns the stream edge for outgoing data. | |
| virtual bool | done () const =0 |
| Returns whether the stream has reached the end and can be discarded safely. | |
| virtual bool | generate_messages () |
| Tries to generate new messages for the stream. | |
Public Member Functions inherited from caf::ref_counted | |
| ref_counted (const ref_counted &) | |
| ref_counted & | operator= (const ref_counted &) |
| void | ref () noexcept |
| Increases reference count by one. | |
| void | deref () noexcept |
Decreases reference count by one and calls request_deletion when it drops to zero. | |
| bool | unique () const noexcept |
| Queries whether there is exactly one reference. | |
| size_t | get_reference_count () const noexcept |
Public Member Functions inherited from caf::memory_managed | |
| virtual void | request_deletion (bool decremented_rc) noexcept |
| The default implementation calls `delete this`, but can be overridden in case deletion depends on some condition or the class doesn't use default new/delete. | |
Protected Member Functions | |
| virtual message | make_final_result () |
| Called when the gatherer closes to produce the final stream result for all listeners. | |
| virtual error | process_batch (message &msg) |
| Called to handle incoming data. | |
| virtual void | input_closed (error reason) |
Called when in().closed() changes to true. | |
| virtual message | make_output_token (const stream_id &) const |
Returns a type-erased stream<T> as handshake token for downstream actors. | |
| virtual void | downstream_demand (outbound_path *ptr, long demand) |
| Called whenever new credit becomes available. | |
| virtual void | output_closed (error reason) |
Called when out().closed() changes to true. | |
Protected Attributes | |
| local_actor * | self_ |
| Pointer to the parent actor. | |
Protected Attributes inherited from caf::ref_counted | |
| std::atomic< size_t > | rc_ |
Related Functions | |
(Note that these are not member functions.) | |
| using | stream_manager_ptr = intrusive_ptr< stream_manager > |
A reference counting pointer to a stream_manager. | |
Related Functions inherited from caf::ref_counted | |
| void | intrusive_ptr_add_ref (ref_counted *p) |
| void | intrusive_ptr_release (ref_counted *p) |
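The reference-counting members inherited from caf::ref_counted (ref, deref, unique, get_reference_count, request_deletion) can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions, not CAF's implementation: `toy_ref_counted` and `tracked` are hypothetical names, the count is assumed to start at 1, and `request_deletion` is simplified to take no argument.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Toy model of an intrusively reference-counted base class (not CAF's class).
class toy_ref_counted {
public:
  // Assumption: a freshly created object starts with one reference.
  toy_ref_counted() : rc_(1) {}
  virtual ~toy_ref_counted() = default;

  // Increases the reference count by one.
  void ref() noexcept { rc_.fetch_add(1, std::memory_order_relaxed); }

  // Decreases the reference count by one and requests deletion at zero.
  void deref() noexcept {
    if (rc_.fetch_sub(1, std::memory_order_acq_rel) == 1)
      request_deletion();
  }

  // Queries whether there is exactly one reference.
  bool unique() const noexcept { return rc_.load() == 1; }

  std::size_t get_reference_count() const noexcept { return rc_.load(); }

protected:
  // Default behavior: plain delete. Subclasses may override this hook,
  // e.g., to return the object to a pool instead.
  virtual void request_deletion() noexcept { delete this; }

private:
  std::atomic<std::size_t> rc_;
};

// Hypothetical subclass that records its own destruction for demonstration.
struct tracked : toy_ref_counted {
  explicit tracked(bool& flag) : destroyed(flag) {}
  ~tracked() override { destroyed = true; }
  bool& destroyed;
};
```

In this model, an object created with `new tracked(flag)` is destroyed by the `deref()` call that releases the last reference, which is the behavior a smart pointer such as stream_manager_ptr relies on.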
Manages a single stream with any number of down- and upstream actors.
|
virtual |
Aborts a stream after any stream message handler returned a non-default constructed error reason or the parent actor terminates with a non-default error.
| reason | Previous error or non-default exit reason of the parent. |
|
virtual |
Handles stream_msg::ack_batch messages.
| hdl | Handle to the sender. |
| new_demand | New credit for sending data. |
| cumulative_batch_id | Id of last handled batch. |
Precondition: hdl != nullptr
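The parameters of ack_batch suggest credit-based flow control with cumulative acknowledgements. The following stand-alone sketch models that idea; it is not CAF's implementation. `path_state`, `on_batch_sent`, and `on_ack_batch` are hypothetical names, and the sketch assumes that new demand is added to the remaining credit and that acknowledging batch ID n also acknowledges every batch with a smaller ID.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical bookkeeping for a single downstream path (not part of CAF).
struct path_state {
  long open_credit = 0;        // number of items we may still send
  std::deque<int64_t> unacked; // IDs of batches sent but not yet acknowledged
  int64_t next_batch_id = 0;   // ID assigned to the next outgoing batch
};

// Records an outgoing batch: consumes credit and remembers the batch ID.
// Returns the ID assigned to the batch.
inline int64_t on_batch_sent(path_state& st, long batch_size) {
  assert(batch_size > 0 && batch_size <= st.open_credit);
  st.open_credit -= batch_size;
  st.unacked.push_back(st.next_batch_id);
  return st.next_batch_id++;
}

// Models an ack_batch message: grants new credit and drops every pending
// batch whose ID is less than or equal to cumulative_batch_id.
inline void on_ack_batch(path_state& st, long new_demand,
                         int64_t cumulative_batch_id) {
  assert(new_demand >= 0);
  st.open_credit += new_demand;
  while (!st.unacked.empty() && st.unacked.front() <= cumulative_batch_id)
    st.unacked.pop_front();
}
```

A cumulative ID keeps the acknowledgement small: one message can confirm several batches at once, at the cost of requiring in-order handling on the receiving side.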
|
virtual |
Handles stream_msg::ack_open messages, i.e., finalizes the stream handshake.
| sid | ID of the outgoing stream. |
| rebind_from | Receiver of the original open message. |
| rebind_to | Sender of this confirmation. |
| initial_demand | Credit received with this ack_open. |
| redeployable | Denotes whether the runtime can redeploy rebind_to on failure. |
Precondition: hdl != nullptr
|
virtual |
Adds the source hdl to a stream.
Convenience function for calling in().add_path(sid, hdl).second.
|
virtual |
Handles stream_msg::batch messages.
| hdl | Handle to the sender. |
| xs_size | Size of the vector stored in xs. |
| xs | A type-erased vector of size xs_size. |
| xs_id | ID of this batch (must be ACKed). |
Preconditions: hdl != nullptr, xs_size > 0
|
virtual |
|
virtual |
Closes the stream when the parent terminates with default exit reason or the stream reached its end.
|
pure virtual |
Returns whether the stream has reached the end and can be discarded safely.
|
protectedvirtual |
Called whenever new credit becomes available.
The default implementation logs an error (sources are expected to override this hook).
|
virtual |
|
virtual |
Handles stream_msg::forced_close messages.
The default implementation calls abort(reason) and returns sec::unhandled_stream_error.
| hdl | Handle to the sender. |
| reason | Reported error from the source. |
Preconditions: hdl != nullptr, reason != none
|
virtual |
Handles stream_msg::forced_drop messages.
The default implementation calls abort(reason) and returns sec::unhandled_stream_error.
| hdl | Handle to the sender. |
| reason | Reported error from the sink. |
Preconditions: hdl != nullptr, reason != none
|
virtual |
Tries to generate new messages for the stream.
This member function does nothing on stages and sinks, but can trigger a source to produce more messages.
|
protectedvirtual |
Called when in().closed() changes to true.
The default implementation does nothing.
|
protectedvirtual |
Called when the gatherer closes to produce the final stream result for all listeners.
The default implementation returns an empty message.
|
protectedvirtual |
Returns a type-erased stream<T> as handshake token for downstream actors.
Returns an empty message for sinks.
|
virtual |
Handles stream_msg::open messages.
| hdl | Handle to the sender. |
| original_stage | Handle to the initial receiver of the handshake. |
| priority | Affects credit assignment and maximum bandwidth. |
| redeployable | Configures whether hdl could get redeployed, i.e., can resume after an abort. |
| result_cb | Callback for the listener of the final stream result. Ignored when returning nullptr, because the previous stage is responsible for it until this manager acknowledges the handshake. |
Precondition: hdl != nullptr
|
protectedvirtual |
Called when out().closed() changes to true.
The default implementation does nothing.
|
protectedvirtual |
Called to handle incoming data.
The default implementation logs an error (sinks are expected to override this member function).
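The override pattern described here can be sketched with stand-in types. Everything below is hypothetical and independent of CAF: `toy_manager`, `toy_error`, `toy_batch`, and `summing_sink` are illustrative names, the default hook returns an error code instead of logging, and the sketch assumes the public batch handler forwards to the protected hook.

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Stand-ins for the error and message types; the real ones are richer.
enum class toy_error { none, unhandled_batch };
using toy_batch = std::vector<int>;

// Toy manager with a protected hook that subclasses are expected to override.
class toy_manager {
public:
  virtual ~toy_manager() = default;

  // Public entry point: checks the precondition, then calls the hook.
  toy_error batch(toy_batch& xs) {
    assert(!xs.empty()); // mirrors the documented precondition xs_size > 0
    return process_batch(xs);
  }

protected:
  // Default: report an error, since the base class cannot consume data.
  virtual toy_error process_batch(toy_batch&) {
    return toy_error::unhandled_batch;
  }
};

// A sink that overrides the hook and sums up all received integers.
class summing_sink : public toy_manager {
public:
  long sum() const { return sum_; }

protected:
  toy_error process_batch(toy_batch& xs) override {
    sum_ = std::accumulate(xs.begin(), xs.end(), sum_);
    return toy_error::none;
  }

private:
  long sum_ = 0;
};
```

Calling `batch` on a plain `toy_manager` yields `toy_error::unhandled_batch`, while a `summing_sink` accepts the data and accumulates it across batches.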
|
virtual |
Pushes new data to downstream actors by sending batches.
The amount of pushed data is limited by the available credit.
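How credit limits the amount of pushed data can be sketched as follows. This is a simplified model rather than CAF's algorithm: `push_with_credit` is a hypothetical helper, items are plain integers, and a single downstream path with a fixed maximum batch size is assumed.

```cpp
#include <algorithm>
#include <cassert>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical helper (not CAF API): moves buffered items into batches
// without ever exceeding the available credit. Updates buffer and credit
// in place and returns the batches that would be sent downstream.
inline std::vector<std::vector<int>>
push_with_credit(std::deque<int>& buffer, long& credit, long max_batch_size) {
  assert(max_batch_size > 0);
  std::vector<std::vector<int>> batches;
  while (credit > 0 && !buffer.empty()) {
    // A batch is limited by credit, batch size, and buffered items.
    long n = std::min({credit, max_batch_size,
                       static_cast<long>(buffer.size())});
    std::vector<int> batch(buffer.begin(), buffer.begin() + n);
    buffer.erase(buffer.begin(), buffer.begin() + n);
    credit -= n;
    batches.push_back(std::move(batch));
  }
  return batches;
}
```

With seven buffered items, a credit of 5, and a maximum batch size of 2, this sketch emits batches of sizes 2, 2, and 1 and leaves two items buffered until new credit arrives.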