evergreen/osrf/
session.rs

1use crate::osrf::addr::BusAddress;
2use crate::osrf::client::{Client, ClientSingleton};
3use crate::osrf::conf;
4use crate::osrf::message;
5use crate::osrf::message::Message;
6use crate::osrf::message::MessageStatus;
7use crate::osrf::message::MessageType;
8use crate::osrf::message::MethodCall;
9use crate::osrf::message::Payload;
10use crate::osrf::message::Status;
11use crate::osrf::message::TransportMessage;
12use crate::osrf::params::ApiParams;
13use crate::util;
14use crate::{EgResult, EgValue};
15use std::cell::RefCell;
16use std::cell::RefMut;
17use std::collections::VecDeque;
18use std::fmt;
19use std::rc::Rc;
20
21const CONNECT_TIMEOUT: u64 = 10;
22pub const DEFAULT_REQUEST_TIMEOUT: u64 = 60;
23
24/// Response data propagated from a session to the calling Request.
25#[derive(Debug)]
26struct Response {
27    /// Response from an API call as a EgValue.
28    value: Option<EgValue>,
29    /// True if our originating Request is complete.
30    complete: bool,
31    /// True if this is a partial response
32    partial: bool,
33}
34
35/// Models a single API call through which the caller can receive responses.
36#[derive(Clone)]
37pub struct Request {
38    /// Link to our session so we can ask it for bus data.
39    session: Rc<RefCell<ClientSessionInternal>>,
40
41    /// Have we received all of the replies yet?
42    complete: bool,
43
44    /// Unique ID per thread/session.
45    thread_trace: usize,
46
47    /// Having a local copy of the thread can be handy since our
48    /// session is only accessible via temporary borrow().
49    thread: String,
50}
51
52impl Request {
53    fn new(
54        thread: String,
55        session: Rc<RefCell<ClientSessionInternal>>,
56        thread_trace: usize,
57    ) -> Request {
58        Request {
59            session,
60            thread,
61            complete: false,
62            thread_trace,
63        }
64    }
65
66    pub fn thread(&self) -> &str {
67        &self.thread
68    }
69
70    pub fn thread_trace(&self) -> usize {
71        self.thread_trace
72    }
73
74    /// True if we have received a COMPLETE message from the server.
75    ///
76    /// This does not guarantee all responses have been read.
77    pub fn complete(&self) -> bool {
78        self.complete
79    }
80
81    /// True if we have received a COMPLETE message from the server
82    /// and all responses from our network backlog have been read.
83    ///
84    /// It's possible to read the COMPLETE message before the caller
85    /// pulls all the data.
86    pub fn exhausted(&self) -> bool {
87        self.complete() && self.session.borrow().backlog.is_empty()
88    }
89
90    /// Pull all responses from the bus and return the first.
91    ///
92    /// Handy if you are expecting exactly one result, or only care
93    /// about the first, but want to pull all data off the bus until the
94    /// message is officially marked as complete.
95    pub fn first(&mut self) -> EgResult<Option<EgValue>> {
96        self.first_with_timeout(DEFAULT_REQUEST_TIMEOUT)
97    }
98
99    /// Returns the first response.
100    ///
101    /// This still waits for all responses to arrive so the request can
102    /// be marked as complete and no responses are left lingering on the
103    /// message bus.
104    pub fn first_with_timeout(&mut self, timeout: u64) -> EgResult<Option<EgValue>> {
105        let mut resp: Option<EgValue> = None;
106        while !self.complete {
107            if let Some(r) = self.recv_with_timeout(timeout)? {
108                if resp.is_none() {
109                    resp = Some(r);
110                } // else discard the non-first response.
111            }
112        }
113
114        Ok(resp)
115    }
116
117    /// Receive the next response to this Request
118    ///
119    /// timeout:
120    ///     <0 == wait indefinitely
121    ///      0 == do not wait/block
122    ///     >0 == wait up to this many seconds for a reply.
123    pub fn recv_with_timeout(&mut self, mut timeout: u64) -> EgResult<Option<EgValue>> {
124        if self.complete {
125            // If we are marked complete, we've pulled all of our
126            // resposnes from the bus.  However, we could still have
127            // data in the session backlog.
128            timeout = 0;
129        }
130
131        loop {
132            let response = self.session.borrow_mut().recv(self.thread_trace, timeout)?;
133
134            if let Some(r) = response {
135                if r.partial {
136                    // Keep calling receive until our partial message is
137                    // complete.  This effectively resets the receive
138                    // timeout on the assumption that once we start
139                    // receiving data we want to keep at it until we
140                    // receive all of it, regardless of the origianl
141                    // timeout value.
142                    continue;
143                }
144                if r.complete {
145                    self.complete = true;
146                }
147                return Ok(r.value);
148            } else {
149                return Ok(None);
150            }
151        }
152    }
153
154    pub fn recv(&mut self) -> EgResult<Option<EgValue>> {
155        self.recv_with_timeout(DEFAULT_REQUEST_TIMEOUT)
156    }
157}
158
159/// Client communication state maintenance.
160struct ClientSessionInternal {
161    /// Client so we can ask it to pull data from the Bus for us.
162    client: Client,
163
164    /// Each session is identified on the network by a random thread string.
165    thread: String,
166
167    /// Have we successfully established a connection withour
168    /// destination service?
169    connected: bool,
170
171    /// Service name.
172    service: String,
173
174    /// Top-level bus address for the service we're making requests of.
175    service_addr: BusAddress,
176
177    /// Routed messages go here.
178    router_addr: BusAddress,
179
180    /// Worker-specific bus address for our session.
181    ///
182    /// Set any time a response arrives so we know who sent it.
183    worker_addr: Option<BusAddress>,
184
185    /// Most recently used per-thread request id.
186    ///
187    /// Each new Request within a ClientSessionInternal gets a new thread_trace.
188    /// Replies have the same thread_trace as their request.
189    last_thread_trace: usize,
190
191    /// Replies to this thread which have not yet been pulled by
192    /// any requests.  Using VecDeque since it's optimized for
193    /// queue-like behavior (push back / pop front).
194    backlog: VecDeque<Message>,
195
196    /// Staging ground for "partial" messages arriving in chunks.
197    partial_buffer: Option<String>,
198}
199
200impl fmt::Display for ClientSessionInternal {
201    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202        write!(f, "Session({} {})", self.service(), self.thread())
203    }
204}
205
206impl ClientSessionInternal {
207    fn new(client: Client, service: &str) -> ClientSessionInternal {
208        let router_addr =
209            BusAddress::for_router(conf::config().client().router_name(), client.domain());
210
211        let service_addr = BusAddress::for_bare_service(service);
212
213        ClientSessionInternal {
214            client,
215            router_addr,
216            service_addr,
217            worker_addr: None,
218            service: String::from(service),
219            connected: false,
220            last_thread_trace: 0,
221            partial_buffer: None,
222            backlog: VecDeque::new(),
223            thread: util::random_number(16),
224        }
225    }
226
227    fn service(&self) -> &str {
228        &self.service
229    }
230
231    fn thread(&self) -> &str {
232        &self.thread
233    }
234
235    fn connected(&self) -> bool {
236        self.connected
237    }
238
239    fn reset(&mut self) {
240        //log::trace!("{self} resetting...");
241        self.worker_addr = None;
242        self.connected = false;
243        self.backlog.clear();
244    }
245
246    fn router_addr(&self) -> &BusAddress {
247        &self.router_addr
248    }
249
250    fn worker_addr(&self) -> Option<&BusAddress> {
251        self.worker_addr.as_ref()
252    }
253
254    fn service_addr(&self) -> &BusAddress {
255        &self.service_addr
256    }
257
258    /// Mutable Ref to our under-the-covers client singleton.
259    fn client_internal_mut(&self) -> RefMut<ClientSingleton> {
260        self.client.singleton().borrow_mut()
261    }
262
263    /// Returns the underlying address of the remote end if we have
264    /// a remote client address (i.e. we are connected).  Otherwise,
265    /// returns the underlying BusAddress for our service-level address.
266    fn destination_addr(&self) -> &BusAddress {
267        match self.worker_addr() {
268            Some(a) => a,
269            None => self.service_addr(),
270        }
271    }
272
273    fn recv_from_backlog(&mut self, thread_trace: usize) -> Option<Message> {
274        if let Some(index) = self
275            .backlog
276            .iter()
277            .position(|m| m.thread_trace() == thread_trace)
278        {
279            //log::trace!("{self} found a reply in the backlog for request {thread_trace}");
280
281            self.backlog.remove(index)
282        } else {
283            None
284        }
285    }
286
287    fn recv(&mut self, thread_trace: usize, timeout: u64) -> EgResult<Option<Response>> {
288        let mut timer = util::Timer::new(timeout);
289
290        let mut first_loop = true;
291        loop {
292            /*
293            log::trace!(
294                "{self} in recv() for trace {thread_trace} with {} remaining",
295                timer.remaining()
296            );
297            */
298
299            if let Some(msg) = self.recv_from_backlog(thread_trace) {
300                return self.unpack_reply(&mut timer, msg);
301            }
302
303            if first_loop {
304                first_loop = false;
305            } else if timer.done() {
306                // Avoid exiting on first loop so we have at least
307                // one chance to pull data from the network before exiting.
308                return Ok(None);
309            }
310
311            let mut tmsg = match self
312                .client_internal_mut()
313                .recv_session(&mut timer, self.thread())?
314            {
315                Some(m) => m,
316                None => continue, // timeout, etc.
317            };
318
319            // Look Who's Talking (Too?).
320            self.worker_addr = Some(BusAddress::parse_str(tmsg.from())?);
321
322            // Toss the messages onto our backlog as we receive them.
323            for msg in tmsg.body_mut().drain(..) {
324                self.backlog.push_back(msg);
325            }
326
327            // Loop back around and see if we can pull the message
328            // we want from our backlog.
329        }
330    }
331
332    /// Unpack one opensrf message -- there may be multiple opensrf
333    /// messages inside a single transport message.
334    fn unpack_reply(
335        &mut self,
336        timer: &mut util::Timer,
337        mut msg: Message,
338    ) -> EgResult<Option<Response>> {
339        if let Payload::Result(resp) = msg.payload_mut() {
340            log::trace!("{self} Unpacking osrf message status={}", resp.status());
341
342            // take_content() because this message is about to get dropped.
343            let mut value = resp.take_content();
344
345            if resp.status() == &MessageStatus::Partial {
346                let buf = match self.partial_buffer.as_mut() {
347                    Some(b) => b,
348                    None => {
349                        self.partial_buffer = Some(String::new());
350                        self.partial_buffer.as_mut().unwrap()
351                    }
352                };
353
354                // The content of a partial message is a raw JSON string,
355                // representing a subset of the JSON value response as a whole.
356                if let Some(chunk) = value.as_str() {
357                    buf.push_str(chunk);
358                }
359
360                return Ok(Some(Response {
361                    value: None,
362                    complete: false,
363                    partial: true,
364                }));
365            } else if resp.status() == &MessageStatus::PartialComplete {
366                // Take + clear the partial buffer.
367                let mut buf = self.partial_buffer.take().unwrap_or_default();
368
369                // Append any trailing content if available.
370                if let Some(chunk) = value.as_str() {
371                    buf.push_str(chunk);
372                }
373
374                // Compile the collected JSON chunks into a single value,
375                // which is the final response value.
376                let jval = json::parse(&buf)
377                    .map_err(|e| format!("Error reconstituting partial message: {e}"))?;
378
379                // Avoid exiting with an error on receipt of invalid data
380                // from the network.  See also Bus::recv().
381                value = match EgValue::from_json_value(jval) {
382                    Ok(v) => v,
383                    Err(e) => {
384                        log::error!("Error translating JSON value into EgValue: {e}");
385                        EgValue::Null
386                    }
387                };
388
389                log::trace!("Partial message is now complete");
390            }
391
392            return Ok(Some(Response {
393                value: Some(value),
394                complete: false,
395                partial: false,
396            }));
397        }
398
399        let trace = msg.thread_trace();
400
401        if let Payload::Status(stat) = msg.payload() {
402            self.unpack_status_message(trace, timer, stat)
403                .inspect_err(|_| self.reset())
404        } else {
405            self.reset();
406            Err(format!("{self} unexpected response for request {trace}: {msg:?}").into())
407        }
408    }
409
410    fn unpack_status_message(
411        &mut self,
412        trace: usize,
413        timer: &mut util::Timer,
414        statmsg: &Status,
415    ) -> EgResult<Option<Response>> {
416        let stat = statmsg.status();
417
418        match stat {
419            MessageStatus::Ok => {
420                self.connected = true;
421                Ok(None)
422            }
423            MessageStatus::Continue => {
424                timer.reset();
425                Ok(None)
426            }
427            MessageStatus::Complete => {
428                log::trace!("{self} request {trace} complete");
429                Ok(Some(Response {
430                    value: None,
431                    complete: true,
432                    partial: false,
433                }))
434            }
435            _ => {
436                self.reset();
437                Err(format!("{self} request {trace} failed: {}", statmsg).into())
438            }
439        }
440    }
441
442    fn incr_thread_trace(&mut self) -> usize {
443        self.last_thread_trace += 1;
444        self.last_thread_trace
445    }
446
447    /// Issue a new API call and return the thread_trace of the sent request.
448    fn request(&mut self, method: &str, params: impl Into<ApiParams>) -> EgResult<usize> {
449        log::debug!("{self} sending request {method}");
450
451        let trace = self.incr_thread_trace();
452
453        let mut params: ApiParams = params.into();
454        let params: Vec<EgValue> = params.take_params();
455
456        if !self.connected() {
457            // Discard any knowledge about previous communication
458            // with a specific worker since we are not connected.
459            self.worker_addr = None;
460        }
461
462        let tmsg = TransportMessage::with_body(
463            self.destination_addr().as_str(),
464            self.client.address().as_str(),
465            self.thread(),
466            Message::new(
467                MessageType::Request,
468                trace,
469                Payload::Method(MethodCall::new(method, params)),
470            ),
471        );
472
473        if !self.connected() {
474            // Top-level API calls always go through the router on
475            // our primary domain
476
477            let router_addr = self.router_addr().as_str();
478            self.client_internal_mut()
479                .bus_mut()
480                .send_to(tmsg, router_addr)?;
481        } else if let Some(a) = self.worker_addr() {
482            // Requests directly to client addresses must be routed
483            // to the domain of the client address.
484            self.client_internal_mut()
485                .get_domain_bus(a.domain())?
486                .send(tmsg)?;
487        } else {
488            self.reset();
489            return Err("We are connected, but have no worker_addr()".into());
490        }
491
492        Ok(trace)
493    }
494
495    /// Establish a connected session with a remote worker.
496    fn connect(&mut self) -> EgResult<()> {
497        if self.connected() {
498            log::warn!("{self} is already connected");
499            return Ok(());
500        }
501
502        // Discard any knowledge about previous communication
503        // with a specific worker since we are not connected.
504        self.worker_addr = None;
505
506        log::debug!("{self} sending CONNECT");
507
508        let trace = self.incr_thread_trace();
509
510        let tm = TransportMessage::with_body(
511            self.destination_addr().as_str(),
512            self.client.address().as_str(),
513            self.thread(),
514            Message::new(MessageType::Connect, trace, Payload::NoPayload),
515        );
516
517        // Connect calls always go to our router.
518        self.client
519            .singleton()
520            .borrow_mut()
521            .bus_mut()
522            .send_to(tm, self.router_addr().as_str())?;
523
524        self.recv(trace, CONNECT_TIMEOUT)?;
525
526        if self.connected() {
527            log::trace!("{self} connected OK");
528            Ok(())
529        } else {
530            self.reset();
531            Err("CONNECT timed out".into())
532        }
533    }
534
535    /// Send a DISCONNECT to our remote worker.
536    ///
537    /// Does not wait for any response.  NO-OP if not connected.
538    fn disconnect(&mut self) -> EgResult<()> {
539        if !self.connected() || self.worker_addr().is_none() {
540            self.reset();
541            return Ok(());
542        }
543
544        let trace = self.incr_thread_trace();
545
546        let dest_addr = self.worker_addr().unwrap(); // verified above
547
548        log::debug!("{self} sending DISCONNECT");
549
550        let tmsg = TransportMessage::with_body(
551            dest_addr.as_str(),
552            self.client.address().as_str(),
553            self.thread(),
554            Message::new(MessageType::Disconnect, trace, Payload::NoPayload),
555        );
556
557        self.client_internal_mut()
558            .get_domain_bus(dest_addr.domain())?
559            .send(tmsg)?;
560
561        self.reset();
562
563        Ok(())
564    }
565}
566
567/// Public-facing Session wrapper which exports the needed session API.
568pub struct ClientSession {
569    session: Rc<RefCell<ClientSessionInternal>>,
570}
571
572impl ClientSession {
573    pub fn new(client: Client, service: &str) -> ClientSession {
574        let ses = ClientSessionInternal::new(client, service);
575
576        log::trace!("Created new session {ses}");
577
578        ClientSession {
579            session: Rc::new(RefCell::new(ses)),
580        }
581    }
582
583    /// Issue a new API call and return the Request
584    ///
585    /// params is a JSON-able thing.  E.g. vec![1,2,3], json::object!{"a": "b"}, etc.
586    pub fn request(&mut self, method: &str, params: impl Into<ApiParams>) -> EgResult<Request> {
587        let thread = self.session.borrow().thread().to_string();
588
589        Ok(Request::new(
590            thread,
591            self.session.clone(),
592            self.session.borrow_mut().request(method, params)?,
593        ))
594    }
595
596    /// Send a request and receive a ResponseIterator for iterating
597    /// the responses to the method.
598    ///
599    /// Uses the default request timeout DEFAULT_REQUEST_TIMEOUT.
600    pub fn send_recv(
601        &mut self,
602        method: &str,
603        params: impl Into<ApiParams>,
604    ) -> EgResult<ResponseIterator> {
605        Ok(ResponseIterator::new(self.request(method, params)?))
606    }
607
608    pub fn connect(&self) -> EgResult<()> {
609        self.session.borrow_mut().connect()
610    }
611
612    pub fn disconnect(&self) -> EgResult<()> {
613        self.session.borrow_mut().disconnect()
614    }
615
616    pub fn connected(&self) -> bool {
617        self.session.borrow().connected()
618    }
619}
620
621/// Iterates over a series of replies to an API request.
622pub struct ResponseIterator {
623    request: Request,
624}
625
626impl Iterator for ResponseIterator {
627    type Item = EgResult<EgValue>;
628
629    fn next(&mut self) -> Option<Self::Item> {
630        self.request.recv().transpose()
631    }
632}
633
634impl ResponseIterator {
635    pub fn new(request: Request) -> Self {
636        ResponseIterator { request }
637    }
638}
639
640/// Minimal multi-session implementation.
641///
642/// Primary use is to blast a series of requests in parallel without
643/// having to be concerned about tracking them all or interacting
644/// with the underlying sessions.
645///
646/// Connecting sessions is not supported, because each session is
647/// responsible for exactly one request.
648///
649/// Maybe later:
650///     Max parallel / throttling
651pub struct MultiSession {
652    client: Client,
653    service: String,
654    requests: Vec<Request>,
655}
656
657impl MultiSession {
658    pub fn new(client: Client, service: &str) -> MultiSession {
659        MultiSession {
660            client,
661            service: service.to_string(),
662            requests: Vec::new(),
663        }
664    }
665
666    /// Create a new underlying session and send a request via the session.
667    ///
668    /// Returns the session thead so the caller can link specific
669    /// request to their responses (see recv()) if needed.
670    pub fn request(&mut self, method: &str, params: impl Into<ApiParams>) -> EgResult<String> {
671        let mut ses = self.client.session(&self.service);
672        let req = ses.request(method, params)?;
673        let thread = req.thread().to_string();
674
675        self.requests.push(req);
676
677        Ok(thread)
678    }
679
680    /// True if all requests have been marked complete and have
681    /// empty reply backlogs.
682    ///
683    /// May mark additional requests as complete as a side effect.
684    pub fn complete(&mut self) -> bool {
685        self.remove_completed();
686        self.requests.is_empty()
687    }
688
689    /// Wait up to `timeout` seconds for a response to arrive for any
690    /// of our outstanding requests.
691    ///
692    /// Returns (Thread, Response) if found
693    pub fn recv(&mut self, timeout: u64) -> EgResult<Option<(String, EgValue)>> {
694        // Wait for replies to any sessions on this client to appear
695        // then see if we can find one related specfically to the
696        // requests we are managing.
697
698        if self.client.wait(timeout)? {
699            for req in self.requests.iter_mut() {
700                if let Some(resp) = req.recv_with_timeout(0)? {
701                    return Ok(Some((req.thread.to_string(), resp)));
702                }
703            }
704        }
705
706        self.remove_completed();
707
708        Ok(None)
709    }
710
711    fn remove_completed(&mut self) {
712        // We consider a request to be complete only when it has
713        // received a COMPLETE messsage and its backlog has been
714        // drained.
715        let test = |r: &Request| r.exhausted();
716
717        while let Some(pos) = self.requests.iter().position(test) {
718            self.requests.remove(pos);
719        }
720    }
721}
722
723pub struct ServerSession {
724    /// Service name.
725    service: String,
726
727    /// Link to our ClientSingleton so we can ask it to pull data from the Bus.
728    client: Client,
729
730    /// Each session is identified on the network by a random thread string.
731    thread: String,
732
733    /// Who sent us a request.
734    sender: BusAddress,
735
736    /// True if we have already sent a COMPLETE message to the caller.
737    /// Use this to avoid sending replies after a COMPLETE.
738    responded_complete: bool,
739
740    /// Most recently used per-thread request id.
741    ///
742    /// Each new Request within a Session gets a new thread_trace.
743    /// Replies have the same thread_trace as their request.
744    last_thread_trace: usize,
745
746    /// Responses collected to be packed into an "atomic" response array.
747    atomic_resp_queue: Option<Vec<EgValue>>,
748}
749
750impl fmt::Display for ServerSession {
751    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
752        write!(f, "ServerSession({} {})", self.service(), self.thread())
753    }
754}
755
756impl ServerSession {
757    pub fn new(
758        client: Client,
759        service: &str,
760        thread: &str,
761        last_thread_trace: usize,
762        sender: BusAddress,
763    ) -> ServerSession {
764        ServerSession {
765            client,
766            sender,
767            last_thread_trace,
768            service: service.to_string(),
769            responded_complete: false,
770            thread: thread.to_string(),
771            atomic_resp_queue: None,
772        }
773    }
774
775    pub fn last_thread_trace(&self) -> usize {
776        self.last_thread_trace
777    }
778
779    pub fn set_last_thread_trace(&mut self, trace: usize) {
780        self.last_thread_trace = trace
781    }
782
783    pub fn clear_responded_complete(&mut self) {
784        self.responded_complete = false;
785    }
786
787    pub fn thread(&self) -> &str {
788        &self.thread
789    }
790
791    pub fn service(&self) -> &str {
792        &self.service
793    }
794
795    pub fn sender(&self) -> &BusAddress {
796        &self.sender
797    }
798
799    pub fn new_atomic_resp_queue(&mut self) {
800        log::debug!("{self} starting new atomic queue...");
801        self.atomic_resp_queue = Some(Vec::new());
802    }
803
804    /// Mutable Ref to our under-the-covers client singleton.
805    fn client_internal_mut(&self) -> RefMut<ClientSingleton> {
806        self.client.singleton().borrow_mut()
807    }
808
809    pub fn responded_complete(&self) -> bool {
810        self.responded_complete
811    }
812
813    /// Compiles a MessageType::Result Message with the provided
814    /// respone value, taking into account whether a response
815    /// should even be sent if this the result to an atomic request.
816    fn build_result_message(
817        &mut self,
818        mut result: Option<EgValue>,
819        complete: bool,
820    ) -> EgResult<Option<Message>> {
821        let result_value;
822
823        if self.atomic_resp_queue.is_some() {
824            // Add the reply to the queue.
825            let q = self.atomic_resp_queue.as_mut().unwrap();
826
827            if let Some(res) = result.take() {
828                q.push(res);
829            }
830
831            if complete {
832                // If we're completing the call and we have an atomic
833                // response queue, return the entire contents of the
834                // queue to the caller and leave the queue cleared
835                // [take() above].
836
837                result_value = self.atomic_resp_queue.take().unwrap().into();
838            } else {
839                // Nothing left to do since this atmoic request
840                // is still producing results.
841                return Ok(None);
842            }
843        } else {
844            // Non-atomic request.  Just return the value as is.
845            if let Some(res) = result.take() {
846                result_value = res;
847            } else {
848                return Ok(None);
849            }
850        }
851
852        Ok(Some(Message::new(
853            MessageType::Result,
854            self.last_thread_trace(),
855            Payload::Result(message::Result::new(
856                MessageStatus::Ok,
857                "OK",
858                "osrfResult",
859                result_value,
860            )),
861        )))
862    }
863
864    /// Respond with a value and/or a complete message.
865    fn respond_with_parts(&mut self, value: Option<EgValue>, complete: bool) -> EgResult<()> {
866        if self.responded_complete {
867            log::warn!(
868                r#"Dropping trailing replies after already sending a
869                Request Complete message for thread {}"#,
870                self.thread()
871            );
872            return Ok(());
873        }
874
875        let mut complete_msg = None;
876
877        let mut result_msg = self.build_result_message(value, complete)?;
878
879        if complete {
880            // Add a Request Complete message
881            self.responded_complete = true;
882
883            complete_msg = Some(Message::new(
884                MessageType::Status,
885                self.last_thread_trace(),
886                Payload::Status(message::Status::new(
887                    MessageStatus::Complete,
888                    "Request Complete",
889                    "osrfConnectStatus",
890                )),
891            ));
892        }
893
894        if result_msg.is_none() && complete_msg.is_none() {
895            // Nothing to send to the caller.
896            return Ok(());
897        }
898
899        // We have at least one message to return.
900        // Pack what we have into a single transport message.
901
902        let mut tmsg = TransportMessage::new(
903            self.sender.as_str(),
904            self.client.address().as_str(),
905            self.thread(),
906        );
907
908        if let Some(msg) = result_msg.take() {
909            tmsg.body_mut().push(msg);
910        }
911
912        if let Some(msg) = complete_msg.take() {
913            tmsg.body_mut().push(msg);
914        }
915
916        self.client_internal_mut()
917            .get_domain_bus(self.sender.domain())?
918            .send(tmsg)
919    }
920
921    pub fn send_complete(&mut self) -> EgResult<()> {
922        self.respond_with_parts(None, true)
923    }
924
925    pub fn respond(&mut self, value: impl Into<EgValue>) -> EgResult<()> {
926        self.respond_with_parts(Some(value.into()), false)
927    }
928
929    pub fn respond_complete(&mut self, value: impl Into<EgValue>) -> EgResult<()> {
930        self.respond_with_parts(Some(value.into()), true)
931    }
932}