# File lib/nats/client.rb, line 611
#
# Publishes a request on +subject+ and collects the reply (or replies).
#
# Two modes are supported:
#
# * Asynchronous (a block is given): a subscription is created on a fresh
#   "_INBOX.<nuid>" subject with +opts+ forwarded to +subscribe+, the request
#   is published with that inbox as reply subject, and the value returned by
#   +subscribe+ is returned (presumably the subscription id -- confirm
#   against +subscribe+). The block is invoked per reply with 0, 1 (msg) or
#   2 (msg, reply) arguments depending on its arity.
#
# * Synchronous (no block): must be called from inside a non-root Fiber,
#   because the method suspends itself with Fiber.yield until it is resumed
#   with the response or until the timeout timer fires.
#   NOTE(review): the response side is presumably the mux subscription set
#   up by +start_resp_mux_sub!+, which is not visible here -- confirm.
#
# subject - request subject; when nil/false nothing is done.
# data    - payload handed to +publish+ unchanged (defaults to nil).
# opts    - options Hash. It is never modified by the synchronous path.
#           :max     - number of replies to wait for (default 1).
#           :timeout - deadline handed to EM.add_timer (default 0.5).
#
# Returns nil when +subject+ is nil/false.
# Returns the result of +subscribe+ in asynchronous mode.
# Returns, in synchronous mode with :max <= 1, the value the fiber was
# resumed with (nil when the timer fired first).
# Returns, in synchronous mode with :max > 1, an Array holding at most :max
# of the messages the fiber was resumed with.
def request(subject, data = nil, opts = {}, &cb)
  return unless subject

  # In case of using async request then fallback to auto unsubscribe
  # based request/response and not break compatibility too much since
  # new request/response style can only be used with fibers.
  if cb
    inbox = "_INBOX.#{@nuid.next}"
    sid = subscribe(inbox, opts) do |msg, reply|
      # Hand the callback only as many arguments as it declares.
      case cb.arity
      when 0 then cb.call
      when 1 then cb.call(msg)
      else cb.call(msg, reply)
      end
    end
    publish(subject, data, inbox)
    return sid
  end

  # If this is the first request being made, then need to start
  # the responses mux handler that handles the responses.
  start_resp_mux_sub! unless @resp_sub_prefix

  # Generate unique token for the reply subject.
  token = @nuid.next
  inbox = "#{@resp_sub_prefix}.#{token}"

  # Synchronous request/response requires using a Fiber
  # to be able to await the response.
  f = Fiber.current
  @resp_map[token][:fiber] = f

  # Read the defaults into locals instead of writing them back into the
  # caller's Hash, so +opts+ can be reused (or frozen) safely.
  expected = opts[:max] || 1
  timeout = opts[:timeout] || 0.5

  # If awaiting more than a single response then use array
  # to include all that could be gathered before the deadline.
  @resp_map[token][:expected] = expected
  @resp_map[token][:msgs] = [] if expected > 1

  # Announce the request with the inbox using the token.
  publish(subject, data, inbox)

  # If deadline expires, then resume the fiber and discard the token.
  timer = EM.add_timer(timeout) do
    begin
      if expected > 1
        f.resume(@resp_map[token][:msgs])
      else
        f.resume
      end
    ensure
      # Always drop the token, even if the resumed fiber raised, so that
      # the response map does not keep stale entries around.
      @resp_map.delete(token)
    end
  end

  # Wait for the response and cancel timeout callback if received.
  response = Fiber.yield
  EM.cancel_timer(timer)

  # Single reply: hand back whatever the fiber was resumed with.
  return response unless expected > 1

  # Slice and throwaway responses that are not needed.
  response.slice(0, expected)
end