evergreen/osrf/
client.rs

1use crate::osrf::addr::BusAddress;
2use crate::osrf::bus;
3use crate::osrf::conf;
4use crate::osrf::message;
5use crate::osrf::params::ApiParams;
6use crate::osrf::session::ClientSession;
7use crate::osrf::session::ResponseIterator;
8use crate::util;
9use crate::{EgResult, EgValue};
10use log::info;
11use std::cell::RefCell;
12use std::collections::HashMap;
13use std::fmt;
14use std::rc::Rc;
15
/// Generally speaking, we only need 1 ClientSingleton per thread (hence
/// the name).  This manages one bus connection per domain and stores
/// messages pulled from the bus that have not yet been processed by
/// higher-up modules.
pub struct ClientSingleton {
    /// Connection to our primary domain.
    ///
    /// Stored as an Option to make it possible to clear our Bus so the
    /// caller may take it back once they are done with this client.
    /// See take_bus() / set_bus().
    bus: Option<bus::Bus>,

    /// Name of our primary domain.
    domain: String,

    /// Connections to remote domains, keyed on domain name.
    remote_bus_map: HashMap<String, bus::Bus>,

    /// Queue of received transport messages that have yet to be
    /// processed by any sessions.
    backlog: Vec<message::TransportMessage>,
}
35
36impl ClientSingleton {
37    fn new() -> EgResult<ClientSingleton> {
38        let bus = bus::Bus::new(conf::config().client())?;
39        Ok(ClientSingleton::from_bus(bus))
40    }
41
42    /// Create a new singleton instance from a previously setup Bus.
43    fn from_bus(bus: bus::Bus) -> ClientSingleton {
44        let domain = conf::config().client().domain().name();
45
46        ClientSingleton {
47            domain: domain.to_string(),
48            bus: Some(bus),
49            backlog: Vec::new(),
50            remote_bus_map: HashMap::new(),
51        }
52    }
53
    /// Delete all messages that have been received but not yet pulled
    /// for processing by any higher-up modules.
    ///
    /// Only the in-memory backlog is affected; nothing is read from or
    /// written to any Bus.
    fn clear_backlog(&mut self) {
        self.backlog.clear();
    }
59
60    /// Our full bus address as a string
61    fn address(&self) -> &str {
62        self.bus().address().as_str()
63    }
64
65    /// Our primary bus domain
66    fn domain(&self) -> &str {
67        &self.domain
68    }
69
70    /// Ref to our Bus.
71    ///
72    /// Panics if bus is unset.
73    pub fn bus(&self) -> &bus::Bus {
74        match self.bus.as_ref() {
75            Some(b) => b,
76            None => panic!("Client has no Bus connection!"),
77        }
78    }
79
80    /// Mut ref to our Bus.
81    ///
82    /// Panics if our Bus is unset.
83    pub fn bus_mut(&mut self) -> &mut bus::Bus {
84        match self.bus.as_mut() {
85            Some(b) => b,
86            None => panic!("Client has no Bus connection!"),
87        }
88    }
89
90    /// Clear and return our Bus connection.
91    ///
92    /// Panics if our Bus is unset.
93    ///
94    /// Generally, take/set_bus are only used in unique scenarios.
95    /// Use with caution, since an unset Bus means the client cannot
96    /// be used and the thread will exit if the client is used.
97    pub fn take_bus(&mut self) -> bus::Bus {
98        match self.bus.take() {
99            Some(b) => b,
100            None => panic!("Client has to Bus connection!"),
101        }
102    }
103
    /// Give this client a bus to use as its primary Bus.
    ///
    /// Any Bus currently held is replaced (and dropped).  The stored
    /// primary domain name is not changed by this call.
    pub fn set_bus(&mut self, bus: bus::Bus) {
        self.bus = Some(bus);
    }
108
109    pub fn get_domain_bus(&mut self, domain: &str) -> EgResult<&mut bus::Bus> {
110        log::trace!("Loading bus connection for domain: {domain}");
111
112        if domain.eq(self.domain()) {
113            Ok(self.bus_mut())
114        } else {
115            if self.remote_bus_map.contains_key(domain) {
116                return Ok(self.remote_bus_map.get_mut(domain).unwrap());
117            }
118
119            self.add_connection(domain)
120        }
121    }
122
123    /// Add a connection to a new remote domain.
124    ///
125    /// Panics if our configuration has no primary domain.
126    fn add_connection(&mut self, domain: &str) -> EgResult<&mut bus::Bus> {
127        // When adding a connection to a remote domain, assume the same
128        // connection type, etc. is used and just change the domain.
129        let mut conf = conf::config().client().clone();
130
131        conf.set_domain(domain);
132
133        let bus = bus::Bus::new(&conf)?;
134
135        info!("Opened connection to new domain: {}", domain);
136
137        self.remote_bus_map.insert(domain.to_string(), bus);
138        self.get_domain_bus(domain)
139    }
140
141    /// Removes and returns the first transport message pulled from the
142    /// transport message backlog that matches the provided thread.
143    fn recv_session_from_backlog(&mut self, thread: &str) -> Option<message::TransportMessage> {
144        if let Some(index) = self.backlog.iter().position(|tm| tm.thread() == thread) {
145            Some(self.backlog.remove(index))
146        } else {
147            None
148        }
149    }
150
151    /// Returns true if any data exists in the backlog within the
152    /// timeout provided.
153    ///
154    /// This is useful for checking network activity
155    /// across multiple active sessions in lieu of polling each
156    /// session for responses.
157    pub fn wait(&mut self, timeout: u64) -> EgResult<bool> {
158        if !self.backlog.is_empty() {
159            return Ok(true);
160        }
161
162        let timer = util::Timer::new(timeout);
163
164        while self.backlog.is_empty() && !timer.done() {
165            if let Some(tm) = self.bus_mut().recv(timer.remaining(), None)? {
166                self.backlog.push(tm);
167                break;
168            }
169        }
170
171        Ok(!self.backlog.is_empty())
172    }
173
174    /// Receive up to one message destined for the specified session.
175    pub fn recv_session(
176        &mut self,
177        timer: &mut util::Timer,
178        thread: &str,
179    ) -> EgResult<Option<message::TransportMessage>> {
180        loop {
181            if let Some(tm) = self.recv_session_from_backlog(thread) {
182                return Ok(Some(tm));
183            }
184
185            if timer.done() {
186                // Nothing in the backlog and all out of time.
187                return Ok(None);
188            }
189
190            // See what we can pull from the message bus
191
192            if let Some(tm) = self.bus_mut().recv(timer.remaining(), None)? {
193                self.backlog.push(tm);
194            }
195
196            // Loop back around and see if we can pull a transport
197            // message from the backlog matching the requested thread.
198        }
199    }
200
201    /// Send a command to the router specified by username/domain, like
202    /// "register" and "unregister".
203    fn send_router_command(
204        &mut self,
205        username: &str,
206        domain: &str,
207        router_command: &str,
208        router_class: Option<&str>,
209    ) -> EgResult<()> {
210        let addr = BusAddress::for_router(username, domain);
211
212        // Always use the from address of our primary Bus
213        let mut tmsg = message::TransportMessage::new(
214            addr.as_str(),
215            self.bus().address().as_str(),
216            &util::random_number(16),
217        );
218
219        tmsg.set_router_command(router_command);
220        if let Some(rc) = router_class {
221            tmsg.set_router_class(rc);
222        }
223
224        let bus = self.get_domain_bus(domain)?;
225        bus.send(tmsg)?;
226
227        Ok(())
228    }
229}
230
231impl fmt::Display for ClientSingleton {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        write!(f, "ClientSingleton({})", self.address())
234    }
235}
236
/// Wrapper around our ClientSingleton Ref so we can easily share a client
/// within a given thread.
///
/// Wrapping the Ref in a struct allows us to present a client-like
/// API to the caller.  I.e. the caller is not required to .borrow() /
/// .borrow_mut() directly when performing actions against the client Ref.
///
/// When a new client Ref is needed, clone the Client.
#[derive(Clone)]
pub struct Client {
    /// Shared handle to the singleton.  Cloning a Client clones this
    /// Rc, so all clones operate on the same singleton.
    singleton: Rc<RefCell<ClientSingleton>>,

    /// Copy of the primary Bus address, taken when the Client was
    /// created.  Not refreshed by set_bus().
    address: BusAddress,

    /// Copy of the primary domain name, taken when the Client was
    /// created.
    domain: String,
}
251
252impl Client {
253    /// Create a new Client and connect to the bus.
254    ///
255    /// NOTE: In most cases, cloning an existing client is the
256    /// preferred approach, since that guarantees you are
257    /// using an existing Bus connection, instead of creating
258    /// a new one, which is generally unnecessary.
259    pub fn connect() -> EgResult<Client> {
260        // This performs the actual bus-level connection.
261        let singleton = ClientSingleton::new()?;
262
263        let address = singleton.bus().address().clone();
264        let domain = singleton.domain().to_string();
265
266        Ok(Client {
267            address,
268            domain,
269            singleton: Rc::new(RefCell::new(singleton)),
270        })
271    }
272
273    /// Create a new Client from an existing Bus connection.
274    ///
275    /// This can be handy because a Bus is Send-able, but a Client is not.
276    pub fn from_bus(bus: bus::Bus) -> Client {
277        // This performs the actual bus-level connection.
278        let singleton = ClientSingleton::from_bus(bus);
279
280        let address = singleton.bus().address().clone();
281        let domain = singleton.domain().to_string();
282
283        Client {
284            address,
285            domain,
286            singleton: Rc::new(RefCell::new(singleton)),
287        }
288    }
289
290    /// Panics if bus is unset.
291    ///
292    /// Most callers will never need this.
293    pub fn take_bus(&self) -> bus::Bus {
294        self.singleton.borrow_mut().take_bus()
295    }
296
297    /// Apply a new bus.
298    ///
299    /// Most callers will never need this.
300    pub fn set_bus(&self, bus: bus::Bus) {
301        self.singleton.borrow_mut().set_bus(bus);
302    }
303
    /// Ref to the shared singleton handle, for callers that need to
    /// borrow() / borrow_mut() the ClientSingleton directly.
    pub fn singleton(&self) -> &Rc<RefCell<ClientSingleton>> {
        &self.singleton
    }
307
    /// The primary Bus address as copied when this Client was created.
    ///
    /// Reads the cached copy; the singleton is not borrowed.
    pub fn address(&self) -> &BusAddress {
        &self.address
    }
311
312    pub fn domain(&self) -> &str {
313        &self.domain
314    }
315
316    /// Create a new client session for the requested service.
317    pub fn session(&self, service: &str) -> ClientSession {
318        ClientSession::new(self.clone(), service)
319    }
320
321    /// Discard any unprocessed messages from our backlog and clear our
322    /// stream of pending messages on the bus.
323    pub fn clear(&self) -> EgResult<()> {
324        self.singleton().borrow_mut().clear_backlog();
325        self.singleton().borrow_mut().bus_mut().clear_bus()
326    }
327
328    /// Wrapper for ClientSingleton::send_router_command()
329    pub fn send_router_command(
330        &self,
331        username: &str,
332        domain: &str,
333        command: &str,
334        router_class: Option<&str>,
335    ) -> EgResult<()> {
336        self.singleton()
337            .borrow_mut()
338            .send_router_command(username, domain, command, router_class)
339    }
340
341    /// Send a request and receive a ResponseIterator for iterating
342    /// the responses to the method.
343    ///
344    /// Uses the default request timeout DEFAULT_REQUEST_TIMEOUT.
345    pub fn send_recv_iter(
346        &self,
347        service: &str,
348        method: &str,
349        params: impl Into<ApiParams>,
350    ) -> EgResult<ResponseIterator> {
351        Ok(ResponseIterator::new(
352            self.session(service).request(method, params)?,
353        ))
354    }
355
356    /// Wrapper for ClientSingleton::wait()
357    pub fn wait(&self, timeout: u64) -> EgResult<bool> {
358        self.singleton().borrow_mut().wait(timeout)
359    }
360
361    /// Sends an API request and returns the first response, or None if
362    /// the API call times out.
363    ///
364    /// This still waits for all responses to arrive before returning the
365    /// first, so the request can be marked as complete and cleaned up.
366    pub fn send_recv_one(
367        &self,
368        service: &str,
369        method: &str,
370        params: impl Into<ApiParams>,
371    ) -> EgResult<Option<EgValue>> {
372        let mut ses = self.session(service);
373        let mut req = ses.request(method, params)?;
374
375        req.first()
376    }
377}